ConnectableObservable Example

In this post under RxJava, I will explain with an example what a ConnectableObservable is.

There are two kinds of Observables: Hot and Cold. All the Observables you have seen in my previous posts are Cold. You will see the difference
between them when more than one Observer subscribes to the Observable.

Whenever an Observer subscribes to an Observable, a stream is created. The stream can be shared between all the Observers, or each Observer can have its own specific stream.

In case of a Cold Observable, a separate, unshared stream is created for each subscribed Observer, regardless of when it subscribed. As a result, each Observer will get its own copy of the items emitted by the Observable.
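The Cold behaviour described above can be sketched as follows. This is only a minimal illustration, not code from my earlier posts: the class name ColdObservableSketch and the method run are made up for this example, and I am assuming RxJava 3's TestScheduler (a virtual clock) so that the result does not depend on real thread timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.TestScheduler;

// Hypothetical class name, used only for this sketch.
public class ColdObservableSketch {

    // Returns the items seen by two Observers of the same Cold Observable.
    public static List<List<Long>> run() {
        // Virtual clock: time moves only when we advance it explicitly.
        TestScheduler scheduler = new TestScheduler();
        Observable<Long> cold = Observable.interval(1, TimeUnit.SECONDS, scheduler);

        List<Long> first = new ArrayList<>();
        List<Long> second = new ArrayList<>();

        cold.subscribe(first::add);                   // first Observer subscribes at t = 0s
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS); // virtual time is now 3s
        cold.subscribe(second::add);                  // second Observer gets its own stream
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS); // virtual time is now 5s

        return List.of(first, second);
    }

    public static void main(String[] args) {
        List<List<Long>> seen = run();
        System.out.println("Observer1: " + seen.get(0)); // [0, 1, 2, 3, 4]
        System.out.println("Observer2: " + seen.get(1)); // [0, 1]
    }
}
```

Even though the second Observer subscribes 3 seconds later, it still starts from item 0, because a Cold Observable starts a fresh stream for every subscription.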

In case of a Hot Observable, a single shared stream is created for all the subscribed Observers. As a result, every Observer receives items from the same sequence of emissions. If the Observers subscribe at the same time, all of them will get all the items emitted by the Observable, but if there is a gap between the subscriptions, the Observers that subscribe later may miss the items emitted before they subscribed.

For example, suppose O is an Observable emitting items at an interval of 1 second, and two Observers O1 and O2 subscribe to O with a delay of 3 seconds between their subscriptions. O1 will receive 1, 2, 3, 4, 5, etc., whereas O2 will receive only 4, 5, etc.
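The O, O1 and O2 scenario above can be reproduced with a small sketch. It relies on the publish and connect methods that the rest of this post explains. The class name HotObservableSketch and the method run are hypothetical, the map(i -> i + 1) step is added only so that the items start at 1 as in the description (interval itself starts at 0), and I am again assuming RxJava 3's TestScheduler to control time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import io.reactivex.rxjava3.schedulers.TestScheduler;

// Hypothetical class name, used only for this sketch.
public class HotObservableSketch {

    // Returns the items seen by O1 and O2, which subscribe 3 seconds apart.
    public static List<List<Long>> run() {
        TestScheduler scheduler = new TestScheduler(); // virtual clock

        // O: emits 1, 2, 3, ... once per (virtual) second, shared through publish().
        ConnectableObservable<Long> o = Observable
                .interval(1, TimeUnit.SECONDS, scheduler)
                .map(i -> i + 1) // interval starts at 0; shift so items start at 1
                .publish();

        List<Long> o1 = new ArrayList<>();
        List<Long> o2 = new ArrayList<>();

        o.subscribe(o1::add);                         // O1 subscribes first
        o.connect();                                  // start the single shared stream
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS); // items 1, 2, 3 are emitted
        o.subscribe(o2::add);                         // O2 joins 3 seconds later
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS); // items 4, 5 are emitted

        return List.of(o1, o2);
    }

    public static void main(String[] args) {
        List<List<Long>> seen = run();
        System.out.println("O1: " + seen.get(0)); // [1, 2, 3, 4, 5]
        System.out.println("O2: " + seen.get(1)); // [4, 5]
    }
}
```

Because both Observers share one stream, O2 never sees the items 1, 2 and 3 that were emitted before it subscribed.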

A ConnectableObservable is a Hot Observable. We usually create a ConnectableObservable from a Cold Observable. Below is an example of it.

Main Class

1  import java.util.concurrent.TimeUnit;
3  import io.reactivex.rxjava3.core.Observable;
4  import io.reactivex.rxjava3.observables.ConnectableObservable;
6  public class Example87 {
7      public static void main(String[] args) throws Exception {
8          Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
9          ConnectableObservable<Long> connectableObservable = observable.publish();
10         connectableObservable.connect();
12         connectableObservable.subscribe((t) -> System.out.println("Observer1: " + t));
13         Thread.sleep(1000);
14         connectableObservable.subscribe((t) -> System.out.println("Observer2: " + t));
15         Thread.sleep(2000);
16     }
17 }

In the above code, at line 8, we create an Observable instance “observable”, which will emit an item every second. This is a Cold Observable.

At line 9, we convert the Cold Observable to a Hot Observable (i.e., a ConnectableObservable) instance “connectableObservable”, by calling the “publish” method on the Observable instance “observable”.

At line 10, we call the “connect” method on the “connectableObservable” instance. Unless we call the “connect” method, the observable will not start emitting items.

At line 12, we subscribe an Observer to the “connectableObservable”.

After a gap of 1 second, at line 14, we subscribe another Observer to the “connectableObservable”.

As a result, the first Observer will get 0, 1, 2, etc. (interval starts counting from 0), and the second Observer will miss the item 0 but receive 1, 2, etc.

Below is the output


Observer1: 0
Observer1: 1
Observer2: 1
Observer1: 2
Observer2: 2

As you can see from the output, Observer2 misses item 0.
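The point made earlier about “connect” (nothing is emitted until it is called) can also be checked in isolation. The sketch below is only an illustration under my own assumptions: the class name ConnectSketch and the method run are hypothetical, and I use Observable.range instead of interval because range emits synchronously, which keeps the example free of timing concerns.

```java
import java.util.ArrayList;
import java.util.List;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;

// Hypothetical class name, used only for this sketch.
public class ConnectSketch {

    // Returns what the Observer had received before and after connect().
    public static List<List<Integer>> run() {
        ConnectableObservable<Integer> connectable = Observable.range(1, 3).publish();

        List<Integer> received = new ArrayList<>();
        connectable.subscribe(received::add);

        // Snapshot taken after subscribing but before connecting.
        List<Integer> beforeConnect = new ArrayList<>(received);

        // range is synchronous, so 1, 2, 3 are delivered during this call.
        connectable.connect();
        List<Integer> afterConnect = new ArrayList<>(received);

        return List.of(beforeConnect, afterConnect);
    }

    public static void main(String[] args) {
        List<List<Integer>> snapshots = run();
        System.out.println("Before connect: " + snapshots.get(0)); // []
        System.out.println("After connect: " + snapshots.get(1));  // [1, 2, 3]
    }
}
```

Subscribing alone does not start the emissions. Only the call to connect does, which is why subscribing all the Observers before connecting lets every one of them see the stream from its first item.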
