Using CompositeDisposable

In this post on RxJava, I will show, with an example, what the CompositeDisposable class is for and how to use it.

When you have multiple subscriptions, as shown below, you get one Disposable object for each subscription.


    Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS);
    Disposable disposable1 = observable1.subscribe((t) -> System.out.println("Observable1: " + t));

    Observable<Long> observable2 = Observable.interval(2000, TimeUnit.MILLISECONDS);
    Disposable disposable2 = observable2.subscribe((t) -> System.out.println("Observable2: " + t));

You have to end the subscriptions one by one by calling the “dispose” method on each Disposable object, in this case “disposable1” and “disposable2”, as shown below.


    disposable1.dispose();
    disposable2.dispose();

If there are many subscriptions, it is easy to miss disposing some of them.

To avoid this problem, we can use a CompositeDisposable object.

It is a container that holds references to Disposable objects. When we have to dispose all of them, we just call the “dispose” method on the CompositeDisposable instance, as shown in the main code below.

Main Class


1  import java.util.concurrent.TimeUnit;
2  
3  import io.reactivex.rxjava3.core.Observable;
4  import io.reactivex.rxjava3.disposables.CompositeDisposable;
5  import io.reactivex.rxjava3.disposables.Disposable;
6  
7  public class Example19 {
8      public static void main(String[] args) throws Exception {
9          Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS);
10         Disposable disposable1 = observable1.subscribe((t) -> System.out.println("Observable1: " + t));
11         
12         Observable<Long> observable2 = Observable.interval(2000, TimeUnit.MILLISECONDS);
13         Disposable disposable2 = observable2.subscribe((t) -> System.out.println("Observable2: " + t));
14         
15         Thread.sleep(10000);
16         
17         CompositeDisposable compositeDisposable = new CompositeDisposable(disposable1, disposable2);
18         compositeDisposable.dispose();
19         System.out.println("Subscriptions are disposed. Wait for 10 sec and see whether there is any output from the obervables");
20         Thread.sleep(10000);
21         System.out.println("Terminating the program");
22     }
23 }

In the above code, we create two Observable instances at lines 9 and 12.

Then at lines 10 and 13, we subscribe an observer to each Observable instance, which returns two Disposable objects, “disposable1” and “disposable2”.

Here “disposable1” holds a reference to the subscription between “observable1” and its observer, and “disposable2” holds a reference to the subscription between “observable2” and its observer.

At line 17, we create a CompositeDisposable object named “compositeDisposable” by passing the two Disposable objects as constructor arguments.

The “compositeDisposable” instance now holds references to these two Disposable objects.
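
To make the "holds references" part concrete, here is a minimal sketch, assuming RxJava 3 is on the classpath. The class name “CompositeSizeSketch” and the helper “registerTwo” are hypothetical and only for illustration. It uses “Disposable.empty()” as a stand-in for real subscriptions and asks the container how many Disposable objects it holds through “size()”.

```java
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;

public class CompositeSizeSketch {

    // Hypothetical helper: returns how many Disposable objects the container holds.
    static int registerTwo() {
        // Disposable.empty() creates a stand-in Disposable that is not tied to any Observable.
        Disposable first = Disposable.empty();
        Disposable second = Disposable.empty();

        // Same constructor form as in the main example: pass the Disposable objects directly.
        CompositeDisposable composite = new CompositeDisposable(first, second);
        return composite.size();
    }

    public static void main(String[] args) {
        System.out.println("Held disposables: " + registerTwo());
    }
}
```

Running the sketch should report two held Disposable objects, one for each constructor argument.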

At line 18, we dispose all the subscriptions by calling the “dispose” method on the “compositeDisposable” instance.

Internally, the “compositeDisposable” object calls “dispose” on each Disposable object it holds and ends each subscription.
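
The propagation described above can be checked with “isDisposed()”. The following is a minimal sketch, assuming RxJava 3. The class name “CompositeDisposeSketch” and the helper “disposeAll” are hypothetical. It also shows one related behaviour of the container: a Disposable added after “dispose()” is not accepted and is disposed immediately.

```java
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;

public class CompositeDisposeSketch {

    // Hypothetical helper: returns the observed states in a fixed order.
    static boolean[] disposeAll() {
        // Stand-ins for the Disposable objects returned by subscribe().
        Disposable first = Disposable.empty();
        Disposable second = Disposable.empty();

        CompositeDisposable composite = new CompositeDisposable(first, second);

        // One call disposes every Disposable held by the container.
        composite.dispose();

        // Adding to an already disposed container: add() returns false
        // and the new Disposable is disposed right away.
        Disposable late = Disposable.empty();
        boolean accepted = composite.add(late);

        return new boolean[] {
            first.isDisposed(),      // true
            second.isDisposed(),     // true
            composite.isDisposed(),  // true
            accepted,                // false
            late.isDisposed()        // true
        };
    }

    public static void main(String[] args) {
        boolean[] state = disposeAll();
        System.out.println("first disposed: " + state[0]);
        System.out.println("second disposed: " + state[1]);
        System.out.println("composite disposed: " + state[2]);
        System.out.println("late add accepted: " + state[3]);
        System.out.println("late disposed: " + state[4]);
    }
}
```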

In this way we can dispose multiple Disposable objects with one line of code.
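
A variation that may reduce the chance of forgetting a subscription is to register each Disposable with the container at the moment “subscribe” returns it, using “add” instead of the constructor. This is only a sketch, assuming RxJava 3. The class name “RegisterOnSubscribeSketch” and the helper “subscribeRegisterDispose” are hypothetical.

```java
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;

public class RegisterOnSubscribeSketch {

    // Hypothetical helper: returns the container size before and after dispose().
    static int[] subscribeRegisterDispose() {
        CompositeDisposable composite = new CompositeDisposable();

        // Register each Disposable as soon as subscribe() returns it,
        // so no separate variable has to be tracked per subscription.
        composite.add(Observable.interval(1000, TimeUnit.MILLISECONDS)
                .subscribe(t -> System.out.println("Observable1: " + t)));
        composite.add(Observable.interval(2000, TimeUnit.MILLISECONDS)
                .subscribe(t -> System.out.println("Observable2: " + t)));

        int heldBefore = composite.size();

        // Ends both subscriptions with one call.
        composite.dispose();

        // size() reports 0 once the container has been disposed.
        int heldAfter = composite.size();
        return new int[] { heldBefore, heldAfter };
    }

    public static void main(String[] args) {
        int[] sizes = subscribeRegisterDispose();
        System.out.println("Held before dispose: " + sizes[0] + ", after: " + sizes[1]);
    }
}
```

Because the subscriptions are disposed right after they are created, the sketch is not expected to print any values from the two observables.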

Below is the output

Output

Observable1: 0
Observable1: 1
Observable2: 0
Observable1: 2
Observable2: 1
Observable1: 3
Observable1: 4
Observable1: 5
Observable2: 2
Observable1: 6
Observable1: 7
Observable2: 3
Observable1: 8
Observable2: 4
Observable1: 9
Subscriptions are disposed. Wait for 10 sec and see whether there is any output from the obervables
Terminating the program
