Reactive Programming: JavaScript vs Java

In this article, I will try to explain reactive programming concepts and compare their implementations in JavaScript and Java.

Reactive programming is based on the idea of asynchronous data streams. A stream is a sequence of ongoing events ordered in time. Streams can be created from almost anything: HTTP calls, click events, timers, cache events, and so on. A stream can emit three different things: a value, an error, or a "completed" signal. You can listen to a stream and react asynchronously by defining callback methods for each of these three scenarios:

  1. A value is emitted.
  2. An error is emitted.
  3. The "completed" signal is emitted.

Listening to a stream is called subscribing. The callback methods we pass make up the observer. The stream is the observable being observed; in Observer-pattern terms, it is the subject.
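The three callbacks can be sketched in Java as a minimal, hand-rolled observable. The names `Observer`, `SimpleObservable` and `ObserverDemo` are hypothetical and do not come from any Rx library. The sketch only shows how subscribing hands the observer to a source, which then pushes values, an error, or the completion signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// The three callbacks an observer provides, one per kind of signal.
interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onComplete();
}

// Hypothetical minimal observable: subscribing runs the producer,
// which pushes signals to the given observer.
final class SimpleObservable<T> {
    private final Consumer<Observer<T>> producer;

    SimpleObservable(Consumer<Observer<T>> producer) {
        this.producer = producer;
    }

    void subscribe(Observer<T> observer) {
        producer.accept(observer);
    }
}

final class ObserverDemo {
    // Emits 1, then either an error or 2 followed by completion,
    // and records every signal the observer receives.
    static List<String> run(boolean fail) {
        List<String> log = new ArrayList<>();
        SimpleObservable<Integer> stream = new SimpleObservable<>(observer -> {
            observer.onNext(1);
            if (fail) {
                observer.onError(new IllegalStateException("boom"));
                return; // no more signals after an error
            }
            observer.onNext(2);
            observer.onComplete();
        });
        stream.subscribe(new Observer<>() {
            @Override public void onNext(Integer value) { log.add("next " + value); }
            @Override public void onError(Throwable error) { log.add("error " + error.getMessage()); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [next 1, next 2, complete]
        System.out.println(run(true));  // prints [next 1, error boom]
    }
}
```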

Many modern imperative languages have an implementation of ReactiveX, or Rx, for applying reactive programming principles. Rx is an API built for imperative languages to better handle asynchronous data. It was developed by Microsoft and later open-sourced. The initial implementation targeted the .NET Framework and was later ported to other languages and platforms.

Rx combines the Observer and Iterator patterns from the Gang of Four design patterns with ideas from functional programming. Each stream has many functions attached to it, called operators, such as map, filter, and scan. Operators always return a new stream and never modify the original stream they were called on. This concept is called immutability.
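Here is a small Java sketch of the immutability idea. It assumes a hypothetical `MiniStream` type that models only the value signal. `map` and `filter` each wrap the current stream in a new one, so the original stream still produces its own values afterwards. This is not how RxJava or RxJS are implemented; it only illustrates the principle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical minimal stream that models only the "value" signal,
// to keep the focus on operators.
final class MiniStream<T> {
    private final Consumer<Consumer<T>> source; // pushes values into a sink

    private MiniStream(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    @SafeVarargs
    static <T> MiniStream<T> of(T... values) {
        return new MiniStream<T>(sink -> {
            for (T value : values) sink.accept(value);
        });
    }

    // Operators wrap this stream in a NEW stream; `this` is never modified.
    <R> MiniStream<R> map(Function<? super T, ? extends R> mapper) {
        return new MiniStream<R>(sink -> source.accept(value -> sink.accept(mapper.apply(value))));
    }

    MiniStream<T> filter(Predicate<? super T> predicate) {
        return new MiniStream<T>(sink -> source.accept(value -> {
            if (predicate.test(value)) sink.accept(value);
        }));
    }

    // Subscribes and gathers every emitted value into a list.
    List<T> toList() {
        List<T> out = new ArrayList<>();
        source.accept(out::add);
        return out;
    }
}

final class OperatorDemo {
    public static void main(String[] args) {
        MiniStream<Integer> numbers = MiniStream.of(1, 2, 3, 4);
        MiniStream<Integer> evenSquares = numbers.filter(n -> n % 2 == 0).map(n -> n * n);
        System.out.println(evenSquares.toList()); // prints [4, 16]
        System.out.println(numbers.toList());     // prints [1, 2, 3, 4]: the original is untouched
    }
}
```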

Reactive programming in JavaScript

RxJS is the JavaScript implementation of the ReactiveX API. It is used extensively in Angular and can also be used with other JavaScript frameworks.

Let's take a look at a simple example of using RxJS to fetch data from an HTTP resource.

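import { from } from 'rxjs';

// fetch() returns a Promise; from() turns it into an observable
const data = from(fetch('/api/endpoint'));
data.subscribe({
  next(response) { console.log(response); },        // called for each emitted value
  error(err) { console.error('Error: ' + err); },   // called if the request fails
  complete() { console.log('Completed'); }          // called after the last value
});
console.log('Last line');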

In the example above, we use the Fetch API to call an HTTP endpoint. Because the fetch() method returns a promise, we use the from operator to create an observable out of that promise. We then subscribe to the observable and pass the callback methods that will be called for each emitted value, for an error, or when the observable completes. The result of the request arrives as a stream that, in this case, emits a single value.

What's important here is to understand what happens behind the scenes. JavaScript is single-threaded and uses non-blocking I/O. While an I/O request is in progress, the thread does not wait for it and can keep handling other work. To provide concurrency, JavaScript uses an event loop and callbacks. The language itself executes code synchronously. Asynchronous functionality, such as network requests, file system access, setTimeout(), and other timing events, is provided by the host environment (the browser or Node.js). Once an asynchronous operation finishes, its callback is queued. The event loop moves it onto the call stack as soon as the stack is empty.

In our RxJS example, fetch makes an asynchronous call over the network. Because this asynchronous call is wrapped inside an observable, the observable emits asynchronously. If we wrap synchronous code inside an observable instead, it emits synchronously. In the next example, we read from an array and emit its values synchronously.

import { from } from 'rxjs';

// from() over an array emits each element synchronously, then completes
const array = from([1, 2, 3]);
array.subscribe(i => console.log(i));
console.log('Last line');

If we run both examples, we can see the difference between synchronous and asynchronous execution. The second example runs in the order the code is written:

1
2
3
Last line

Subscribing to the array observable executes it immediately. It emits the three array elements synchronously, and the observer callback runs right away on the current call stack for each of them. The callback is therefore called three times, printing each value, and only then is our last message printed.

The first example behaves differently:

Last line
Response of the network call
Completed

The network request is made asynchronously, so the observer callback is queued instead of being run immediately. As a result, the message on the last line is printed first. The observer callbacks run only after the network request completes, printing the response and then "Completed".
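Java has no built-in event loop, but the ordering above can be modelled with a single-threaded executor standing in for one. This sketch assumes a hypothetical `EventLoopDemo` class. The synchronous source delivers its values on the current call stack. The simulated network call completes on another thread and queues its callback back onto the loop, where it can run only after the current script finishes.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class EventLoopDemo {
    // Runs a small "script" on a single-threaded executor that plays the role
    // of the JavaScript event loop, then waits for any callback it queued.
    static List<String> run(boolean asyncSource) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<String> log = new CopyOnWriteArrayList<>();
        Callable<CompletableFuture<Void>> script = () -> {
            CompletableFuture<Void> pending;
            if (asyncSource) {
                // Simulated network call on another thread; its callback is
                // queued onto the loop, behind the script that is still running.
                pending = CompletableFuture
                    .supplyAsync(() -> "Response of the network call")
                    .thenAcceptAsync(response -> {
                        log.add(response);
                        log.add("Completed");
                    }, eventLoop);
            } else {
                // Synchronous source: each value is handled immediately,
                // on the current call stack, like from([1, 2, 3]).
                List.of(1, 2, 3).forEach(i -> log.add(String.valueOf(i)));
                pending = CompletableFuture.completedFuture(null);
            }
            log.add("Last line");
            return pending;
        };
        try {
            eventLoop.submit(script).get().get(); // wait for the script, then its callback
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            eventLoop.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [1, 2, 3, Last line]
        System.out.println(run(true));  // prints [Last line, Response of the network call, Completed]
    }
}
```

The callback is queued on the loop executor instead of running on the I/O thread. That is what reproduces JavaScript's guarantee that "Last line" always comes first. Plain multithreaded Java code does not give that guarantee by itself.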

Reactive programming in Java

Rx influenced Reactive Streams, a specification that provides a standard for asynchronous stream processing with non-blocking backpressure. Implementations of this specification include RxJava, Reactor, and Akka Streams. The Reactive Streams API consists of four core components:

  1. Publisher: Equivalent to Observable in Rx. Libraries name their publisher types differently, often based on how many values they emit. For example, Reactor provides Mono (zero or one item) and Flux (zero to many items).
  2. Subscriber: Equivalent to Subscriber in Rx.
  3. Subscription: Equivalent to Subscription in Rx.
  4. Processor: Similar to Subject in Rx. It acts as both a Subscriber and a Publisher, but with backpressure management.
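Since Java 9, the JDK ships the same four interfaces as `java.util.concurrent.Flow` (`Publisher`, `Subscriber`, `Subscription`, `Processor`). It also provides `SubmissionPublisher` as a ready-made publisher. The minimal sketch below uses only the JDK, with no reactive library, and shows backpressure through the `Subscription`: the subscriber requests one item at a time. `OneAtATimeSubscriber` and `FlowDemo` are hypothetical names, and `Processor` is not used here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Subscriber that pulls one item at a time through its Subscription;
// request(n) is how Reactive Streams expresses backpressure.
final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CompletableFuture<Void> done = new CompletableFuture<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // demand for the first item only
    }

    @Override
    public void onNext(String item) {
        received.add(item);
        subscription.request(1); // ready for one more
    }

    @Override
    public void onError(Throwable throwable) {
        done.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        done.complete(null);
    }
}

final class FlowDemo {
    static List<String> run(List<String> items) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        // SubmissionPublisher delivers items asynchronously (by default using
        // ForkJoinPool.commonPool()) while honouring each subscriber's demand.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the already-submitted items
        subscriber.done.join(); // wait for the asynchronous onComplete
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("City1", "City2", "City3"))); // prints [City1, City2, City3]
    }
}
```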

Unlike JavaScript, traditional Java web applications communicate in a blocking way. Traditional servlet containers, such as Tomcat, use a thread-per-request model. The container keeps a dedicated thread pool for handling HTTP requests, and each incoming request is assigned a thread from that pool. That thread handles the entire lifecycle of the request, including any time spent waiting on I/O. Since every thread requires some amount of memory, this model can become quite costly for applications with a high number of concurrent requests. Such applications can benefit from reactive programming in Java.
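Here is a rough Java sketch of the thread-per-request model. It assumes a fixed pool of `poolSize` threads and simulates the blocking call with `Thread.sleep`. `ThreadPerRequestDemo` is a hypothetical name and has nothing to do with Tomcat's API. Each request holds its thread for its whole lifecycle, so no more than `poolSize` requests are ever in progress at once, and the rest wait in the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadPerRequestDemo {
    // Each request occupies one pool thread for its whole lifecycle,
    // including a blocking wait that simulates I/O.
    // Returns the peak number of requests in progress at the same time.
    static int serve(int poolSize, int requests, long ioMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inProgress = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> handled = new ArrayList<>();
        try {
            for (int i = 0; i < requests; i++) {
                handled.add(pool.submit(() -> {
                    peak.accumulateAndGet(inProgress.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(ioMillis); // blocking call: the thread just waits
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inProgress.decrementAndGet();
                    }
                }));
            }
            for (Future<?> f : handled) {
                f.get(); // wait until every request is answered
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int peak = serve(4, 20, 50);
        long millis = (System.nanoTime() - start) / 1_000_000;
        System.out.println("peak in progress: " + peak + ", elapsed: " + millis + " ms");
    }
}
```

With 20 requests of 50 ms each on 4 threads, the requests are served in roughly five waves. Total time therefore grows with the number of requests divided by the pool size, not with the duration of a single call.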

Reactive frameworks in Java can use different approaches to implement a non-blocking model. One of the most widely used frameworks is Spring WebFlux. WebFlux uses an event loop with a small, fixed-size thread pool (event loop workers) to handle requests. A request may need a long-running operation, such as a database query or a file read or write. In that case, the worker starts the operation without blocking and moves on to other requests. Once the operation completes, the result is processed asynchronously via a callback. Blocking or CPU-intensive work should be moved off these workers, because it would stall every request sharing the same thread. On a "vanilla" Spring WebFlux server (for example, no data access or other optional dependencies), you can expect one thread for the server and several others for request processing (typically as many as the number of CPU cores).
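The non-blocking alternative can be sketched with plain JDK classes. This is not how Reactor Netty is implemented. The sketch assumes a worker pool sized to the CPU count and a hypothetical `slowCall` helper that completes a `CompletableFuture` from a timer instead of blocking. Workers start each call, return to the pool immediately, and pick up the result later in a callback. As a result, hundreds of concurrent slow calls are served by a handful of threads.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class NonBlockingWorkersDemo {
    // Serves `requests` concurrent requests that each need one slow I/O call,
    // using a worker pool sized to the number of CPU cores.
    // Returns how many distinct worker threads handled the results.
    static int handle(int requests) {
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService workers = Executors.newFixedThreadPool(cores);
        // Stand-in for a non-blocking I/O layer: results are completed from a
        // timer, so no worker thread sits waiting for them.
        ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
        Set<String> workerNames = ConcurrentHashMap.newKeySet();
        try {
            List<CompletableFuture<String>> responses = IntStream.range(0, requests)
                .mapToObj(i -> CompletableFuture
                    .supplyAsync(() -> i, workers)        // request accepted on a worker
                    .thenCompose(id -> slowCall(io, id))  // start the call; the worker is freed
                    .thenApplyAsync(body -> {             // resume on a worker when ready
                        workerNames.add(Thread.currentThread().getName());
                        return body;
                    }, workers))
                .collect(Collectors.toList());
            responses.forEach(CompletableFuture::join);   // every request answered
            return workerNames.size();
        } finally {
            workers.shutdown();
            io.shutdown();
        }
    }

    // Hypothetical slow remote call: answers after 100 ms without blocking a thread.
    private static CompletableFuture<String> slowCall(ScheduledExecutorService io, int id) {
        CompletableFuture<String> result = new CompletableFuture<>();
        io.schedule(() -> result.complete("response " + id), 100, TimeUnit.MILLISECONDS);
        return result;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int used = handle(500);
        long millis = (System.nanoTime() - start) / 1_000_000;
        System.out.println("500 slow calls, " + used + " worker threads, " + millis + " ms");
    }
}
```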

Here is a simple example of using WebFlux that also makes a network request:

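// City is assumed to be a simple class with an id and a name.
// client is assumed to be a WebClient field, e.g. created with WebClient.create(baseUrl).
private Stream<City> prepareStream() {
    return Stream.of(
        new City(1, "Name01"),
        new City(2, "Name02"),
        new City(3, "Name03")
    );
}

@GetMapping("/api/{param}")
public Flux<City> findCities(@PathVariable("param") String param) {
    // Local, synchronously produced cities merged with the results of a slow remote call
    return Flux.fromStream(this::prepareStream).log()
        .mergeWith(
            client.get().uri("/slow/" + param)
                .retrieve()
                .bodyToFlux(City.class) // non-blocking: emits cities as they arrive
                .log()
        );
}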

The example uses WebClient, a non-blocking, reactive HTTP client, to call a slow-performing API, and merges its results with a locally created stream of cities. Note that worker threads are shared between server-side request processing and client-side communication with other resources.

If you run this example, you get output similar to this:

23:02:39.495 --- [or-http-epoll-2] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
23:02:39.496 --- [or-http-epoll-2] : | request(32)
23:02:39.496 --- [or-http-epoll-2] : | onNext(City(id=1, name=Name01))
23:02:39.503 --- [or-http-epoll-2] : | onNext(City(id=2, name=Name02))
23:02:39.503 --- [or-http-epoll-2] : | onNext(City(id=3, name=Name03))
23:02:39.503 --- [or-http-epoll-2] : | onComplete()
23:02:39.513 --- [or-http-epoll-2] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
23:02:39.513 --- [or-http-epoll-2] : request(32)
23:02:39.761 --- [or-http-epoll-3] : onNext(City(id=29560, name=Nametest))
23:02:39.762 --- [or-http-epoll-3] : onNext(City(id=59531, name=Nametest))
23:02:39.763 --- [or-http-epoll-3] : onNext(City(id=26166, name=Nametest))
23:02:39.764 --- [or-http-epoll-3] : onComplete()

The output shows that the request is handled in thread or-http-epoll-2. The WebClient call is then executed, and it subscribes to the results in the same thread. However, the results are published from a different thread, or-http-epoll-3. If you make consecutive calls to the controller method, the logs also show that WebClient does not block the thread while waiting for a result from the called resource. The same thread logs several onSubscribe calls before the corresponding onComplete calls arrive. This shows that these threads are non-blocking, which allows many requests to be handled with a relatively small number of threads.

Conclusion

By comparing reactive programming in Java and JavaScript, we can see that the underlying concepts are very similar, and both are heavily influenced by the Rx project. But it is important to understand the differences in how reactive programming is used in these two languages. This allows us to better understand reactive programming itself and the problem it is trying to solve.