Reactive programming with RxJava
Reactive programming with RxJava
Reactive programming takes the functional paradigm and layers on sophisticated programming-in-the-large capabilities. These capabilities allow for using functional-like semantics in application architectures. ReactiveX is one of the strongest projects in the reactive world, providing a set of common specifications for language implementers. This article is a hands-on exploration of RxJava, the Java implementation of ReactiveX.
Getting started with RxJava
To test out RxJava, we’ll write a command-line application that watches the public event stream developed by CoinCap. This event stream provides a WebSocket API, which is like a firehouse of JSON-formatted events for every transaction on a wide range of crypto exchanges. We’ll begin by simply grabbing these events and printing them to the console. Then we’ll add in some more sophisticated handling to show off RxJava’s capabilities.
Listing 1 gets us started with the Maven quickstart archetype, which provides the scaffolding for our demo application.
Listing 1. The Maven quickstart
mvn archetype:generate -DgroupId=com.infoworld -DartifactId=rxjava -DarchetypeArtifactId=maven-archetype-quickstart
Now we have a simple project scaffold stored in the /rxjava
directory. We can modify the pom.xml
to include the dependencies we need. We also set the Java version of the program, as shown in Listing 2.
Listing 2. Modified pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.infoworld</groupId> <artifactId>rxjava</artifactId> <packaging>jar</packaging> <version>1.0-SNAPSHOT</version> <name>rxjava</name> <url>http://maven.apache.org</url> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>16</source> <target>16</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>2.2.21</version> </dependency> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>4.9.1</version> </dependency> <!-- JSON library for parsing GitHub API response --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.8.9</version> </dependency> </dependencies> </project>
To confirm that things are working, type: $ mvn clean install exec:java -Dexec.mainClass="com.infoworld.App"
. This command should result in the classic “Hello World” output.
Now, we’ll add the code for the basic feature of pulling events from the WebSocket endpoint and displaying them in the console. You can see this code in Listing 3.
Listing 3. Adding a feature
package com.infoworld; import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.disposables.Disposables; import io.reactivex.schedulers.Schedulers; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.WebSocket; import okhttp3.WebSocketListener; import okio.ByteString; public class App { public static void main(String[] args) { String websocketUrl = "wss://ws.coincap.io/trades/binance"; OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder().url(websocketUrl).build(); Observable<String> observable = Observable.create(emitter -> { WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() { @Override public void onOpen(WebSocket webSocket, okhttp3.Response response) { // WebSocket connection is open } @Override public void onMessage(WebSocket webSocket, String text) { emitter.onNext(text); // Emit received message } @Override public void onMessage(WebSocket webSocket, ByteString bytes) { // Handle binary message if needed } @Override public void onClosing(WebSocket webSocket, int code, String reason) { webSocket.close(code, reason); } @Override public void onClosed(WebSocket webSocket, int code, String reason) { emitter.onComplete(); // WebSocket connection is closed } @Override public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) { emitter.onError(t); // WebSocket connection failure } }); // Dispose WebSocket connection when the observer is disposed emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket"))); }); observable .subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { // No-op } @Override public void onNext(String event) { // Process each event here System.out.println(event); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("Completed"); } }); // Wait indefinitely (or use another mechanism to keep the program running) try { Thread.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } }
If you run this program, you’ll get a line-by-line output of JSON events, with one event per line. To kill it, hit Ctrl/Command-c.
Modeling event streams
Listing 3 gives us a good look at some RxJava fundamentals. We obtain a connection to the binance push endpoint (wss://ws.coincap.io/trades/binance) with the OkHttpClient
, which makes it easy to consume the WebSocket API. (See the OkHttpClient documentation.)
Once we have the connection open, we create a new Observable
. An Observable
is the basic type for emitting events, an object that can be watched (or listened to). In other words, an Observable
is an event source of some kind, and it can model many different sources. In this case, we’re creating a new source with the Observables.create
method, which is a higher-order function accepting a function with a single argument, which we name emitter
.
The emitter
object has all the callback methods we need in order to produce our event stream. In a sense, we want to wrap the WebSocket stream in a custom RxJava event source. To do this, we take the callbacks we want from WebSocketClient
—in particular, the String
version of onMessage
—and call the emitter
method we want, in this case, emitter.onNext(text);
. (There are also callbacks for life cycle events like onClosed
and onError
.)
What this gives us is an Observable
that can be handed around to whoever needs it in order to be informed of what’s going on. This is a standardized, portable way to model event streams. Moreover, it’s highly malleable, with a range of functional transformations, which you’ll see momentarily.
Here’s how we close the emitter:
emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket"))); });.
Closing the emitter this way ensures that we close the WebSocket connection when the emitter is done.
Observing events
To observe events coming off the Observable
, we use the subscribe
method on the Observable
object. We first call .subscribeOn(Schedulers.io())
, which tells RxJava to run in a background thread. This is a (very) easy way to get multithreaded concurrency. RxJava even uses a thread pool for you.
The main work of handling the events is done by passing in an Observer
to the subscribe
method. The Observer
class is the other side of the coin to Observable
: the basic type of anything that wants to watch for events. In this case, we create a new anonymous Observer
(parameterized with a <String>
generic) inline in the subscribe()
call. The actual job of writing the event to the console happens in the onNext(String)
method of the Observer
.
Manipulating event streams
Now let’s perform a couple of operations on the stream. First, we’ll use GSON to transform the String
into a JSON object. Then, we’ll use the object to filter out only transactions that are on the Solana blockchain.
To do this, we can use the map()
and filter()
methods on the Observable
class. With map()
, we can transform the strings on an event-by-event basis into JSON objects. Then, we use the JSON inside the filter()
method to only keep those events with “Solana” as the currency (in the CoinCap spec, the crypto being used is in the “base” field). You can see this new code in Listing 4.
Listing 4. Using map() and filter()
import com.google.gson.Gson; import com.google.gson.JsonObject; //… The rest is the same observable .subscribeOn(Schedulers.io()) .map(event -> { Gson gson = new Gson(); JsonObject jsonObject = gson.fromJson(event, JsonObject.class); return jsonObject; }) .filter(jsonObject -> { String base = jsonObject.get("base").getAsString(); return base.equals("solana"); }) .subscribe( jsonObject -> System.out.println(jsonObject), Throwable::printStackTrace, () -> System.out.println("Completed") );
The map
and filter
calls are fairly easy to read. map()
turns our String
stream into a JsonObject
stream. filter()
takes the JsonObject
s as they arrive. It only keeps the ones with a base field equal to “solana”.
Listing 4 also shows us a different overload of the subscribe()
method. Instead of an Observer
instance, this one takes three arguments: the onNext
, onError
, and onComplete
functions. It works the same. There is also a single-argument version that just takes the onNext
handler.
Also, notice that map
and filter
are the same functional-style operations we know and love from Java streams and other languages like JavaScript. But now, we can apply them to a wide range of event sources. In fact, we can apply these operations to anything that can be handled with Observer
s and Observable
s.
Conclusion
Reactive programming in RxJava puts some serious power in your hands, in a flexible syntax. It can be used in a variety of circumstances. As we’ve seen, it’s quite handy in dealing with streaming data sources like the CoinCap API. The ability to pass around streams of events as objects is an important abstraction in modern software. Every developer should know about it. You can find the full source for the example application on GitHub.