Creating a Custom Cold Stateless Observable

In this post under RxJava, I will show with example how to create your own custom cold stateless Observable using “generate” method.

The custom cold stateless Observable will not hold any state information that’s why I said stateless in the previous statement.

Below is the syntax of generate method

Syntax


public static <T> @NonNull Observable<T> generate(@NonNull Consumer<Emitter<T>> generator)

The generate method takes an instance of io.reactivex.rxjava3.functions.Consumer interface. Below is an implementation of the interface, for our example


1  class MyConsumer implements Consumer<Emitter<Integer>> {
2      private int min;
3      private int max;
4      
5      public MyConsumer(int min, int max) {
6          this.min = min;
7          this.max = max;
8      }
9      
10     @Override
11     public void accept(@NonNull Emitter<Integer> emitter) throws Throwable {
12         int value = ThreadLocalRandom.current().nextInt(min, max + 1);
13         if(value == max) {
14             emitter.onComplete();
15         } else {
16             emitter.onNext(value);
17         }
18     }
19 }

The above MyConsumer class takes min and max integers as constructor arguments and implement the accept method. The accept method takes Emitter instance as an argument.

In the accept method we generate random number between min and max integers and pass it to emitter, which in this case can be Observer instance or an Operator in the chain of operators.

Below is the main class, which shows how to use generate method to create an Observable instance.

Main class


1 public class Example7 {
2     public static void main(String[] args) {
3         MyConsumer myConsumer = new MyConsumer(1, 10);
4         Observable<Integer> observable = Observable.generate(myConsumer);
5         observable.subscribe(t -> System.out.println(t), t -> System.out.println(t.getMessage()));
6     }
7 }

In the above main class, at line 4 We are creating an Observable which generates a stream of random numbers between 1 and 10.

The number 1 and 10 are passed as constructor arguments to myConsumer instance created at line 3.

At line 4, when calling generate method, we are passing the myConsumer instance as an argument.

At line 5, we subscribe to Observable instance (created at 4) by passing lambda expression for onNext event and onComplete Event. Internally, an Observer instance is created using the two lambda expression and subscribed to the Observable instance.

Below is the output

Output

8
4
3
1
9
9
2

Leave a Reply