How to use Reactor to implement Flink-like operations

  • 2021-09-04 23:59:47
  • OfStack

Directory
1. Background
2. Implementation process
  1. Creating the Flux separately from sending data
  2. Backpressure support
  3. Window function
  4. Consumer handling
3. Summary
  1. Overall implementation process
  2. Comparison with Flink
4. Appendix

1. Background

Flink has great advantages for streaming workloads: windows and other operators make aggregation tasks easy. But Flink runs as an independent set of services. To use it inside a business process you typically have to send the data to Kafka, process it with Flink, write the results back to Kafka, and only then do the business processing, which is cumbersome.

For example, suppose you want window-style batch aggregation, similar to Flink's, directly in business code. Writing it by hand is tedious, and pulling in Flink is too heavyweight. In this scenario the window and buffer operators of reactive programming frameworks such as RxJava and Reactor make it easy.

Reactive programming frameworks already come with backpressure and a rich set of operators. Can we use one to handle Flink-like operations? The answer is yes.

This article uses Reactor to implement a Flink-style window function as an example; other operators work the same way in principle. The code for this article is on GitHub (see the appendix).

2. Implementation process

Flink encapsulates stream processing well: when using Flink you hardly need to think about thread pools, backlog, data loss, and so on. When building similar functionality with Reactor, however, you need a good understanding of how Reactor works and you need to test the different scenarios, otherwise it is easy to get things wrong.

The core points in the implementation process are listed below:

1. Creating the Flux separately from sending data

In introductory Reactor examples, data is supplied at the moment the Flux is created, for example with Flux.just or Flux.range. Since version 3.4.0 you can create the Flux first and send data into it later through Sinks. Three of the Sinks.many() variants are easy to confuse:

Sinks.many().multicast(): if there is no subscriber yet, received messages are dropped
Sinks.many().unicast(): if there is no subscriber yet, received messages are buffered until the first subscriber subscribes
Sinks.many().replay(): all messages are retained and replayed, regardless of how many subscribers there are

In this example scenario, Sinks.many().unicast() is selected.

Official documentation: https://projectreactor.io/docs/core/release/reference/#processors
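As a minimal illustrative sketch (the variable names are not from the article's code), the three flavors can be created like this:

 import reactor.core.publisher.Sinks;

 // multicast: many subscribers allowed, each sees data pushed after it subscribes
 Sinks.Many<String> multicast = Sinks.many().multicast().onBackpressureBuffer();
 // unicast: exactly one subscriber allowed; this is the flavor chosen in this article
 Sinks.Many<String> unicast = Sinks.many().unicast().onBackpressureBuffer();
 // replay: caches elements and replays them to every new subscriber
 Sinks.Many<String> replay = Sinks.many().replay().all();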

2. Backpressure support

The unicast variant supports two backpressure strategies: onBackpressureBuffer and onBackpressureError. In this scenario onBackpressureBuffer must be chosen, and a buffer queue needs to be supplied; it can be created with Queues.get(queueSize).get().
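A minimal sketch of that initialization; the queue size is an illustrative value, not one taken from the article's code:

 import reactor.core.publisher.Sinks;
 import reactor.util.concurrent.Queues;

 int queueSize = 1024; // assumed capacity, tune for the expected backlog

 // unicast sink whose backpressure buffer is the explicitly supplied queue
 Sinks.Many<String> sink = Sinks.many().unicast()
     .onBackpressureBuffer(Queues.<String>get(queueSize).get());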

There are two ways to submit data:

emitNext: synchronous submission; you pass an EmitFailureHandler that decides how to handle a failed emission (for example, whether to retry)
tryEmitNext: asynchronous-style submission; it returns immediately with an EmitResult describing success or failure

In this scenario we do not want to lose data. We can supply a custom failure handler that retries failed emissions indefinitely, or call the asynchronous method and handle retries ourselves:

 Sinks.EmitFailureHandler ALWAYS_RETRY_HANDLER = (signalType, emitResult) -> emitResult.isFailure(); // returning true asks emitNext to retry
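A minimal sketch of both submission styles, reusing the handler above and the sink from the earlier sketch (the string payload is purely illustrative):

 // Synchronous style: emitNext keeps retrying as long as the handler returns true
 sink.emitNext("some-data", ALWAYS_RETRY_HANDLER);

 // Asynchronous style: tryEmitNext returns immediately; the caller inspects the result
 Sinks.EmitResult result = sink.tryEmitNext("some-data");
 if (result.isFailure()) {
     // e.g. retry later, or hand the element off to an external buffer
 }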

After that, you can call asFlux() on the sink and happily use the various operators.


3. Window function

Reactor supports two types of windowed aggregate functions:

window operators: each window is emitted as a nested Flux
buffer operators: each window is emitted as a List

In this scenario buffer meets the requirements: bufferTimeout(int maxSize, Duration maxTime) flushes a batch when either the maximum element count or the maximum wait time is reached, and Flink's keyBy-style operations can be approximated with groupBy and collectMap, as sketched below.
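A minimal sketch of the windowing step under assumed parameters (a 100-element / 5-second window and a length-based key extractor, neither taken from the article's code), using the sink from the earlier sketches:

 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import reactor.core.publisher.Flux;
 import reactor.util.function.Tuples;

 // Flush a batch when 100 elements have arrived or 5 seconds have passed, whichever comes first
 Flux<List<String>> batches = sink.asFlux()
     .bufferTimeout(100, Duration.ofSeconds(5));

 // keyBy-like step: group each batch by a key and aggregate per key
 // (here a simple count, built from groupBy + collectMap as mentioned above)
 Flux<Map<Integer, Long>> countsPerKey = batches.flatMap(batch ->
     Flux.fromIterable(batch)
         .groupBy(String::length)                     // illustrative key extractor
         .flatMap(group -> group.count()
             .map(count -> Tuples.of(group.key(), count)))
         .collectMap(t -> t.getT1(), t -> t.getT2()));

Note that this only approximates Flink's keyBy: the grouping happens per batch rather than across the whole stream.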

4. Consumer handling

After buffer, Reactor emits the batches one by one. With publishOn or subscribeOn, new data is requested only after the downstream subscriber has finished processing the current element, and only then does the buffer operator emit again. If the subscriber takes a long time, the data flow therefore stalls inside the buffer stage, which is clearly not what we want.

The ideal behavior is for consumers to run on a thread pool so that multiple batches can be processed in parallel, and for the buffer operator to be blocked only when the pool is full. The solution is a custom thread pool whose submissions block when it is saturated, which can be implemented with a custom RejectedExecutionHandler:


 import java.util.concurrent.*;

 // When no worker is free, block the submitting thread by putting the task
 // into the executor's queue instead of rejecting it.
 RejectedExecutionHandler executionHandler = (r, executor) -> {
   try {
     executor.getQueue().put(r);
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new RejectedExecutionException("Producer thread interrupted", e);
   }
 };

 // SynchronousQueue has no capacity: once all poolSize workers are busy, execute()
 // falls into the handler above and blocks until a worker frees up.
 // poolSize: desired consumer parallelism (defined elsewhere).
 ExecutorService consumerPool = new ThreadPoolExecutor(poolSize, poolSize,
     0L, TimeUnit.MILLISECONDS,
     new SynchronousQueue<>(),
     executionHandler);
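A minimal sketch of wiring this pool into the pipeline; the println stands in for the real business logic, and the window parameters repeat the illustrative values used earlier:

 // Each emitted batch is handed to the consumer pool. When every worker is busy,
 // execute() blocks (via the handler above), which in turn holds back the buffer
 // operator and, through its queue, the original data submitters.
 sink.asFlux()
     .bufferTimeout(100, Duration.ofSeconds(5))
     .subscribe(batch -> consumerPool.execute(() ->
         System.out.println("processing batch of " + batch.size()))); // placeholder business logic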

3. Summary

1. Overall implementation process

Submitting data: both synchronous and asynchronous submission are supported, from multiple threads; normally submission returns quickly, and the synchronous method blocks when the buffer queue is full.
Rich operators process the streaming data.
Fanning out the batches produced by the buffer operator: batches are submitted synchronously to a dedicated consumer thread pool, and submission blocks when the pool is full.
Consumer thread pool: supports blocking submission so that messages are not lost; its queue length is effectively zero because there is already a buffer queue upstream.
Backpressure: when the consumer thread pool blocks, the pressure propagates back to the buffer operator, then to the buffer queue, and finally to the data submitter.

2. Comparison with Flink

Flink capabilities reproduced:

Rich operators, without introducing Flink
Backpressure support, without losing data

Advantages:

Lightweight and can be used directly in business code

Weaknesses:

The internal execution process is complex and easy to get wrong, so it is not as foolproof as Flink.
There is no watermark support, which means late or out-of-order data cannot be handled properly.
There is no savepoint functionality. Backpressure solves part of the problem, but the data sitting in the buffer queue and the consumer thread pool is lost if the process goes down; a partial remedy is to add a JVM shutdown hook (see the sketch after this list).
It only runs on a single machine, which means the buffer queue cannot be unbounded, the thread pool size has to be chosen carefully, and there is no equivalent of Flink's globalWindow and similar features.
The impact on the upstream data source has to be considered. Flink's upstream is usually a message queue, which absorbs backlog automatically when volume spikes; if the upstream of this scheme is an HTTP or RPC caller, the blocking cannot be ignored. A compensation scheme is to always submit asynchronously and, on failure, push the data to a message queue as a buffer and retry consuming from that queue indefinitely.
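For the savepoint gap, a minimal sketch of the shutdown-hook remedy mentioned above, assuming the sink and consumerPool from the earlier sections and an illustrative 30-second grace period:

 import java.util.concurrent.TimeUnit;

 // Best-effort drain on JVM shutdown: stop accepting new data, let buffered
 // batches reach the consumers, then wait briefly for in-flight work to finish.
 Runtime.getRuntime().addShutdownHook(new Thread(() -> {
     sink.tryEmitComplete();          // no further elements from producers
     consumerPool.shutdown();         // finish the tasks that were already submitted
     try {
         if (!consumerPool.awaitTermination(30, TimeUnit.SECONDS)) {
             consumerPool.shutdownNow();
         }
     } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
     }
 }));

This is only best-effort: anything still in flight when the grace period expires is lost, which is exactly the weakness noted above.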

4. Appendix

Source address of this article: https://github.com/sofn/reactor-window-like-flink

Reactor official documentation: https://projectreactor.io/docs/core/release/reference/

Flink documentation: https://ci.apache.org/projects/flink/flink-docs-stable/

ReactiveX operators: http://reactivex.io/documentation/operators.html


