Creating and Disposing an Observer Manually

In this RxJava post, I will explain how to create and dispose of an Observer manually.

Until now, in my previous RxJava posts, I did not create or dispose of an Observer manually. RxJava created the Observer internally. All I did was provide callback functions for the onNext, onError, and onComplete events, as shown below.


observable.subscribe(t -> System.out.println(t), t -> t.printStackTrace(), () -> System.out.println("Complete"));

In the above code, the subscribe method takes three lambda callbacks as arguments. The first argument handles the onNext event, the second the onError event, and the third the onComplete event. RxJava takes these callbacks and internally creates an Observer that delegates to them.
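To make the idea of "an Observer containing these callback functions" concrete, here is a minimal, dependency-free sketch. It is not RxJava's actual source. The names `SimpleObserver` and `fromCallbacks` are hypothetical stand-ins I made up for illustration, and the real Observer interface also has an onSubscribe method, which is left out here.

```java
import java.util.function.Consumer;

// Hypothetical sketch, NOT RxJava source: shows how three callbacks
// could be wrapped into a single observer object.
class CallbackObserverSketch {

    // Stand-in for the shape of an observer (assumption: simplified, no onSubscribe).
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    // Builds an observer that delegates each event to the matching callback.
    static <T> SimpleObserver<T> fromCallbacks(Consumer<T> nextCallback,
                                               Consumer<Throwable> errorCallback,
                                               Runnable completeCallback) {
        return new SimpleObserver<T>() {
            @Override public void onNext(T item) { nextCallback.accept(item); }
            @Override public void onError(Throwable error) { errorCallback.accept(error); }
            @Override public void onComplete() { completeCallback.run(); }
        };
    }

    public static void main(String[] args) {
        // Same three lambdas as in the subscribe call above.
        SimpleObserver<Integer> observer = fromCallbacks(
                t -> System.out.println(t),
                t -> t.printStackTrace(),
                () -> System.out.println("Complete"));

        // Simulate a source emitting two items and then completing.
        observer.onNext(1);
        observer.onNext(2);
        observer.onComplete();
    }
}
```

Writing the observer class yourself, as in the rest of this post, simply replaces this wrapping step with your own implementation.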

Now, to create an Observer manually, we need to implement the “Observer” interface as shown below:


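class MyObserver implements Observer<Integer> {
    // Handle to the subscription, received in onSubscribe
    private Disposable disposable;

    @Override
    public void onComplete() {
        System.out.println("Complete");
        // Release the subscription once the observable has finished
        disposable.dispose();
    }

    @Override
    public void onError(@NonNull Throwable error) {
        // Release the subscription, then report the error
        disposable.dispose();
        error.printStackTrace();
    }

    @Override
    public void onNext(@NonNull Integer data) {
        // Called once for every item emitted by the observable
        System.out.println(data);
    }

    @Override
    public void onSubscribe(@NonNull Disposable disposable) {
        // Keep the reference so the other callbacks can dispose of it
        this.disposable = disposable;
    }
}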

In the above class, we provide logic for the onComplete, onError, onNext, and onSubscribe methods.

The “onSubscribe” method is called once, when the observer subscribes to the observable and before any other callback. It receives a “disposable” object as an argument, which can be used to end the subscription between the observer and the observable. In this code, we store the reference to the disposable object in MyObserver’s private variable “disposable”.

The “onComplete” method is called when the observable is done emitting items.

The “onError” method is called when an error occurs while the observable is emitting items. The error is passed in as a Throwable.

The “onNext” method is called each time the observable emits an item.
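The callback order described above, and the effect of calling dispose before the source has finished, can be illustrated with a small dependency-free model. This is only a sketch under stated assumptions, not RxJava code. `SimpleDisposable`, `SimpleObserver`, and `emitRange` are hypothetical stand-ins that mimic the shape of RxJava's Disposable, Observer, and a range-style source, and disposing after the third item is an arbitrary choice for the demo.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, dependency-free model of the callback order; NOT RxJava code.
class DisposeModelSketch {

    // Hypothetical stand-in mirroring the shape of a disposable.
    interface SimpleDisposable {
        void dispose();
        boolean isDisposed();
    }

    // Hypothetical stand-in mirroring the four observer callbacks.
    interface SimpleObserver<T> {
        void onSubscribe(SimpleDisposable d);
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    // Hypothetical source: emits count integers starting at start,
    // unless the observer disposes first.
    static void emitRange(int start, int count, SimpleObserver<Integer> observer) {
        boolean[] disposed = {false};
        SimpleDisposable d = new SimpleDisposable() {
            @Override public void dispose() { disposed[0] = true; }
            @Override public boolean isDisposed() { return disposed[0]; }
        };
        observer.onSubscribe(d); // always the first callback
        for (int i = start; i < start + count && !d.isDisposed(); i++) {
            observer.onNext(i);
        }
        if (!d.isDisposed()) {
            observer.onComplete(); // skipped when the observer disposed early
        }
    }

    // Subscribes an observer that disposes after the third item
    // and returns the list of events it saw.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        emitRange(1, 10, new SimpleObserver<Integer>() {
            private SimpleDisposable disposable;

            @Override public void onSubscribe(SimpleDisposable d) {
                disposable = d;
                events.add("onSubscribe");
            }
            @Override public void onNext(Integer item) {
                events.add("onNext " + item);
                if (item == 3) {
                    disposable.dispose(); // end the subscription manually
                }
            }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the observer sees onSubscribe followed by the first three items, and onComplete is never delivered because the subscription was ended before the source finished.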

Next, we subscribe an instance of the custom observer “MyObserver” to the observable as shown below:


    MyObserver myObserver = new MyObserver();
    Observable<Integer> observable = Observable.range(1, 10);
    observable.subscribe(myObserver);

Below is the main class for your reference:


import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;

public class Example17 {
    public static void main(String[] args) {
        MyObserver myObserver = new MyObserver();
        Observable<Integer> observable = Observable.range(1, 10);
        observable.subscribe(myObserver);
    }
}
