Creating custom logic for Disposable

In this post under RxJava, I will show with example how to add custom logic when a subscription between Observable and Observer is being disposed.

You can add custom logic in Disposable only if you are creating an Observable using callback style.

In my previous post under RxJava I have shown you how to create custom Observable using callback style.

We will use similar example here also.

We will create a custom Observable implementing ObservableOnSubscribe interface, as shown below


1  class MyObservableOnSubscribe2 implements ObservableOnSubscribe<Integer> {
2      @Override
3      public void subscribe(@NonNull ObservableEmitter<@NonNull Integer> observableEmitter) throws Throwable {
4          for(int i = 0; i < 10; i++) {
5              observableEmitter.onNext(i);
6          }
7          Disposable disposable = Disposable.fromRunnable(() -> {System.out.println("Done");});
8          observableEmitter.setDisposable(disposable);
9          observableEmitter.onComplete();
10     }
11 }

In the above code, at line 7 we create a Disposable instance using “fromRunnable” static method. This method takes a lambda as an argument and this lambda code is executed when subscription is disposed of.

At line 8, we pass the Disposable instance created at line 7 to an instance of ObservableEmitter by calling “setDisposable” method.

This ObservableEmitter instance passes the Disposable instance to the Observer when subscribed.

Below is the main code which shows how to use the above custom Observable.

Main Class


1  import io.reactivex.rxjava3.annotations.NonNull;
2  import io.reactivex.rxjava3.core.Observable;
3  import io.reactivex.rxjava3.core.ObservableEmitter;
4  import io.reactivex.rxjava3.core.ObservableOnSubscribe;
5  import io.reactivex.rxjava3.disposables.Disposable;
6  
7  public class Example20 {
8      public static void main(String[] args) {
9          MyObservableOnSubscribe2 myObservableOnSubscribe2 = new MyObservableOnSubscribe2();
10         Observable.<Integer>create(myObservableOnSubscribe2).subscribe(i -> System.out.println(i));
11     }
12 }

In the above code, at line 9 we created an instance of MyObservableOnSubscribe2 named “myObservableOnSubscribe2”.

At line 10, we create an Observable and pass a lambda expression to “subscribe” method. This code will create an Observable and execute the code present in MyObservableOnSubscribe2’s
“subscribe” method. When ObservableEmitter’s onComplete is called the Disposable instance (created at line 7 in MyObservableOnSubscribe2 class) is called and “Done” text is printed in the console.

Below is the output

Output

0
1
2
3
4
5
6
7
8
9
Done

Leave a Reply