본문 바로가기

안드로이드/RxAndroid(RxJava)

[RxJava] Observable

Observable

RxJava에서는 Observable을 구독(subscribe)하는 Observer가 존재하고, Observable이 순차적으로 발행하는 데이터에 대해서 반응한다. Observable은 다음 3가지 이벤트를 통해 동작한다. 

  • onNext(): 하나의 소스 Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터를 발행
  • onComplete(): 데이터 발행이 끝났음을 Observer에게 알려 onNext()이 더 이상 발생하지 않음을 알림
  • onError(): 오류가 발생했음을 Observer에 알림.

위에서 데이터 발행 과정에서 null은 발행 불가능하다. 그리고, 위 세가지 이벤트는 Emitter 인터페이스에 선언된다.

핵심: 이벤트를 만들어내는(emit) 주최. 즉, 이벤트 스트림인 Observable을 구독(subscribe)하여 원하는 동작을 수행하자.

 

연산자(Operator)를 통해 Observable 생성하기

RxJava에서는 연산자(Operator)를 통해 기존 데이터를 참조, 변형하여 Observable을 생성할 수 있다. 쉽게 설명하면 연산자는 이벤트 스트림을 통해 전달되는 이벤트를 적절히 가공하거나 만족하는 이벤트만 필터링 하는 등의 작업을 수행하기 위해 사용된다. 포스팅에선 자주 쓰이는 연산자만 소개한다.

 

craete() 연산자

Observable.create()를 활용하면 Emitter를 이용하여 아이템을 발행하고, 발행 완료 및 오류의 알림을 설정할 수 있다.

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("Hi");
    emitter.onNext("Gong");
    emitter.onComplete();
    emitter.onNext("bye");
}

source.subscribe(System.out::println());

//실행결과
Hi
Gong

emitter를 통해 문자열 "Hi", "Gong"을 발행하고 중간에 onComplete()로 발행이 끝났음을 알린 뒤, "bye"를 발행했다. 구독자는 onComplete() 이후의 아이템 발행에 대한 통지는 받을 수 없다. 

 

위처럼 create()은 개발자가 직접 emitter를 제어하므로 주의해야 한다. 예를 들어  Observable이 폐기(Disposable) 되었을 때 콜백을 모두 해제하지 않으면 메모리 누수가 발생하고, BackPressure를 직접 처리해야 한다.

 

just() 연산자

just()는 해당 아이템을 그대로 발행하는 Observable을 생성하는 연산자이다.

Observable<String> source = Observable.just("Hi","Gong");

source.subscribe(System.out::println());

//실행결과
Hi
Gong

 

자료구조를 Observable로 변환하기

이미 참조할 수 있는 배열, 리스트와 같은 자료구조나 Future, Callable, Publisher 등은 from으로 시작하는 연산자를 통해 Observable로 변환할 수 있다.

아래는 from과 관련된 메서드이다.

연산자(Operator) 설명
fromArray() 배열을 ObservableSource로 변환하여 아이템을 순서대로 발행
fromIterator() ArrayList, HashSet과 같은 Iterator로 구현된 모든 객체를 ObservableSource로 변환하여 아이템을 순서대로 발행
fromFuture() Future 인터페이스를 지원하는 모든 객체를 ObservableSource로 변환하고 Future.get() 메서드를 호출한 값으로 반환
fromPublisher() Publisher를 Observable로 변환
fromCallable() Callable를 Observable로 변환

 

fromArray(), fromIterator()

String[] array = new String[]{"ab","cd","ef"};
Observable source = Observable.fromArray(array);
source.subscribe(System.out::println());

//실행 결과
ab
cd
ef


ArrayList<String> list = new ArrayList();
list.add("ab");
list.add("cd");
Observable source = Observable.fromIterator(list);
source.subscribe(System.out::println())

//실행 결과
ab
cd

 

fromFuture()

Future 인터페이스는 비동기적 작업의 결과를 구할 때 사용한다. 

Future<String> future = Executors.newSingleThreadExecutor()
    .submit(()->{
        Thread.sleep(5000);
        return "This is the Future";
    });
Observable source = Observable.fromFuture(future);
source.subscribe(System.out::println());

//실행 결과
This is Future

위는 단순히 출력된 것 처럼 보이지만 5초간 블로킹되어 기다린 후 출력된 것이다. 위에선 Executor를 사용했지만 Rx에서는 스케쥴러를 사용하는 것을 권장한다.

 

fromPublisher()

Publisher는 잠재적인 아이템 발행을 제공하는 생산자로 Subscriber로부터 요청을 받아 아이템을 발행한다. 

Publisher<String> publisher = subscriber -> {
	subscriber.onNext("A");
    subscriber.onNext("B");
    subscriber.onComplete();
};

Observable<String> source = Observable.fromPublisher(publisher);
source.subscribe(System.out::println());

//실행 결과
A
B

 

fromCallable()

Callable은 비동기적인 실행 결과를 반환한다. 

Callable<String> callable = () -> "Rx is fucking cool";
Observable source = Observable.fromCallable(callable);
source.subscribe(System.out::println());

//실행 결과
Rx is fucking cool

 

다양한 Observable의 형태

위에서 소개한 Observable 스트림 외에도 Single, Maybe, Completable 처럼 특별한 스트림이 있다.

 

Single

Single은 단 하나의 아이테믈 발행할 수 있다. 이러한 특징 때문에 HTTP 요청/응답에서 자주 사용한다. create()를 사용하는 경우 emitter를 통해 데이터를 발행하고 onSuccess()를 통해 데이터 발행과 동시에 완료를 알린다. 오류는 onError()를 똑같이 사용하면 된다.

Single.create(emitter -> emitter.onSuccess("Hello"))
	.subscribe(System.out::println())


//실행결과
Hello

 

Maybe

Maybe는 Single과 비슷하지만, 아이템을 발행하거나 발행하지 않을 수도 있다는 점에서 차이가 난다. 아이템을 발행한 경우에는 Single과 같이 onSuccess()를 호출하고, 발행하지 않았을 경우에는 onComplete()를 호출한다.

Maybe.create(emitter -> {
    emitter.onSuccess(100);
    emitter.onComplete();//영향없음:무시됨
})
.doOnSuccess(item -> System.out.println("doOnSuccess"))
.doOnComplete(() -> System.out.println("doOnComplete"))
.subscribe(System.out::println());

//실행 결과
doOnSuccess
100


Maybe.create(emitter -> {
    emitter.onComplete();
})
.doOnSuccess(item -> System.out.println("doOnSuccess"))
.doOnComplete(() -> System.out.println("doOnComplete"))
.subscribe(System.out::println());

//실행 결과
doOnComplete

 

 

Completable

Completable은 아이템을 발행하지 않고 정상적으로 실행이 종료되었는지 확인할 때 사용한다. 따라서, onComplete()onError()만을 사용한다.

Completable.create(emitter -> {
	System.out.println("OK");
    emiiter.onComplete();
}).subscribe(()->System.out.println("complete"));

//실행결과
OK
complete

 

 

Reference

  • 아키텍처를 알아야 앱 개발이 보인다
  • 커니의 코틀린

'안드로이드 > RxAndroid(RxJava)' 카테고리의 다른 글

[RxJava] Scheduler  (0) 2020.12.25
[RxJava] Disposable  (0) 2020.12.22
[RxJava] Cold Observable과 Hot Observable  (0) 2020.12.22
[RxJava] 반응형 프로그래밍  (0) 2020.12.21