Creating Observable from Runnable, Callable, and Action

In this RxJava post, I will explain with an example how to create an Observable from the Runnable, Callable, and Action interfaces.

When an Observable is created from a Runnable, it runs the code inside the Runnable each time an observer subscribes, and then either completes or signals an error. It never emits an item.

When an Observable is created from a Callable, it runs the code inside the Callable and either emits the returned value and then completes, or signals an error.

When an Observable is created from an Action, it behaves like the Runnable case: it runs the code inside the Action and then either completes or signals an error. Unlike Runnable.run(), Action.run() is declared to throw Throwable, so it can also throw checked exceptions.
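Because Runnable, Callable, and Action each declare a single method, lambdas can be passed to these factory methods directly. Below is a minimal sketch of the three behaviours described above. It assumes RxJava 3 is on the classpath; the class name LambdaSources and the helper methods record and collect are made up for illustration and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;

import io.reactivex.rxjava3.core.Observable;

public class LambdaSources {

    // Hypothetical helper: subscribes to the source and logs every signal it receives.
    static <T> void record(String label, Observable<T> source, List<String> log) {
        source.subscribe(
                item -> log.add(label + " item: " + item),
                error -> log.add(label + " error: " + error.getMessage()),
                () -> log.add(label + " complete"));
    }

    // Hypothetical helper: builds one Observable per factory method and collects the signals.
    static List<String> collect() {
        List<String> log = new ArrayList<>();

        // Runnable that does nothing: no item, only a completion signal.
        Observable<Object> fromRunnable = Observable.fromRunnable(() -> { });
        record("fromRunnable", fromRunnable, log);

        // Callable that returns a value: one item, then completion.
        Observable<String> fromCallable = Observable.fromCallable(() -> "Hello");
        record("fromCallable", fromCallable, log);

        // Action that throws: no item, only an error signal.
        Observable<Object> fromAction = Observable.fromAction(() -> {
            throw new IllegalStateException("boom");
        });
        record("fromAction", fromAction, log);

        return log;
    }

    public static void main(String[] args) {
        collect().forEach(System.out::println);
    }
}
```

No scheduler is involved here, so each source runs synchronously inside subscribe and the log is complete by the time collect returns.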

For our example, we will create the classes below, which implement the Runnable, Callable, and Action interfaces.

Interface Implementation Classes


class MyRunnable implements Runnable {
    private boolean throwError;

    public MyRunnable(boolean throwError) {
        this.throwError = throwError;
    }

    @Override
    public void run() {
        if(throwError) {
            throw new NullPointerException("Null Value");
        }
    }
}

class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "Hello";
    }
}

class MyAction implements Action {
    private boolean throwError;

    public MyAction(boolean throwError) {
        this.throwError = throwError;
    }

    @Override
    public void run() throws Throwable {
        if(throwError) {
            throw new NullPointerException("Null Value");
        }
    }
}

In the above code, the MyRunnable and MyAction classes take a boolean value “throwError” as a constructor argument. If “throwError” is true, run() throws a NullPointerException; otherwise it returns normally.

The MyCallable class simply returns “Hello”.

Below is the main class that uses the above classes.

Main Class


1  import java.util.concurrent.Callable;
2  
3  import io.reactivex.rxjava3.core.Observable;
4  import io.reactivex.rxjava3.functions.Action;
5  
6  public class Example10 {
7      public static void main(String[] args) {
8          Observable<Void> observable1 = Observable.fromRunnable(new MyRunnable(false));
9          observable1.subscribe(t -> System.out.println("Observable1: " + t), t -> System.out.println("Observable1: " + t.getMessage()), () -> System.out.println("Observable1: Complete"));
10         
11         observable1 = Observable.fromRunnable(new MyRunnable(true));
12         observable1.subscribe(t -> System.out.println("Observable1: " + t), t -> System.out.println("Observable1: " + t.getMessage()), () -> System.out.println("Observable1: Complete"));
13         
14         Observable<String> observable2 = Observable.fromCallable(new MyCallable());
15         observable2.subscribe(t -> System.out.println("Observable2: " + t), t -> System.out.println("Observable2: " + t.getMessage()), () -> System.out.println("Observable2: Complete"));
16         
17         Observable<Void> observable3 = Observable.fromAction(new MyAction(false));
18         observable3.subscribe(t -> System.out.println("Observable3: " + t), t -> System.out.println("Observable3: " + t.getMessage()), () -> System.out.println("Observable3: Complete"));
19         
20         observable3 = Observable.fromAction(new MyAction(true));
21         observable3.subscribe(t -> System.out.println("Observable3: " + t), t -> System.out.println("Observable3: " + t.getMessage()), () -> System.out.println("Observable3: Complete"));
22     }
23 }

Below is the output generated.

Output

Observable1: Complete
Observable1: Null Value
Observable2: Hello
Observable2: Complete
Observable3: Complete
Observable3: Null Value

In the above code, at line 8, we create an observable that runs the MyRunnable code (which does nothing) when it is subscribed to at line 9. The observer then prints “Complete”.

At line 11, we create an observable whose Runnable throws an exception. The observer prints the exception message.

At line 14, we create an observable that runs the MyCallable code (in this case, returning the “Hello” string). The observer prints the emitted value and then prints “Complete”.

At line 17, we create an observable that runs the MyAction code (which does nothing). The observer then prints “Complete”.

At line 20, we create an observable whose Action throws an exception. The observer prints the exception message.
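The main class above only exercises the success path of fromCallable. As a rough sketch of the error path, the hypothetical FailingCallable below (not part of the original example) throws a checked IOException from call(). Assuming RxJava 3, the exception is delivered to the observer's error handler rather than propagating out of subscribe. The class name CallableErrorExample and the run helper are also made up for illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import io.reactivex.rxjava3.core.Observable;

public class CallableErrorExample {

    // Hypothetical Callable that always fails with a checked exception.
    static class FailingCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            throw new IOException("Read failed");
        }
    }

    // Subscribes once and returns the lines the observer would print.
    static List<String> run() {
        List<String> lines = new ArrayList<>();
        Observable<String> observable = Observable.fromCallable(new FailingCallable());
        observable.subscribe(
                t -> lines.add("Observable: " + t),
                t -> lines.add("Observable: " + t.getMessage()),
                () -> lines.add("Observable: Complete"));
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running this prints only “Observable: Read failed”; no item is emitted and “Complete” is never printed, because an error ends the sequence.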

In this way, we can create an Observable from the Runnable, Callable, and Action interfaces.
