Working with Micronaut, which is what I've been doing recently, it's inevitable to stumble upon RxJava and Reactive Programming concepts, especially when implementing internal Micronaut interfaces that usually return `Flowable<?>` instead of plain `?`. Having no previous experience, but being curious about this reactive hype, I decided to delve into the subject and try to understand it more deeply. Even though it turned out to be more difficult and tricky than I expected, I think I've finally caught the idea. In this article I'll try to introduce the newcomer, step by step, to my way of thinking about reactive programming.
A small warning at the beginning. There are literally hundreds of articles, guides, tutorials and other kinds of material about Reactive Programming with RxJava which focus mostly on creating an `Observable` or `Flowable`, feeding it with data and then processing this data with everything made possible by these classes' interfaces (and there's really a lot there). Before you begin to read, you need to be aware that I don't care about this. In my opinion this is not the crux of reactive programming, and if you already know Java 8 streams, you already understand how it works and how it can be used. Instead, in this article I will try to explain how reactive streams really work and, especially, what problems they try to solve.
Just after you start with this technology you will probably run into the same problem as me. RxJava 1 was written using different interfaces and base classes than RxJava 2, and RxJava 2 now includes both implementations, which is really confusing. The story of these changes is the following:

1. RxJava 1 was built around the `Observable` concept.
2. The `Observable` from the original RxJava became the `Publisher` of the Reactive Streams specification.
3. RxJava 2 was then implemented on top of the Reactive Streams interfaces.

This is why RxJava now contains both a `Publisher`-based and an `Observable`-based API (the latter is kept for backward compatibility). BTW, we can expect to have a lot more fun with this in the future due to JDK 9 `Flow` :)
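The JDK 9 `Flow` API mentioned above ships the same four Reactive Streams interfaces in the standard library, so the concept can be tried without any RxJava dependency. A minimal sketch (class and variable names are mine) using `SubmissionPublisher`, the JDK's stock `Flow.Publisher` implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    // Publishes two values through the JDK's stock Flow.Publisher and collects them.
    static List<Integer> runDemo() throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE); // no backpressure: ask for everything
                }
                @Override public void onNext(Integer item) { received.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            publisher.submit(1);
            publisher.submit(2);
        } // close() triggers onComplete asynchronously
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // [1, 2]
    }
}
```

Note that delivery happens asynchronously on a `ForkJoinPool` thread, which is exactly the "you don't know which thread" property discussed below.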
To find my path in this lib I sketched the following picture on the side:
RxJava 2 vs Reactive Streams
Previous RxJava 1 classes and interfaces are yellow, while the blue ones are the interfaces provided by the Reactive Streams specification. The orange line shows which yellow concept is the equivalent of which blue one. Finally, the green boxes represent the new RxJava 2 implementation done using the Reactive Streams interfaces. It's also possible to convert from `Flowable` to `Observable` and back, which constitutes the bridge between the old RxJava 1 and the new RxJava 2.
If your system doesn't need to be compatible with RxJava 1, you should probably start your implementation using the green boxes. We will also focus mostly on this color in this article.
So, what are Reactive Streams and RxJava about? Before we answer this question, let's introduce some utility functions:
```java
public static void log(Object o) {
    System.out.println(String.format(Timer.elapsed() +
            "ms [%s] %s", Thread.currentThread().getName(), o));
}

public static void sleep(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
```
These two guys are going to be used frequently in this article. The `log()` method logs a message to the console, also showing the current thread name, while `sleep()` just sleeps for the given number of milliseconds.
Now focus, because if there is one thing worth remembering from this article, it's this smart code snippet:
```java
int i = 1;
log(i);
log("bye bye");

// 1ms [main] 1
// 1ms [main] bye bye
```
The result `1ms [main] 1` means that after 1 ms this program displayed `1` on the console from the `main` thread.
This code is very important because it does two things:

- it provides the value,
- it consumes the value by displaying it on the console.

Moreover, it does both operations in a blocking way. So it first waits for the value, then displays it on the console, and finally the `main` thread finishes and the program exits.
Let’s now write it in a reactive way:
```java
Flowable<Integer> publisher = Flowable.just(1);
publisher.subscribe(i -> log(i));
log("bye bye");

// 58ms [main] 1
// 58ms [main] bye bye
```
Nothing changes in the output, but there are some important differences here:

- We have the `Publisher` concept here (`Flowable` from RxJava is a `Publisher`). The `Publisher` is an object which is expected to provide some value (or multiple values).
- We also have a `Subscriber` here. Our subscriber is a simple `Consumer` which displays the value provided by the `Publisher` on the console.

Actually, the `Subscriber` says here: when the value is ready, get it from the `Publisher` and display it on the console, while the `Publisher` doesn't say anything about when this value will be provided. In the code above, when the `Subscriber` subscribes to the `Publisher`, the value is provided immediately, but as a matter of fact the `Subscriber` doesn't know if the value is already there or if it will become available at some time in the future. So, from the `Subscriber`'s point of view, the `Publisher` only promises to provide the value, but it is just not defined when.
This is why I like to think about `Publisher`s in the same way I think about JavaScript Promises.
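The analogy can be sketched with the JDK's own promise type, `CompletableFuture` (a stdlib stand-in, not RxJava; names are mine): the consumer registers a callback without blocking, and the producer decides when, and on which thread, the value shows up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class PromiseDemo {

    // The "publisher": promises an Integer that will arrive ~100 ms later on another thread.
    static CompletableFuture<Integer> promiseOfValue() {
        return CompletableFuture.supplyAsync(() -> 1,
                CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> promise = promiseOfValue();
        // The "subscriber": says what to do with the value, whenever it arrives.
        promise.thenAccept(v -> System.out.println("got " + v));
        System.out.println("bye bye"); // printed first - registering the callback is non-blocking
        promise.join(); // keep the JVM alive until the value arrives
    }
}
```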
To illustrate this, let's now modify our code so that the `Publisher` provides the value some time later and in a different thread. This requires adding one line to our code:
```java
Flowable<Integer> publisher = Flowable.just(1)
        // this tells the publisher to provide the value in a new thread
        .subscribeOn(Schedulers.newThread());
publisher.subscribe(i -> log(i));
log("bye bye");
sleep(1000);

// 37ms [main] bye bye
// 38ms [RxNewThreadScheduler-1] 1
```
Now, the `Subscriber` still subscribes for the value, but the value is not there at subscription time, because the new thread requires some time to initialize. Remember, the `Publisher` only promised to deliver the value at some point, and the `Subscriber` subscribed to this value in a non-blocking manner. This means that while the `Subscriber` waits for the value, the `main` thread can exit saying `bye bye`. In fact, I needed to add some `sleep()` before it really quits and stops the program, to let the `Publisher` provide the value and the `Subscriber` consume it.
If I were to establish the first rule of Reactive Programming, it would be this one: the `Publisher` is a promise of a value (or multiple values), and you neither know when (or if) this value will arrive nor in which thread your `Subscriber` will be executed.
With that example behind us, let's now learn the second and last required lesson about Reactive Programming:
```java
int i = 1;
i++;
i++;
log(i);
log("bye bye");

// 0ms [main] 3
// 1ms [main] bye bye
```
Now, we've done the same as previously, but we've also made some transformations on the original value by adding 1 to it twice, which finally displays the number `3`. In the reactive world we can do the same using a `Function`:
```java
public Integer addOne(Integer i) {
    log(String.format("adding 1 to %s what makes: %d", i, i + 1));
    return i + 1;
}
```
Having this `Function`, we can now add it to the chain of transformations of the original value provided by the `Publisher`:
```java
Flowable<Integer> publisher = Flowable.just(1)
        .map(this::addOne)
        .map(this::addOne);
publisher.subscribe(i -> log(i));
log("bye bye");

// 5ms [main] adding 1 to 1 what makes: 2
// 6ms [main] adding 1 to 2 what makes: 3
// 6ms [main] 3
// 7ms [main] bye bye
```
This is the same as in Java 8 streams, and if you feel you want to know more about the dozens of other operations that can be performed on a `Flowable`, please refer to all those other RxJava tutorials you can find on the first 10 pages of Google.
We can now do the same as previously, i.e. we can move the process of providing the value to another thread:
```java
Flowable<Integer> publisher = Flowable.just(1)
        .subscribeOn(Schedulers.newThread())
        .map(this::addOne)
        .map(this::addOne);
publisher.subscribe(i -> log(i));
log("bye bye");
sleep(1000);

// 2ms [main] bye bye
// 3ms [RxNewThreadScheduler-2] adding 1 to 1 what makes: 2
// 4ms [RxNewThreadScheduler-2] adding 1 to 2 what makes: 3
// 4ms [RxNewThreadScheduler-2] 3
```
The result is the same as previously: the `Publisher` promises to deliver data, but it happens some time later, while the `Subscriber` subscribes for the data in a non-blocking way, allowing the `main` thread to exit.
So, everything is the same as in the first example. I repeated all this just to show the final code, and no worries - this is the last example of this chapter:
```java
Flowable<Integer> publisher = Flowable.just(1)
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.newThread())
        .map(this::addOne)
        .observeOn(Schedulers.newThread())
        .map(this::addOne);
publisher.subscribe(i -> log(i));
log("bye bye");
sleep(1000);

// 11ms [main] bye bye
// 14ms [RxNewThreadScheduler-4] adding 1 to 1 what makes: 2
// 15ms [RxNewThreadScheduler-3] adding 1 to 2 what makes: 3
// 16ms [RxNewThreadScheduler-3] 3
```
As you can see, using `observeOn()` we can change the thread for each operation done on our "stream". In fact, in RxJava you can change threads like socks during such stream processing. And this is the second and last lesson about Reactive Programming.
Thank you for reading :)
When you think about an old-fashioned blocking HTTP server using the thread-per-request model, here is what the typical request-processing thread is doing all the time:
Typical request thread processing
So, our typical thread wastes, for example, 90% of its precious time waiting for IO, while the computation really takes only 10% of the execution.
For a typical thread-per-request server we can have up to a few hundred such threads. For example, the default for Tomcat, last time I was using it, was 200 threads. And this is only possible because these threads wait 90% of their time for IO, while only some of the time is used for real processing involving the CPU (10% of 200 threads is about 20 threads really using the CPU at a given moment).
But such an HTTP server can only support 200 concurrent connections, and some time ago we started to live in a world of persistent connections (like web sockets, server-sent events, etc). So what about 1000 connections? Or what about 10K connections? The typical thread-per-request server just can't handle this. Firstly, because creating 10K threads in Java would eat at least 20 GB of RAM (a thread takes about 2 MB just to initialize). Secondly, because in the above model 10% of 10K means 1000 threads actively using the CPU at a given moment, and I don't believe the CPU scheduler of an 8-core machine can manage this effectively.
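The back-of-the-envelope numbers above are easy to double-check (the 2 MB per thread and the 10% CPU share are, of course, rough assumptions):

```java
public class ThreadMath {

    // stack memory eaten by idle threads, in MB
    static long stackMemoryMb(int threads, int mbPerThread) {
        return (long) threads * mbPerThread;
    }

    // how many threads are actively on the CPU at a given moment
    static int activeThreads(int threads, int cpuSharePercent) {
        return threads * cpuSharePercent / 100;
    }

    public static void main(String[] args) {
        System.out.println(stackMemoryMb(10_000, 2) + " MB");      // 20000 MB, about 20 GB
        System.out.println(activeThreads(10_000, 10) + " active"); // 1000 active threads
    }
}
```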
Browsing for answers about reactive programming, I also found a really interesting article: Designing a reactive HTTP server with RxJava. I recommend you read it now from start to finish. It shows how Netty, using reactive concepts, can serve many more concurrent connections with only a few threads. It does this by reducing the number of threads to some sensible amount for the given machine (for example 8 threads for an 8-core machine), but each thread only takes care of executing the green parts from my diagram and never waits for IO.
This sounds sensible, but who the heck handles the red parts then and waits for all this IO? We also need a thread to manage this. And this is the answer: the thread. A single thread can do that.
For example, if you think about sockets, you can keep a collection of them somewhere and periodically check whether they have new data or are still waiting for the answer. If you think about a file copy operation (which is also IO), remember that you have only a few heads in a typical hard drive, and even if you copy 10 files simultaneously, this operation will have more or less the same efficiency as copying these files one by one. That can obviously be done in a single thread.
And this is a really important answer about reactive programming for me. Somewhere in the architecture we want to achieve (no red parts for the threads) we need a single thread involved which manages the IO. This thread is often called an Event Loop. Node.js has one, Netty has another, and we need to have one as well to do the same.
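The essence of the pattern can be sketched in plain Java (a toy with made-up names; a real event loop would multiplex IO with a selector instead of polling a queue): tasks may be submitted from any thread, but all of them are executed by the single loop thread.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ToyEventLoop extends Thread {

    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    @Override
    public void run() {
        // drain the queue until shutdown, then finish whatever is left
        while (running || !tasks.isEmpty()) {
            Runnable task = tasks.poll();
            if (task != null) task.run(); else Thread.onSpinWait();
        }
    }

    public void submit(Runnable task) { tasks.add(task); }
    public void shutdown() { running = false; }

    // submits n tasks from the caller thread; every one of them runs on the loop thread
    static int runDemo(int n) throws InterruptedException {
        ToyEventLoop loop = new ToyEventLoop();
        loop.start();
        AtomicInteger counter = new AtomicInteger();
        for (int i = 0; i < n; i++) loop.submit(counter::incrementAndGet);
        loop.shutdown();
        loop.join();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(100)); // 100
    }
}
```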
If I were a Netty developer, this is more or less how I'd design it:
How I’d implement Netty
Basically, I'd create a connections cache which would be periodically reviewed by a single "processor" thread, and when the data is ready I'd move the processing to one of my worker threads.
But there's still a trap in this architecture, which I'm going to show in a little while. We managed to support 10K connections here, because we have only a few threads involved, but if our worker threads wait for IO (reading data from the database, files, some other HTTP APIs, etc), this system would work unbelievably slowly, because its threads still handle the red parts and there are too few of them. This way we solve only the initial problem of having 10K connections: we can have them established, but we can't serve all of them with our thread pool.
Let's stop for a moment here. I remember when NIO was introduced to Tomcat and they said they could now handle a lot more concurrent connections. Now I see that architecture as more or less the same as the one drawn in the picture above, but still with 200 threads. These threads are no longer request-processing threads (because requests are processed in the single-threaded event loop), but the business logic is still blocking, and we need a lot of threads because we haven't got rid of the red parts yet.
So, how do we get rid of the red parts in our worker threads? We can, for example, use the same event-loop pattern for all IO operations in the app:
The example Reactive System
In the picture above, our request processing is done in multiple random threads from the workers pool. Each time a worker needs to do some IO, it delegates it to the appropriate event loop using the pink arrows, which are asynchronous: after such a delegation the worker thread ends processing and is returned to the pool. When the data is ready in the event loop, it is sent back to the workers pool, and the rest of the processing is done by some (possibly another) thread from the pool.
This way only the CPU-intensive processing is done in the workers pool, and we can support a really large number of concurrent connections with only a few threads.
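The pink-arrow hand-off can be sketched with plain `CompletableFuture` stages (executor names are mine): CPU stages run on the workers pool, the IO stage runs on a single-threaded "event loop", and the worker thread is free while the IO stage is in flight.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandOffDemo {

    static String process() {
        ExecutorService workers = Executors.newFixedThreadPool(3);
        ExecutorService ioLoop = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                    .supplyAsync(() -> "cpu", workers)        // CPU-intensive part on the pool
                    .thenApplyAsync(s -> s + "+io", ioLoop)   // IO delegated to the "event loop"
                    .thenApplyAsync(s -> s + "+cpu", workers) // rest of the processing back on the pool
                    .join();
        } finally {
            workers.shutdown();
            ioLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(process()); // cpu+io+cpu
    }
}
```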
As an exercise after reading this article, try to make this whole flow transactional :)
OK, after this short introduction we may now try to implement a reactive system using RxJava. Our case is a very standard web app which needs to handle incoming requests, get some data from the database and finally return it to the client. The rules are the following:

- The request carries an `Integer` value.
- The response is an `Integer` as well: it's the value from the request multiplied by 2.
- Every time-consuming operation we will simulate with `sleep()`.

Moreover, we have the very limited hardware of a Nokia 3310 from the late nineties to launch this app. It only supports a few threads, and we can use only 3 threads for our workers pool.
The default blocking implementation using the thread-per-request processing model looks very simple:
```java
public static final int REQUESTS_DELAY_MS = 10; // the delay between requests
public static final int REQUEST_DATA_RECEIVE_TIME_MS = 2000; // how long we receive data from the client
public static final int CPU_INTENSIVE_OPERATION_TIME_MS = 500; // how long the CPU-intensive operation lasts
public static final int DATABASE_OPERATION_TIME_MS = 2000; // how long we get data from the db

private static int threadNum = 0;

// our workers pool which executes the business logic
protected static ExecutorService workersPool =
        Executors.newFixedThreadPool(3, r -> new Thread(r, "WORKER-" + (++threadNum)));

for (int i = 0; i < requestsCount; i++) {
    log("New request connection established: " + i);
    final int data = i;
    workersPool.submit(() -> {
        log("New request connection established: " + data);
        sleep(REQUEST_DATA_RECEIVE_TIME_MS);
        log("Received request: " + data);
        log("Doing some CPU-intensive operations");
        sleep(CPU_INTENSIVE_OPERATION_TIME_MS);
        log("Asking the database for data and exiting the worker thread");
        sleep(DATABASE_OPERATION_TIME_MS);
        log("Database data ready for: " + data + " with data: " + data * 2);
        log("Response: " + data * 2 + " is ready, sending it to the client");
    });
    sleep(REQUESTS_DELAY_MS);
}
```
When the data comes, it's handed to a thread from the workers pool which does the whole processing, so with 3 worker threads in the pool we can support 3 concurrent connections in this model. A single request processing can be represented with the following sequence diagram:
Thread per request processing model
And it looks as follows in the logs:
```
15 [main] New request connection established: 0
33 [WORKER-1] New request connection established: 0
2033 [WORKER-1] Received request: 0
2034 [WORKER-1] Doing some CPU-intensive operations
2534 [WORKER-1] Asking the database for data and exiting the worker thread
4535 [WORKER-1] Database data ready for: 0 with data: 0
4535 [WORKER-1] Response: 0 is ready, sending it to the client
```
Everything happens in the worker thread, and a single request takes about 4.5 s to process. This would make 45 s in total for 10 requests, and because we have 3 threads in the pool, the overall time required to process 10 requests should be about 15 s. However, including some inertia, it finally takes about 18 s (look here for the complete logs).
Thinking about a reactive system, we need to consider two things. First is how to implement it in the publisher-subscriber model. When a request comes, we want to delegate handling it somewhere, but this handling should only start after the request data is ready. The other thing is what to do after the request processing, when we need to send back the response. Both operations can be modelled as a reactive `Publisher`, and because they need to work in both directions, I called them a channel:
```java
public class Channel<REQ, RESP> {

    private Flowable<REQ> request; // only required to be subscribed
    private FlowableProcessor<RESP> response; // is subscribed by some response-processing thread,
                                              // but also accepts the data published from the
                                              // application business logic

    public Channel(Flowable<REQ> request) {
        this.request = request;
        this.response = UnicastProcessor.create();
    }

    public Flowable<REQ> requestChannel() {
        return request;
    }

    public FlowableProcessor<RESP> responseChannel() {
        return response;
    }
}
```
The pipe representing the response, besides being a `Publisher` which can be subscribed to in the HTTP thread, also needs to give the worker thread an option to publish the response data into it. This is why it's of the `FlowableProcessor` type, which means it's both a `Publisher` and a `Subscriber`.
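The same dual nature exists in the JDK's `Flow` API as `Flow.Processor`. A minimal identity processor, sketched with stdlib types only (names are mine), shows the idea: data pushed into it as a `Subscriber` comes back out of it as a `Publisher`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class IdentityProcessorDemo {

    // Both a Subscriber (data can be pushed into it) and a Publisher
    // (its own subscribers receive that data) - like a FlowableProcessor.
    static class IdentityProcessor<T> extends SubmissionPublisher<T> implements Flow.Processor<T, T> {
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) { submit(item); }
        @Override public void onError(Throwable t) { closeExceptionally(t); }
        @Override public void onComplete() { close(); }
    }

    static List<Integer> runDemo() throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        IdentityProcessor<Integer> channel = new IdentityProcessor<>();
        channel.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        // the "worker thread" publishes the response into the channel
        try (SubmissionPublisher<Integer> response = new SubmissionPublisher<>()) {
            response.subscribe(channel);
            response.submit(42);
        }
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // [42]
    }
}
```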
A more interesting, and the most important, thing for a reactive system is to have an event loop somewhere which is able to process our IO in a single thread. In this example it will be emulated by this class:
```java
public class EventLoop<EVENT> extends Thread {

    protected Map<Long, ConnectableFlowable<EVENT>> resourcesCache = Collections.synchronizedMap(new TreeMap<>());

    public EventLoop(String name) {
        super(name);
        start(); // immediately starts the thread
    }

    @Override
    public void run() {
        BaseTest.log("Starting thread");
        try {
            while (true) {
                synchronized (this) {
                    // What it basically does is check resourcesCache at a 100ms interval for an event
                    // which should be emitted right now. If such an event is found, it emits it by
                    // connecting to the ConnectableFlowable representing this event.
                    if (!resourcesCache.isEmpty()) {
                        Map.Entry<Long, ConnectableFlowable<EVENT>> next = resourcesCache.entrySet().iterator().next();
                        if (next.getKey() > System.currentTimeMillis())
                            wait(next.getKey() - System.currentTimeMillis());
                        resourcesCache.remove(next.getKey());
                        ConnectableFlowable<EVENT> emitter = next.getValue();
                        emitter.connect();
                    }
                    wait(100);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * @param msg   The additional message that will be logged to the console on event emission.
     * @param value The event to be emitted.
     * @param delay After what delay the event should be emitted.
     * @return The {@link Publisher} to which the client can subscribe to get the event after the delay.
     */
    public Flowable<EVENT> publisher(String msg, EVENT value, long delay) {
        ConnectableFlowable<EVENT> flowable = (ConnectableFlowable) Flowable.create(emitter -> {
            BaseTest.log(msg + ": " + value);
            emitter.onNext(value);
        }, BackpressureStrategy.ERROR)
                // that's important: this Publisher won't start emitting events until connect(),
                // which happens in the event loop thread after the requested delay
                .publish();
        resourcesCache.put(System.currentTimeMillis() + delay, flowable);
        return flowable;
    }
}
```
From the `publisher()` method we can get a publisher which will emit a `value` after some `delay`. But, because `EventLoop` is a `Thread`, you can check in its `run()` method that all its processing happens in a single-threaded loop. We use `resourcesCache` to keep our IO resources and review them periodically to trigger the value emission.
This is just an emulator, but it does the same as a real event loop. The difference here is that this emulator always emits the value after a previously known `delay`, while a real implementation would emit it when the data is ready in the socket, so after some time that is not known upfront.
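For comparison, the JDK offers `ScheduledExecutorService` for exactly this "emit after a known delay from one thread" job; a hedged sketch of the same emulation idea (names are mine; this is a stand-in, not the article's EventLoop):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedPublisher {

    // a single daemon thread plays the event-loop role
    static final ScheduledExecutorService LOOP =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "TOY-LOOP");
                t.setDaemon(true);
                return t;
            });

    // promise a value, fulfilled by the loop thread after the given delay
    static CompletableFuture<Integer> publisher(int value, long delayMs) {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        LOOP.schedule(() -> promise.complete(value), delayMs, TimeUnit.MILLISECONDS);
        return promise;
    }

    public static void main(String[] args) {
        System.out.println(publisher(7, 50).join()); // 7, emitted ~50 ms later
    }
}
```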
Now comes the main implementation of our reactive system:
```java
// the pool accepting HTTP connections (single thread for the server socket)
protected static ExecutorService httpAcceptorPool = Executors.newFixedThreadPool(1, r -> new Thread(r, "HTTP-ACCEPTOR"));

EventLoop<Integer> httpProcessor = new EventLoop<>("HTTP-PROCESSOR");

Flowable.create((FlowableEmitter<Channel<Integer, Integer>> emitter) -> {
    for (int i = 0; i < requestsCount; i++) { // (1)
        log("New request connection established: " + i);
        // (3)
        Channel<Integer, Integer> channel =
                new Channel<>(httpProcessor.publisher("Request completed with data", i, REQUEST_DATA_RECEIVE_TIME_MS));
        // (4)
        channel.responseChannel()
                .subscribe(it -> {
                    log("Response: " + it + " is ready, sending it to the client"); // (10)
                });
        // (5)
        emitter.onNext(channel);
        sleep(REQUESTS_DELAY_MS);
    }
    emitter.onComplete();
}, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.from(httpAcceptorPool)) // (2)
        .subscribe(channel -> { // (6)
            log("Beginning request subscription");
            channel.requestChannel() // (7)
                    .observeOn(Schedulers.from(workersPool)) // (8)
                    .subscribe(it -> {
                        log("Received request: " + it);
                        log("Doing some CPU-intensive operations");
                        sleep(CPU_INTENSIVE_OPERATION_TIME_MS);
                        log("Asking the database for data and waiting for it");
                        sleep(DATABASE_OPERATION_TIME_MS);
                        log("Database data ready for: " + it + " with data: " + it * 2);
                        Flowable.just(it * 2)
                                .subscribe(channel.responseChannel()); // (9)
                    });
        });
```
Let me explain it step by step. We create a `Flowable` (`Publisher`) which generates some values in a loop (1), which simulates receiving requests with an `Integer` value in the body. We also create a single HTTP-ACCEPTOR thread which receives new connections, and our generator loop works in this thread (2). When a new connection arrives (which doesn't mean the whole request data is ready at this moment) we create a `Channel` to handle it. The request processing is delegated to the single-threaded HTTP-PROCESSOR event loop, which will generate the complete request in 2 seconds (3). At the same time we subscribe for the response value, so that when it arrives we can send it back to the client (4). Finally, we emit this channel so that it can be handled by our application (5).
In the same HTTP-ACCEPTOR thread we subscribe to the emitted channel (6), but in the next line we say that we really want to subscribe to the request pipe, so we only want to continue when the full request data is ready (7). And, most importantly, when this data is ready we want to move the processing to our workers pool (8).
When the request data is ready, we basically do almost the same as the previous thread from the thread-per-request model, and finally publish the response value to the response channel (9). This is then received by the response channel subscriber, and the response is sent to the client (10).
I hope this is quite understandable. In fact this code seems easy, while the real magic is done in the `EventLoop` thread. Still, I don't believe you can build a reactive system without the event loop pattern, because either you have an event loop, or you have a thread pool. And the thread pool is something we want to eliminate with this architecture.
The least awkward sequence diagram of the above interactions I was able to produce is this one:
The reactive system
The HTTP-ACCEPTOR only does a small amount of work and can then wait for another server socket connection, while the whole job of waiting for the request to complete is delegated to the HTTP-PROCESSOR, which is a single-threaded loop. When the data is ready, the rest of the processing is done by a worker pool thread, and finally the response is sent back to the client. With this implementation we've relieved the worker thread from waiting for the request to complete, which can be seen in the log of a single request:
```
3 [HTTP-PROCESSOR] Starting thread
47 [HTTP-ACCEPTOR] New request connection established: 0
51 [HTTP-ACCEPTOR] Beginning request subscription
2050 [HTTP-PROCESSOR] Request completed with data: 0
2053 [WORKER-1] Received request: 0
2053 [WORKER-1] Doing some CPU-intensive operations
2553 [WORKER-1] Asking the database for data and waiting for it
4554 [WORKER-1] Database data ready for: 0 with data: 0
4556 [WORKER-1] Response: 0 is ready, sending it to the client
```
The difference here is that it's the HTTP-PROCESSOR event loop that is responsible for waiting for the data, and then the worker does the rest of the work. But the overall processing time is the same as for the thread-per-request model: 4.5 s. Is that correct? Yup, remember we work with exactly the same delays as in the first example.
But if you run a test with 10 requests, you start to see a profit, because the overall processing this time takes 12 s, which makes it 33% faster than the first example. If you're curious what exactly is happening, please check it here.
The example above is better, but still not devoid of problems. As you can see, our worker thread still handles some red parts of the processing, because when it gets the data from the db, it still waits for IO. This is why a truly reactive system can't be implemented without all IO resource drivers being reactive. To get things done the right way, we need to quit using the good old blocking JDBC driver we've used in the previous example (have you noticed that?) and move to some reactive driver, like for example reactive-pg-client, which is implemented, wait for it…, using an event loop :)
The reimplementation of our example with this driver is the following:
```java
EventLoop<Integer> httpProcessor = new EventLoop<>("HTTP-PROCESSOR");
EventLoop<Integer> dbProcessor = new EventLoop<>("DB"); // (1)

Flowable.create((FlowableEmitter<Channel<Integer, Integer>> emitter) -> {
    for (int i = 0; i < requestsCount; i++) {
        log("New request connection established: " + i);
        Channel<Integer, Integer> channel =
                new Channel<>(httpProcessor.publisher("Request completed with data", i, REQUEST_DATA_RECEIVE_TIME_MS));
        channel.responseChannel()
                .subscribe(it -> {
                    log("Response: " + it + " is ready, sending it to the client");
                });
        emitter.onNext(channel);
        sleep(REQUESTS_DELAY_MS);
    }
    emitter.onComplete();
}, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.from(httpAcceptorPool))
        .subscribe(channel -> {
            log("Beginning request subscription");
            channel.requestChannel()
                    .observeOn(Schedulers.from(workersPool))
                    .subscribe(it -> {
                        log("Received request: " + it);
                        log("Doing some CPU-intensive operations");
                        sleep(CPU_INTENSIVE_OPERATION_TIME_MS);
                        log("Asking the database for data and exiting the worker thread");
                        dbProcessor.publisher("Database data ready for: " + it + " with data", it * 2, DATABASE_OPERATION_TIME_MS)
                                .subscribe(channel.responseChannel()); // (2)
                    });
        });
```
We have a second event loop involved here, which handles all the db-related IO and waits for the database data (1). At the end of the worker thread's processing, when it wants to get the data from the db and send it back to the client, we delegate this task to our reactive SQL driver (2).
And yes, I assumed the data received from the db is then passed directly to the response subscriber and used as the response body. This is because I want to keep it simple. Of course, in a real app we would rather observe the data in the same workers pool and process it before sending (for example by converting database entities to JSON).
The sequence of this processing is the following:
The fully reactive system
That's the dream situation: the worker thread only does the CPU-intensive operation and exits. This way we can fully utilize our small pool of threads to support a high throughput.
Of course, a single request again takes 4.5 s in this version, because of the same delays as previously:
```
0 [HTTP-PROCESSOR] Starting thread
0 [DB] Starting thread
1 [HTTP-ACCEPTOR] New request connection established: 0
1 [HTTP-ACCEPTOR] Beginning request subscription
2002 [HTTP-PROCESSOR] Request completed with data: 0
2002 [WORKER-2] Received request: 0
2003 [WORKER-2] Doing some CPU-intensive operations
2503 [WORKER-2] Asking the database for data and exiting the worker thread
4504 [DB] Database data ready for: 0 with data: 0
4504 [DB] Response: 0 is ready, sending it to the client
```
But when we run it for 10 concurrent requests, we can see it takes only 6 s to complete, which is 66% faster than the original thread-per-request implementation. See the details here.
So, the question is: do you need to get involved in reactive programming, and will you benefit from it? My answer is: probably not, unless you plan to create a big Bitcoin exchange platform or something like that. If you look at usual web apps, I can't see the benefits of reactive programming, especially in a microservices architecture where you can easily scale up by different means.
My first question is: does the reactive system really perform much better than a statistical approach?
Let's assume, for example, that we have an 8-core node which can easily manage 8 threads. In a well-done reactive system these threads work only on the green parts of the processing, and this way they consume 100% of the CPU.
On the other hand, we can calculate that, for example, in the thread-per-request model our threads do nothing 90% of the time, while the real CPU-intensive processing takes only 10%. This way, having 80 threads, we statistically have 8 threads intensively working on the CPU at any moment (same as for a reactive system), while the remaining 72 are waiting for IO. The average performance of such a system should be similar to the reactive one with 8 threads. The cost here is 72 additional threads, which makes about 144 MB of RAM. OK, 2 MB is just for the thread initialization, so let's be generous. Let's give each of them 10 MB for the full processing, which makes 720 MB. Is this a lot?
Let's now think from the other perspective: what is the cost of reactive programming? The cost here is the developer, because the system is much more complex to write, more complex still for others to understand, and even more complex to maintain (for example: forget about stack trace analysis, standard try..catch error handling, transactional safety, etc). And what I can observe is that the developer is currently much more expensive than the iron. The iron is cheap and you can always buy more, especially if the business goes in the right direction.
I haven't yet seen a project that couldn't make it after hitting a very high throughput, because a very high throughput means very good money, and a business with money will always find a way to scale. On the other hand, I've seen businesses having problems because of too much software complexity and a lack of really qualified developers. And this is reactive programming: a lot of additional complexity and the requirement to have really experienced people.
Of course, that doesn't make it any less fun :)