Creating Custom Observable Using Callback Style

In this post under RxJava, I will show with example how to create a custom Observable using Callback style.

In Callback style, a code is executed when a specific event occurs. In our case, the event is Observer subscribing to Observable. When this event occurs the callback code is called.

For our example, we create the callback code by implementing the ObservableOnSubscribe functional interface as shown in the below code


1 class MyObservableOnSubscribe1 implements ObservableOnSubscribe<Integer> {
2     @Override
3     public void subscribe(@NonNull ObservableEmitter<@NonNull Integer> observableEmitter) throws Throwable {
4         for(int i = 0; i < 10; i++) {
5             observableEmitter.onNext(i);
6         }
7         observableEmitter.onComplete();
8     }
9 }

In the above code, we created a new class “MyObservableOnSubscribe1” which will implement the interface ObservableOnSubscribe and provide logic to its only one method “subscribe”.

ObservableEmitter will have a reference to the Observer which has subscribed to the Observable. So calling “onNext”, “onComplete” or “onError” is like calling similar methods on the Observer.

Next I will show the main class in which we use the above implementation.

Main Class


1  import io.reactivex.rxjava3.annotations.NonNull;
2  import io.reactivex.rxjava3.core.Observable;
3  import io.reactivex.rxjava3.core.ObservableEmitter;
4  import io.reactivex.rxjava3.core.ObservableOnSubscribe;
5  
6  public class Example13 {
7      public static void main(String[] args) {
8          MyObservableOnSubscribe1 myObservableOnSubscribe1 = new MyObservableOnSubscribe1();
9          
10         Observable.<Integer>create(myObservableOnSubscribe1).subscribe(i -> System.out.println(i));
11     }
12 }

In the above code, at line 8, we create an instance of “MyObservableOnSubscribe1” class.

At line 10, we call Observable class static method “create” that will take “myObservableOnSubscribe1” instance as a parameter and creates an Observable instance.

With the help of method chaining, we are calling “subscribe” on the Observable instance created by “create” method and pass a lambda expression which will be called by Observer’s “onNext” method.

In this way we can create an custom Observable using callback approach.

Leave a Reply