Creating a Custom Cold Stateful Observable

In this post under RxJava, I will show with example how to create your own custom cold stateful Observable using “generate” method.

The custom cold stateful Observable will hold state information that’s why I said stateful in the previous statement.

They are two versions of this method. Below are there syntax


1 -> public static <T,S> @NonNull Observable<T> generate(@NonNull Supplier<S> initialState, @NonNull BiConsumer<S,Emitter<T>> generator)

2 -> public static <T,S> @NonNull Observable<T> generate(@NonNull Supplier<S> initialState, @NonNull BiFunction<S,Emitter<T>,S> generator)

The first version creates an Observable instance which takes the initial value send by the caller, keeps it in memory and passed it to the generator as initial value every time a new value has to be generated.

Below is an example using the first version of generate method

Main Class


1  import java.util.concurrent.ThreadLocalRandom;
2  
3  import io.reactivex.rxjava3.annotations.NonNull;
4  import io.reactivex.rxjava3.core.Emitter;
5  import io.reactivex.rxjava3.core.Observable;
6  import io.reactivex.rxjava3.functions.BiConsumer;
7  
8  public class Example8 {
9      public static void main(String[] args) {
10         MyBiConsumer myBiConsumer = new MyBiConsumer(10);
11         Observable<Integer> observable1 = Observable.generate(() -> {return 0;}, myBiConsumer);
12         observable1.subscribe((t) -> System.out.println(t));
13     }
14 }
15 
16 class MyBiConsumer implements BiConsumer<Integer, Emitter<Integer>> {
17     private int max;
18     
19     public MyBiConsumer(int max) {
20         this.max = max;
21     }
22 
23     @Override
24     public void accept(@NonNull Integer t1, Emitter<Integer> t2) throws Throwable {
25         int min = t1;
26         int value = ThreadLocalRandom.current().nextInt(min, max + 1);
27         if(value == max) {
28             t2.onComplete();
29         } else {
30             t2.onNext(value);
31         }
32     }
33 }

As seen in the above code, at line 11, I call generate method passing a Supplier instance which returns 0 as initial value and BiConsumer interface implementation by name “MyBiConsumer”.

The max value is set during the construction of MyBiConsumer class so it doesn’t change. Refer to line 10

The min value is set by the value returned by the Supplier instance passed as argument to the generate method.

The Observable instance created at line 11, will generate random numbers between 0 and 10.

The initial value 0 is passed as value of argument t1 in accept method of MyBiConsumer class, whenever generate method is called.

The accept method generates a random number between min and max (which in this example is 0 and 10) and send it to the Observer.

The second version creates an Observable instance which takes the initial value send by the caller and then passes it to the generator as initial value. The generator generate a value and sends it to the Observer. The generator also returns a value which is used as initial value by the same generator when generating second value and so on.

Below is an example using the second version of generate method

Main Class


1  import io.reactivex.rxjava3.annotations.NonNull;
2  import io.reactivex.rxjava3.core.Emitter;
3  import io.reactivex.rxjava3.core.Observable;
4  import io.reactivex.rxjava3.functions.BiFunction;
5  
6  public class Example9 {
7      public static void main(String[] args) {
8          MyBiFunction myBiFunction = new MyBiFunction(10);
9          
10         Observable<Integer> observable1 = Observable.generate(() -> {return 5;}, myBiFunction);
11         observable1.subscribe((t) -> System.out.println(t));
12     }
13 }
14 
15 class MyBiFunction implements BiFunction<Integer, Emitter<Integer>, Integer> {
16     private int max;
17     
18     public MyBiFunction(int max) {
19         this.max = max;
20     }
21     
22     @Override
23     public @NonNull Integer apply(@NonNull Integer t1, @NonNull Emitter<Integer> emitter) throws Throwable {
24         if(t1 == max) {
25             emitter.onComplete();
26         } else {
27             emitter.onNext(t1);
28             t1 = t1 + 1;
29         }
30         return t1;
31     }
32 }

In the above code, at line 10, I call generate method and passing a Supplier instance which will return 5 and an implementation of BiFunction interface named MyBiFunction.

The Observable instance created at line 10 will generate numbers from 5 to 10.

In the apply method of class MyBiFunction, initial value 5 will be send to the Observer (refer to line 27), incremented and returned from the method as t1. Next time when generate is called, the apply method is again called and the returned value t1 from previous call will be used as inital value this time.

In these two ways we can create custom cold stateful observable.

Leave a Reply