流式计算依据形态分为:
- 数据界限: 有界流和无界流(bounded/unbounded) , 数据有无边界
- 数据源产生: 冷流和热流(cold/hot), 冷: 订阅触发才"生产源", 热: 源一直存在
- 订阅方式: 缓存, 共享, 回放, 单播, 多播, 广播...
- ...
# 批处理原语的局限
MapReduce借鉴了函数式编程的思想, 可以讲是分布式批处理的函数式实现.
但是map / reduce
的批处理模型只能对于有界流才能应用, 而对于无界流, 所有reduce
原语都将失效, 如reduce
, distinct
, fold/collect
, count
, sum
, average
那么, 无界事件流reduce到底如何正确的被处理? 也是一个角度的流/批一致? 核心就是Retraction, 即撤回机制
# 实时计算场景例子
场景: 城市人口统计中, 人口地址转移, 即 count(`personId`) group by `cityId`
有界流, MapReduce, ReactiveX, Stream API...:
直接
distinct
源头事件流, 拉取状态的终点无界流中Flink的解法:
Retraction distinct, 借助状态寄存和回撤机制, 将更新事件, 转变为"撤回+插入"这两条AppendOn事件向后传递到
Operator
, 最后再区分Update/Retraction的不同类型的Sink
最终达到一致性的函数式原语效果: (目前DataStream API做不到)
eventStream({ personId, cityId, eventTime })
// Stream SQL 表示为 lastvalue(persenId, eventTime desc)
.dictinctBy(key = personId, selector = eventTime::max)
// Stream SQL 表示为 select count(persenId) group by cityId
.groupBy(key = cityId, downstream = Stream::count)
# Retraction简单原理
基于有界的事件流reduce
的基础上, 再引入状态寄存keyState
, 从而达到撤回&更新的效果, 做到了流/批一致!
[a,b,c,a,d..] == [[[[a,b,c] + 2a] + d]...]
# 如何知道要向哪些groupByOperator
的keyState
发送撤回消息?
这回利用了再上一层的keyState
即personId
, 该state
通过lastValue
函数, 维持了当前cityId
的记录, 一旦新的事件到达, 将发生指定cityId
的撤回事件.
最终"实时计算场景例子"中, data Stream所对应的, 伪Stream SQL
应该表达为
-- 城市人口事件源 `event_source`
create table `event_source` (
`person_id` VARCHAR,
`city_id` VARCHAR,
`event_time` TIMESTAMP
)
-- 实时可撤回流表 `retractable_event_source`
(
select
`person_id`, lastValue(`city_id`, `event_time`)
from `event_source`
group by `person_id`
)
as `retractable_event_source`;
-- 实时city count视图 `dynamic_grouping_city_count`
(
select
count(`person_id`)
from `retractable_event_source`)
group by `city_id`
)
as `dynamic_grouping_city_count`;
# Parallel Aggregate/Reduce
根据Data Stream API, 目前提供的信息, keyState
应该是单个网络节点的本地储存.
即是话,所有key都会汇聚到一个网络节点处理, 所以我简单的认为, Flink是一个基于顺序事件流, 背后是一个弱化版本的并行计算Stream(同Key串行, 不同Key并行).
为什么这么说? 可以先看一下熟悉的JDK Stream API
// 简单版
reduce(accumulator = (count, value) -> count + val);
// 幺元版
reduce(
identity = 0L,
accumulator = (count, value) -> count + val
);
// 幺元 + 并行版
reduce(
identity = 0L,
accumulator = (count, value) -> count + val,
combiner = (countL, countR) -> countL + countR
);
即count(`personId`) group by = `cityId`
这一句SQL应该也可以将同一个Key 分开不同的计算节点接受并行流累加, 再合并
Stream
.of({personId, cityId, eventTime})
.collect(Collectors.groupingBy(
keySelector = { it.cityId },
downstream = Collectors.counting()
))
所以理论上, 并行化的 reduce
会存在多个并行的中间State, 这时候, 就无法通过runTimeContext.keyState
获得状态了.
除非如同volatile
一样, 能够在网络中对中间State的可见性进行状态对齐...不过这样做, 更加会得不偿失, 所以就没有必要进行State的并行化了.
在Partial-Final优化 (opens new window)优化中, 看到对于热点问题, 一些开发者以及Blink官方都整合了如上的热点KeyBy之前打散并行化再汇聚的方案, 与这一个并行化Reduce也有异曲同工之妙