假设你已经了解了, Reactive Streams某一个实现库的基本使用。
Project Reactor是遵循Reactive Streams响应式流规范实现的超集,是一套事件驱动的、反应式、函数式异步编程库。
# 数据流的基本定义-Reactive Streams
这是一套规范,也是一套SPI定义,目前地位和权威性非常高。 正如如JDBC一样理解,这一套规范有不同的实现,目前有Project Reactor、RxJava系列、JDK 9 Flow… 他们都可以通过Reactive Streams SPI相互转化
// Project Reactor
val mono = Mono.just(1)
// Reactive Streams
val pub = mono as Publisher<Int>
// RxJava
val maybe = Maybe.fromPublisher(pub)
Reactive Streams内容非常简短,分别是
名字 | 描述 |
---|---|
Publisher | 发布者。无界的序列数据的提供者,根据从订阅者接收到的请求发布这些元素 |
Subscriber | 订阅者。用于接收和处理数据流事件 |
Subscrption | 订阅。一个对象实例,表示了一对Publisher 和Subscriber 的生命周期 |
Processor | 处理器。表示了一个处理阶段,它既是订阅者,也是发布者,同时遵守两者的契约 |
完整见Reactive Streams (opens new window)
# Publisher
首先从Publisher
开始,接口只有一个方法定义就是fun subscribe(sub: Subscriber)
,传入一个Subscriber
触发数据流的开始。
文档定义了这是一个可以被多次调用的”工厂方法“,每一次调用都会新建一个Subscription
生命周期绑定,每个Subscription
只服务于一个Subscriber
。
该接口没有默认实现,因为实现取决于Publish源的性质,如冷/热、单/多/广播、Scalar、Processer……
# Subscriber
然后是订阅者Subscriber
,该接口其实是一个Callback
,用于描述当事件到来的时候如何处理。
其中onSubscribe
,接收一个Subscription
,表示订阅被触发。
这时候Subscriber
可以通过该”生命周期对象“,进行数据流的请求、订阅取消
interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
# Subscription
正如上文提到,一个Subscription
实例,绑定了一对Publisher
与Subscriber
的生命周期。
其接口仅有如下内容,request
方法用于往”上游“(稍后再解释为何用”上游“)请求指定数量的数据,cancel
用于取消订阅,它们都是通过往上游发送事件的方式进行通知,这也是反应式的特点,上游能够感知下游并能够做出合理的”反应“
interface Subscription {
void request(long n);
void cancel();
}
# Processor
可能会感到奇怪,为何Subscription
提到了”上游“而不直接使用”源“,正是因为Processor
的存在。
它是一个空定义,仅仅继承了Subscriber
、Publisher
,它一般用于描述数据流的中间处理阶段,作为Subscriber
订阅了上游,同时作为Publisher
成为下游的发布者。
它的实现可以是非常简单,如仅仅包装数据处理的中间函数,它也可以非常复杂,正如”源“的复杂性质,Processor
更加复杂。一个例子,比如同时订阅了两个源,且分别是冷、热数据流的连接,这时候如何对”反压情况“或”中间错误“进行反应?
仅仅抛出了一个问题,你不必过于焦虑的开始思考实现,因为这一切都在Reactive Streams的实现库中为你提供了友好的处理方式。
额外,值得一提的是。你当前的Publisher
也未必是真正的”数据源“,它大概率可能是订阅自网络的数据发布者,如通过数据库、Web API、RPC服务……所以上游的真实情况可以是非常复杂,但不必过于担心,因为Reactive Streams其思维方式和编程范式,能够帮助你用同一套数据流的思维,处理大部分情况的发生。
# Project Reactor Flux
Project Reactor对于Publisher
定义了两个类Mono
、Flux
,分别表示了[0:1]
发布者,[0:N]
发布者,更加方便了开发者开发。
Flux
是一个有序的、并发的异步数据流,但是在flatMap
等算子中,顺序是会变化和组合的,可能是元素+时间的顺序。
一个例子如下,首先声明了0~10元素序列的标量流,然后在flatMap
阶段模拟了一个有时延的异步处理,同时设定了最大并发数为3。
(0..10)
.toFlux()
.log()
// concurrency = 3
// prefetch = 2
.flatMap({ Mono.just(it).log().delayElement(Duration.ofMillis(Random.nextLong(1000L..3000L))) }, 3, 2)
.log()
程序运行日志如下
2021-06-23 13:03:18.746 INFO --- Flux.Iterable.1 : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
2021-06-23 13:03:18.749 INFO --- .Flux.FlatMap.2 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:03:18.749 INFO --- .Flux.FlatMap.2 : request(unbounded)
2021-06-23 13:03:18.750 INFO --- Flux.Iterable.1 : | request(3)
2021-06-23 13:03:18.750 INFO --- Flux.Iterable.1 : | onNext(0)
2021-06-23 13:03:18.816 INFO --- Flux.Iterable.1 : | onNext(1)
2021-06-23 13:03:18.816 INFO --- Flux.Iterable.1 : | onNext(2)
2021-06-23 13:03:20.339 INFO --- .Flux.FlatMap.2 : onNext(2)
2021-06-23 13:03:20.339 INFO --- Flux.Iterable.1 : | request(1)
2021-06-23 13:03:20.339 INFO --- Flux.Iterable.1 : | onNext(3)
2021-06-23 13:03:20.746 INFO --- .Flux.FlatMap.2 : onNext(0)
2021-06-23 13:03:20.746 INFO --- Flux.Iterable.1 : | request(1)
2021-06-23 13:03:20.746 INFO --- Flux.Iterable.1 : | onNext(4)
2021-06-23 13:03:20.941 INFO --- .Flux.FlatMap.2 : onNext(1)
2021-06-23 13:03:20.941 INFO --- Flux.Iterable.1 : | request(1)
2021-06-23 13:03:20.941 INFO --- Flux.Iterable.1 : | onNext(5)
2021-06-23 13:03:23.015 INFO --- .Flux.FlatMap.2 : onNext(3)
2021-06-23 13:03:23.015 INFO --- Flux.Iterable.1 : | request(1)
2021-06-23 13:03:23.015 INFO --- Flux.Iterable.1 : | onNext(6)
2021-06-23 13:03:23.317 INFO --- .Flux.FlatMap.2 : onNext(4)
2021-06-23 13:03:23.317 INFO --- Flux.Iterable.1 : | request(1)
2021-06-23 13:03:23.317 INFO --- Flux.Iterable.1 : | onNext(7)
2021-06-23 13:03:23.668 INFO --- .Flux.FlatMap.2 : onNext(5)
2021-06-23 13:03:23.668 INFO --- Flux.Iterable.1 : | request(1)
2021-06-23 13:03:23.668 INFO --- Flux.Iterable.1 : | onNext(8)
2021-06-23 13:03:24.701 INFO --- .Flux.FlatMap.2 : onNext(6)
2021-06-23 13:03:24.701 INFO --- Flux.Iterable.1 : | request(1)
2021-06-23 13:03:24.701 INFO --- Flux.Iterable.1 : | onNext(9)
2021-06-23 13:03:24.854 INFO --- .Flux.FlatMap.2 : onNext(7)
2021-06-23 13:03:24.854 INFO --- Flux.Iterable.1 : | request(1)
2021-06-23 13:03:24.855 INFO --- Flux.Iterable.1 : | onNext(10)
2021-06-23 13:03:24.855 INFO --- Flux.Iterable.1 : | onComplete()
2021-06-23 13:03:24.994 INFO --- .Flux.FlatMap.2 : onNext(8)
2021-06-23 13:03:26.733 INFO --- .Flux.FlatMap.2 : onNext(10)
2021-06-23 13:03:27.106 INFO --- .Flux.FlatMap.2 : onNext(9)
2021-06-23 13:03:27.106 INFO --- .Flux.FlatMap.2 : onComplete()
# 并发流
从下到上分析
- 可以看到程序触发订阅后,
Subscriber
开始往上游发送request
事件 flatMap
收到了来自Subscriber
的request(unounded)
事件,意味着上游可以尽可能快的发送无尽数据Flux.Iterable
即真实的source
,收到了来自flatMap
的request(3)
事件,意味着这一阶段,只可以发送最大3个元素。注意:在微观上,并没有限制时间和速度- 虽然设置了
prefetch = 2
,即可以在没收到来自下游的request
的情况下,提前预取的最大元素数量。不过由于当前flatMap
节点的下游直接request(unbouned)
所以该参数没有用。 - 设置了最大并发数
concurrency = 3
,但是考虑到消费速度(由并发、时延、队列长度…决定),后续flatMap
没有按照设置的最大并发消费,而且减缓了对上游的请求速度,每轮次request(1)
。
P.S. 在算子flatMap
的实现中,并发是由一个队列控制的。concurrency == queue.size
,所以一旦Queue元素被消费,随即开始request(1~3)
个元素补充队列。在其他算子和自定义实现中,不一定需要按照这种策略流控进行实现。
# 并发顺序
为了获得更详细信息,对Mono.delay
部分加上详细日志
2021-06-23 13:21:21.061 INFO --- Flux.Iterable.1 : | request(3)
2021-06-23 13:21:21.062 INFO --- Flux.Iterable.1 : | onNext(0)
2021-06-23 13:21:21.142 INFO --- tor.Mono.Just.3 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-06-23 13:21:21.142 INFO --- tor.Mono.Just.3 : | request(unbounded)
2021-06-23 13:21:21.142 INFO --- tor.Mono.Just.3 : | onNext(0)
2021-06-23 13:21:21.146 INFO --- tor.Mono.Just.3 : | onComplete()
2021-06-23 13:21:21.146 INFO --- Flux.Iterable.1 : | onNext(1)
2021-06-23 13:21:21.147 INFO --- tor.Mono.Just.4 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-06-23 13:21:21.147 INFO --- tor.Mono.Just.4 : | request(unbounded)
2021-06-23 13:21:21.147 INFO --- tor.Mono.Just.4 : | onNext(1)
2021-06-23 13:21:21.147 INFO --- tor.Mono.Just.4 : | onComplete()
2021-06-23 13:21:21.147 INFO --- Flux.Iterable.1 : | onNext(2)
2021-06-23 13:21:21.147 INFO --- tor.Mono.Just.5 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
2021-06-23 13:21:21.147 INFO --- tor.Mono.Just.5 : | request(unbounded)
2021-06-23 13:21:21.147 INFO --- tor.Mono.Just.5 : | onNext(2)
2021-06-23 13:21:21.148 INFO --- tor.Mono.Just.5 : | onComplete()
...
从日志可以看到,flatMap
算子订阅上游source
的时候,source
是有序发送消息的,同时Mono.Just
也是按顺序触发订阅。
由于delayElement
部分(模拟时延远程服务),导致Mono.just.delay
返回的时间有差异,造成了flatMap
后顺序与上游顺序不一致。
虽然有时候,这会让流的处理看起来不是很直观,但是从逻辑和结果上,它都是正确的,且大部分时候你不需要维持这一个顺序。
P.S.这时候如果若要保持下游与上游顺序一致,需要使用flatMapSequential
算子
# ParallelFlux
有时候
- 你需要真正的并行计算
- 你不需要关心顺序
- 你是典型的高CPU算力,想要充分压榨多核CPU资源
ParallelFlux
会是你的极佳选择,开启一条parallelFlux
非常简单。假设从一个上游Flux
开始展开并行计算
(0..10)
.toFlux()
.parallel()
.runOn(Schedulers.parallel())
.flatMap { Mono.just(it).log().delayElement(Duration.ofMillis(Random.nextLong(1000L..3000L))) }
// 更多时候,你需要把并行流的结果,重新合并回一条流进行处理,不过这不是必须的。
.sequential()
将会产生如下效果
这时候详细日志如下
2021-06-23 13:40:32.400 INFO --- allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.405 INFO --- allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.410 INFO --- allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.410 INFO --- allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.410 INFO --- allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.410 INFO --- allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.410 INFO --- allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.411 INFO --- allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.411 INFO --- allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.411 INFO --- allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.411 INFO --- allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.411 INFO --- allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.411 INFO --- allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.412 INFO --- allel.FlatMap.1 : request(256)
2021-06-23 13:40:32.412 INFO --- allel.FlatMap.1 : onSubscribe(FluxFlatMap.FlatMapMain)
2021-06-23 13:40:32.412 INFO --- allel.FlatMap.1 : request(256)
Schedulers.parallel()
会获得一个根据CPU物理核数和线程数计算出的最佳线程调度池,然后把并行流运行在该池子上进行调度。在当前机器配置中池子数量为8。
8个并行流同时订阅上游,并触发request(256)
事件,设置了最大发射数量。
256这个值是由下游sequential
算子,默认设置的最大预取值prefetch
,当然这个数值是会动态变化的,原理可以参考flatMap
算子的Queue算法。
通过ParallelFlux
你在大部分情况里,可以完全不关心底层,不关心你如何计算你的算力,和算子任务的资源分配。Reactive Streams的定义,以及Project Reactor的底层实现,已经很好的处理了这些事情,从而帮助你更加专注于你的业务逻辑声明。
# 反压
看到这里,有了上面日志的直观体感,我想终于可以对反压进行开讲了。
首先明确这个定义,反压是一种”情况“,描述的是数据流的下游消费速度,慢于上游的生产速度。 而"反压策略",则是对”反压“进行避免、减缓、保护的策略行动。
防止反压的出现,达到一个最佳的数据发射和消费速度,对已经分配的算力进行资源利用的最大化。获得这个准确速度是过于理想的,不过能够通过Project Reactor构建出一个相对合理的数据流。 大致原理就是让上游对下游”反应“(动词),动态控制数据的生产速度,这里从简单到复杂列举几个情况。
# Pull、Generator式数据源
这是一种最简单的Cold流,source
会根据下游的订阅和请求情况产生数据(标量数据、数据库游标、生成器……),于是下游可以通过Subscription.request
方法,让上游动态生产数据,是最简单的反应式情况。
# Hot数据源
Hot数据源的数据是不跟随订阅而产生的。 一个例子:"用户点击事件"数据流,不管该流有没有被订阅,用户点击一次就是一次事件的发射。那么如果N个用户以极快的速度产生点击事件,发射速度远远大于开发者写的程序的处理速度,此时需要对算力进行保护。
面对这种情况,尽管控制不到源头,但是可以对可触达的上游,选择如下几种策略
- 丢弃、采样、拒绝服务 其实就是丢弃,只是不同的业务场景不同解读而已。对数据源进行匹配消费速度的丢弃速度,保证产出一条相对合理的数据子流
- 缓冲 或许,这个发射速度不是持久的,只是一段短短的时刻。缓冲策略可以让数据源在”缓冲算子“中暂存,对发射速度削峰,让”订阅者“慢慢跟上消费。注意:一旦超出了缓冲能力,依然要回落到其余的"悲观措施"
- 错误 有时候,过快的发射速度是人为的失误,这时候抛出异常也是一个不错的选择。开发者随后可以根据错误,进行终结数据流、通知、其他Fallback措施……在流中抛出异常,默认行为是数据流的终结。
# 反应式通信
Rsocket是一种反应式通信协议,它允许你将Reactive Streams的事件,包括订阅、取消、请求、终结等,连接到网络层传递到网络的上下游节点,从而做到分布式的反应式数据流。
不过,分布式的上游依然可能是各种结合的复杂情况,Cold/Hot,Transaction……你一样需要分析具体场景采取最合适的策略。
详细见RSocket (opens new window),是一个支持单连接多路复用的通信协议,能够在单连接中开启多条Stream
。