Reactor 数据流、并发&并行、反压

假设你已经了解了, Reactive Streams某一个实现库的基本使用。

Project Reactor是遵循Reactive Streams响应式流规范实现的超集,是一套事件驱动的、反应式、函数式异步编程库。

# 数据流的基本定义-Reactive Streams

这是一套规范,也是一套SPI定义,目前地位和权威性非常高。 正如如JDBC一样理解,这一套规范有不同的实现,目前有Project Reactor、RxJava系列、JDK 9 Flow… 他们都可以通过Reactive Streams SPI相互转化

// Project Reactor
val mono = Mono.just(1)
// Reactive Streams
val pub = mono as Publisher<Int>
// RxJava
val maybe = Maybe.fromPublisher(pub)

Reactive Streams内容非常简短,分别是

名字 描述
Publisher 发布者。无界序列数据的提供者,根据从订阅者接收到的请求发布这些元素
Subscriber 订阅者。用于接收和处理数据流事件
Subscrption 订阅。一个对象实例,表示了一对PublisherSubscriber的生命周期
Processor 处理器。表示了一个处理阶段,它既是订阅者,也是发布者,同时遵守两者的契约

完整见Reactive Streams (opens new window)

# Publisher

首先从Publisher开始,接口只有一个方法定义就是fun subscribe(sub: Subscriber),传入一个Subscriber触发数据流的开始。 文档定义了这是一个可以被多次调用的”工厂方法“,每一次调用都会新建一个Subscription生命周期绑定,每个Subscription只服务于一个Subscriber。 该接口没有默认实现,因为实现取决于Publish源的性质,如冷/热、单/多/广播、Scalar、Processer……

# Subscriber

然后是订阅者Subscriber,该接口其实是一个Callback,用于描述当事件到来的时候如何处理。 其中onSubscribe,接收一个Subscription,表示订阅被触发。 这时候Subscriber可以通过该”生命周期对象“,进行数据流的请求、订阅取消

interface Subscriber<T> {

    void onSubscribe(Subscription s);

    void onNext(T t);

    void onError(Throwable t);

    void onComplete();
}

# Subscription

正如上文提到,一个Subscription实例,绑定了一对PublisherSubscriber的生命周期。 其接口仅有如下内容,request方法用于往”上游“(稍后再解释为何用”上游“)请求指定数量的数据,cancel用于取消订阅,它们都是通过往上游发送事件的方式进行通知,这也是反应式的特点,上游能够感知下游并能够做出合理的”反应“

interface Subscription {

    void request(long n);

    void cancel();
}

# Processor

可能会感到奇怪,为何Subscription提到了”上游“而不直接使用”源“,正是因为Processor的存在。 它是一个空定义,仅仅继承了SubscriberPublisher,它一般用于描述数据流的中间处理阶段,作为Subscriber订阅了上游,同时作为Publisher成为下游的发布者。

它的实现可以是非常简单,如仅仅包装数据处理的中间函数,它也可以非常复杂,正如”源“的复杂性质,Processor更加复杂。一个例子,比如同时订阅了两个源,且分别是冷、热数据流的连接,这时候如何对”反压情况“或”中间错误“进行反应? 仅仅抛出了一个问题,你不必过于焦虑的开始思考实现,因为这一切都在Reactive Streams的实现库中为你提供了友好的处理方式。

额外,值得一提的是。你当前的Publisher也未必是真正的”数据源“,它大概率可能是订阅自网络的数据发布者,如通过数据库、Web API、RPC服务……所以上游的真实情况可以是非常复杂,但不必过于担心,因为Reactive Streams其思维方式和编程范式,能够帮助你用同一套数据流的思维,处理大部分情况的发生。

# Project Reactor Flux

Project Reactor对于Publisher定义了两个类MonoFlux,分别表示了[0:1]发布者,[0:N]发布者,更加方便了开发者开发。

Flux是一个有序的、并发的异步数据流,但是在flatMap等算子中,顺序是会变化和组合的,可能是元素+时间的顺序。

一个例子如下,首先声明了0~10元素序列的标量流,然后在flatMap阶段模拟了一个有时延的异步处理,同时设定了最大并发数为3。

    (0..10)
        .toFlux()
        .log()
		    // concurrency = 3
		    // prefetch = 2
        .flatMap({ Mono.just(it).log().delayElement(Duration.ofMillis(Random.nextLong(1000L..3000L))) }, 3, 2)
        .log()

程序运行日志如下

2021-06-23 13:03:18.746 INFO  ---  Flux.Iterable.1 : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
2021-06-23 13:03:18.749 INFO  ---  .Flux.FlatMap.2 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:03:18.749 INFO  ---  .Flux.FlatMap.2 : request(unbounded)
2021-06-23 13:03:18.750 INFO  ---  Flux.Iterable.1 : | request(3)
2021-06-23 13:03:18.750 INFO  ---  Flux.Iterable.1 : | onNext(0)
2021-06-23 13:03:18.816 INFO  ---  Flux.Iterable.1 : | onNext(1)
2021-06-23 13:03:18.816 INFO  ---  Flux.Iterable.1 : | onNext(2)
2021-06-23 13:03:20.339 INFO  ---  .Flux.FlatMap.2 : onNext(2)
2021-06-23 13:03:20.339 INFO  ---  Flux.Iterable.1 : | request(1)
2021-06-23 13:03:20.339 INFO  ---  Flux.Iterable.1 : | onNext(3)
2021-06-23 13:03:20.746 INFO  ---  .Flux.FlatMap.2 : onNext(0)
2021-06-23 13:03:20.746 INFO  ---  Flux.Iterable.1 : | request(1)
2021-06-23 13:03:20.746 INFO  ---  Flux.Iterable.1 : | onNext(4)
2021-06-23 13:03:20.941 INFO  ---  .Flux.FlatMap.2 : onNext(1)
2021-06-23 13:03:20.941 INFO  ---  Flux.Iterable.1 : | request(1)
2021-06-23 13:03:20.941 INFO  ---  Flux.Iterable.1 : | onNext(5)
2021-06-23 13:03:23.015 INFO  ---  .Flux.FlatMap.2 : onNext(3)
2021-06-23 13:03:23.015 INFO  ---  Flux.Iterable.1 : | request(1)
2021-06-23 13:03:23.015 INFO  ---  Flux.Iterable.1 : | onNext(6)
2021-06-23 13:03:23.317 INFO  ---  .Flux.FlatMap.2 : onNext(4)
2021-06-23 13:03:23.317 INFO  ---  Flux.Iterable.1 : | request(1)
2021-06-23 13:03:23.317 INFO  ---  Flux.Iterable.1 : | onNext(7)
2021-06-23 13:03:23.668 INFO  ---  .Flux.FlatMap.2 : onNext(5)
2021-06-23 13:03:23.668 INFO  ---  Flux.Iterable.1 : | request(1)
2021-06-23 13:03:23.668 INFO  ---  Flux.Iterable.1 : | onNext(8)
2021-06-23 13:03:24.701 INFO  ---  .Flux.FlatMap.2 : onNext(6)
2021-06-23 13:03:24.701 INFO  ---  Flux.Iterable.1 : | request(1)
2021-06-23 13:03:24.701 INFO  ---  Flux.Iterable.1 : | onNext(9)
2021-06-23 13:03:24.854 INFO  ---  .Flux.FlatMap.2 : onNext(7)
2021-06-23 13:03:24.854 INFO  ---  Flux.Iterable.1 : | request(1)
2021-06-23 13:03:24.855 INFO  ---  Flux.Iterable.1 : | onNext(10)
2021-06-23 13:03:24.855 INFO  ---  Flux.Iterable.1 : | onComplete()
2021-06-23 13:03:24.994 INFO  ---  .Flux.FlatMap.2 : onNext(8)
2021-06-23 13:03:26.733 INFO  ---  .Flux.FlatMap.2 : onNext(10)
2021-06-23 13:03:27.106 INFO  ---  .Flux.FlatMap.2 : onNext(9)
2021-06-23 13:03:27.106 INFO  ---  .Flux.FlatMap.2 : onComplete()

# 并发流

从下到上分析

  1. 可以看到程序触发订阅后,Subscriber开始往上游发送request事件
  2. flatMap收到了来自Subscriberrequest(unounded)事件,意味着上游可以尽可能快的发送无尽数据
  3. Flux.Iterable即真实的source,收到了来自flatMaprequest(3)事件,意味着这一阶段,只可以发送最大3个元素。注意:在微观上,并没有限制时间和速度
  4. 虽然设置了prefetch = 2,即可以在没收到来自下游的request的情况下,提前预取的最大元素数量。不过由于当前flatMap节点的下游直接request(unbouned)所以该参数没有用。
  5. 设置了最大并发数concurrency = 3,但是考虑到消费速度(由并发、时延、队列长度…决定),后续flatMap没有按照设置的最大并发消费,而且减缓了对上游的请求速度,每轮次request(1)

P.S. 在算子flatMap的实现中,并发是由一个队列控制的。concurrency == queue.size,所以一旦Queue元素被消费,随即开始request(1~3)个元素补充队列。在其他算子和自定义实现中,不一定需要按照这种策略流控进行实现。

# 并发顺序

为了获得更详细信息,对Mono.delay部分加上详细日志

2021-06-23 13:21:21.061 INFO  ---  Flux.Iterable.1 : | request(3)
2021-06-23 13:21:21.062 INFO  ---  Flux.Iterable.1 : | onNext(0)
2021-06-23 13:21:21.142 INFO  ---  tor.Mono.Just.3 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-06-23 13:21:21.142 INFO  ---  tor.Mono.Just.3 : | request(unbounded)
2021-06-23 13:21:21.142 INFO  ---  tor.Mono.Just.3 : | onNext(0)
2021-06-23 13:21:21.146 INFO  ---  tor.Mono.Just.3 : | onComplete()
2021-06-23 13:21:21.146 INFO  ---  Flux.Iterable.1 : | onNext(1)
2021-06-23 13:21:21.147 INFO  ---  tor.Mono.Just.4 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-06-23 13:21:21.147 INFO  ---  tor.Mono.Just.4 : | request(unbounded)
2021-06-23 13:21:21.147 INFO  ---  tor.Mono.Just.4 : | onNext(1)
2021-06-23 13:21:21.147 INFO  ---  tor.Mono.Just.4 : | onComplete()
2021-06-23 13:21:21.147 INFO  ---  Flux.Iterable.1 : | onNext(2)
2021-06-23 13:21:21.147 INFO  ---  tor.Mono.Just.5 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-06-23 13:21:21.147 INFO  ---  tor.Mono.Just.5 : | request(unbounded)
2021-06-23 13:21:21.147 INFO  ---  tor.Mono.Just.5 : | onNext(2)
2021-06-23 13:21:21.148 INFO  ---  tor.Mono.Just.5 : | onComplete()
...

从日志可以看到,flatMap算子订阅上游source的时候,source是有序发送消息的,同时Mono.Just也是按顺序触发订阅。 由于delayElement部分(模拟时延远程服务),导致Mono.just.delay返回的时间有差异,造成了flatMap后顺序与上游顺序不一致。 虽然有时候,这会让流的处理看起来不是很直观,但是从逻辑和结果上,它都是正确的,且大部分时候你不需要维持这一个顺序。

P.S.这时候如果若要保持下游与上游顺序一致,需要使用flatMapSequential算子

# ParallelFlux

有时候

  • 你需要真正的并行计算
  • 你不需要关心顺序
  • 你是典型的高CPU算力,想要充分压榨多核CPU资源

ParallelFlux会是你的极佳选择,开启一条parallelFlux非常简单。假设从一个上游Flux开始展开并行计算

(0..10)
        .toFlux()
        .parallel()
        .runOn(Schedulers.parallel())
        .flatMap { Mono.just(it).log().delayElement(Duration.ofMillis(Random.nextLong(1000L..3000L))) }
		    // 更多时候,你需要把并行流的结果,重新合并回一条流进行处理,不过这不是必须的。
    		.sequential()

将会产生如下效果 undefined

这时候详细日志如下

2021-06-23 13:40:32.400 INFO  ---  allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.405 INFO  ---  allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.410 INFO  ---  allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.410 INFO  ---  allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.410 INFO  ---  allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.410 INFO  ---  allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.410 INFO  ---  allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.411 INFO  ---  allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.411 INFO  ---  allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.411 INFO  ---  allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.411 INFO  ---  allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.411 INFO  ---  allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.411 INFO  ---  allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.412 INFO  ---  allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.412 INFO  ---  allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.412 INFO  ---  allel.FlatMap.1 : request(256)

Schedulers.parallel()会获得一个根据CPU物理核数和线程数计算出的最佳线程调度池,然后把并行流运行在该池子上进行调度。在当前机器配置中池子数量为8。 8个并行流同时订阅上游,并触发request(256)事件,设置了最大发射数量。 256这个值是由下游sequential算子,默认设置的最大预取值prefetch,当然这个数值是会动态变化的,原理可以参考flatMap算子的Queue算法。

通过ParallelFlux你在大部分情况里,可以完全不关心底层,不关心你如何计算你的算力,和算子任务的资源分配。Reactive Streams的定义,以及Project Reactor的底层实现,已经很好的处理了这些事情,从而帮助你更加专注于你的业务逻辑声明。

# 反压

看到这里,有了上面日志的直观体感,我想终于可以对反压进行开讲了。

首先明确这个定义,反压是一种”情况“,描述的是数据流的下游消费速度,慢于上游的生产速度。 而"反压策略",则是对”反压“进行避免、减缓、保护的策略行动。

防止反压的出现,达到一个最佳的数据发射和消费速度,对已经分配的算力进行资源利用的最大化。获得这个准确速度是过于理想的,不过能够通过Project Reactor构建出一个相对合理的数据流。 大致原理就是让上游对下游”反应“(动词),动态控制数据的生产速度,这里从简单到复杂列举几个情况。

# Pull、Generator式数据源

这是一种最简单的Cold流,source会根据下游的订阅和请求情况产生数据(标量数据、数据库游标、生成器……),于是下游可以通过Subscription.request方法,让上游动态生产数据,是最简单的反应式情况。

# Hot数据源

Hot数据源的数据是不跟随订阅而产生的。 一个例子:"用户点击事件"数据流,不管该流有没有被订阅,用户点击一次就是一次事件的发射。那么如果N个用户以极快的速度产生点击事件,发射速度远远大于开发者写的程序的处理速度,此时需要对算力进行保护。

面对这种情况,尽管控制不到源头,但是可以对可触达的上游,选择如下几种策略

  1. 丢弃、采样、拒绝服务 其实就是丢弃,只是不同的业务场景不同解读而已。对数据源进行匹配消费速度的丢弃速度,保证产出一条相对合理的数据子流
  2. 缓冲 或许,这个发射速度不是持久的,只是一段短短的时刻。缓冲策略可以让数据源在”缓冲算子“中暂存,对发射速度削峰,让”订阅者“慢慢跟上消费。注意:一旦超出了缓冲能力,依然要回落到其余的"悲观措施"
  3. 错误 有时候,过快的发射速度是人为的失误,这时候抛出异常也是一个不错的选择。开发者随后可以根据错误,进行终结数据流、通知、其他Fallback措施……在流中抛出异常,默认行为是数据流的终结。

# 反应式通信

Rsocket是一种反应式通信协议,它允许你将Reactive Streams的事件,包括订阅、取消、请求、终结等,连接到网络层传递到网络的上下游节点,从而做到分布式的反应式数据流。

不过,分布式的上游依然可能是各种结合的复杂情况,Cold/Hot,Transaction……你一样需要分析具体场景采取最合适的策略。

详细见RSocket (opens new window),是一个支持单连接多路复用的通信协议,能够在单连接中开启多条Stream