ConnectableObservable autoConnect method

In this RxJava post, I will explain, with an example, the purpose of the autoConnect method in the ConnectableObservable class.

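We can use the autoConnect method to state in advance how many observers must subscribe before the ConnectableObservable connects to its source. autoConnect returns a regular Observable; once the stated number of observers have subscribed to it, it connects automatically and items start flowing. To these first observers the stream looks like a cold observable, because they receive every item from the very first one, although all of them share a single upstream subscription.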

If more observers subscribe after that, the stream behaves like a hot observable to them: they receive only the items emitted after they subscribed.

For example, let's say we have a ConnectableObservable that emits Long values with a gap of 1 second between emissions, and we state in advance that two observers will subscribe to it. The ConnectableObservable waits for both observers to subscribe. Once they have, it starts emitting items to them, so for these two observers it behaves like a cold observable. If a third observer subscribes later, the ConnectableObservable behaves like a hot observable for that observer.

Below is an example showing the usage of autoConnect.

Main Class


1 import java.util.concurrent.TimeUnit;
2 
3 import io.reactivex.rxjava3.core.Observable;
4 
5 public class Example12 {
6     public static void main(String[] args) throws Exception {
7         Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
8         observable = observable.publish().autoConnect(2); // connect once 2 observers have subscribed
9         
10        System.out.println("Observer 1 subscribed");
11        observable.subscribe((t) -> System.out.println("Observer1: " + t));
12        Thread.sleep(5000); // nothing is emitted yet: only 1 of 2 observers has subscribed
13        System.out.println("Observer 2 subscribed");
14        observable.subscribe((t) -> System.out.println("Observer2: " + t));
15        Thread.sleep(5000); // observers 1 and 2 receive items while the main thread sleeps
16        System.out.println("Observer 3 subscribed");
17        observable.subscribe((t) -> System.out.println("Observer3: " + t));
18        Thread.sleep(5000); // keep the JVM alive so all three observers keep receiving items
19    }
20}

In the above code, at line 8, we call the “autoConnect” method with an argument of 2. This states in advance that two observers will subscribe.

The ConnectableObservable will wait for the two observers to subscribe.

At line 11, the first observer subscribes, and at line 14, the second observer subscribes.

From this moment on, emission starts and each observer gets a copy of every item emitted.

At line 17, the third observer subscribes. From this point onwards, it receives only newly emitted items, not the ones emitted before it subscribed.

Below is the output

Output

Observer 1 subscribed
Observer 2 subscribed
Observer1: 0
Observer2: 0
Observer1: 1
Observer2: 1
Observer1: 2
Observer2: 2
Observer1: 3
Observer2: 3
Observer 3 subscribed
Observer1: 4
Observer2: 4
Observer3: 4
Observer1: 5
Observer2: 5
Observer3: 5
Observer1: 6
Observer2: 6
Observer3: 6
Observer1: 7
Observer2: 7
Observer3: 7
Observer1: 8
Observer2: 8
Observer3: 8
Observer1: 9
Observer2: 9
Observer3: 9

As the output shows, emission starts only after two observers have subscribed to the observable.

The third observer subscribes after a delay; as a result, its first item is 4, and it never receives the previously emitted items 0, 1, 2 and 3.
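To see the same rule without timers or threads, here is a small toy model in plain Java. It is only a sketch of the behaviour described in this post and is not how RxJava implements autoConnect. The class AutoConnectModel and its methods subscribe, tick and demo are hypothetical names made up for this illustration; tick stands in for one emission of the one-second interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of the counting rule only; NOT RxJava's implementation.
public class AutoConnectModel {
    private final int requiredObservers; // plays the role of the autoConnect argument
    private final List<Consumer<Long>> observers = new ArrayList<>();
    private boolean connected = false;   // becomes true once enough observers subscribed
    private long nextValue = 0;          // next item the source would emit

    public AutoConnectModel(int requiredObservers) {
        this.requiredObservers = requiredObservers;
    }

    public void subscribe(Consumer<Long> observer) {
        observers.add(observer);
        if (observers.size() >= requiredObservers) {
            connected = true;            // stays connected from here on
        }
    }

    // Stands in for one timer tick. Before connection nothing is emitted or buffered.
    public void tick() {
        if (!connected) {
            return;
        }
        long value = nextValue++;
        for (Consumer<Long> observer : observers) {
            observer.accept(value);      // every current observer gets the same item
        }
    }

    // Replays the scenario from this post and returns the lines it would print.
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        AutoConnectModel model = new AutoConnectModel(2);

        model.subscribe(v -> log.add("Observer1: " + v));
        model.tick();                    // ignored: only 1 of 2 observers so far
        model.tick();

        model.subscribe(v -> log.add("Observer2: " + v));
        for (int i = 0; i < 4; i++) {
            model.tick();                // items 0..3 go to observers 1 and 2
        }

        model.subscribe(v -> log.add("Observer3: " + v));
        model.tick();                    // item 4 goes to all three observers
        model.tick();                    // item 5 goes to all three observers
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the first two observers see every item starting from 0, while the third observer's first item is 4. That is the same pattern as in the output above, but without depending on thread timing.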
