Flink Distinct & Parallel Aggregate

流式计算依据形态分为:

  • 数据界限: 有界流和无界流(bounded/unbounded) , 数据有无边界
  • 数据源产生: 冷流和热流(cold/hot), 冷: 订阅触发才"生产源", 热: 源一直存在
  • 订阅方式: 缓存, 共享, 回放, 单播, 多播, 广播...
  • ...

# 批处理原语的局限

MapReduce借鉴了函数式编程的思想, 可以讲是分布式批处理的函数式实现.

但是map / reduce 的批处理模型只能对于有界流才能应用, 而对于无界流, 所有reduce原语都将失效, 如reduce, distinct, fold/collect, count, sum, average

那么, 无界事件流reduce到底如何正确的被处理? 也是一个角度的流/批一致? 核心就是Retraction, 即撤回机制

# 实时计算场景例子

场景: 城市人口统计中, 人口地址转移, 即 count(`personId`) group by `cityId`

  • 有界流, MapReduce, ReactiveX, Stream API...:

    直接distinct源头事件流, 拉取状态的终点

  • 无界流中Flink的解法:

    Retraction distinct, 借助状态寄存和回撤机制, 将更新事件, 转变为"撤回+插入"这两条AppendOn事件向后传递到Operator, 最后再区分Update/Retraction的不同类型的Sink

最终达到一致性的函数式原语效果: (目前DataStream API做不到)

eventStream({ personId, cityId, eventTime }) 
    // Stream SQL 表示为 lastvalue(persenId, eventTime desc)
    .dictinctBy(key = personId, selector = eventTime::max)
    // Stream SQL 表示为 select count(persenId) group by cityId
    .groupBy(key = cityId, downstream = Stream::count)

# Retraction简单原理

基于有界的事件流reduce的基础上, 再引入状态寄存keyState, 从而达到撤回&更新的效果, 做到了流/批一致!

[a,b,c,a,d..] == [[[[a,b,c] + 2a] + d]...]

# 如何知道要向哪些groupByOperatorkeyState发送撤回消息?

这回利用了再上一层的keyStatepersonId, 该state 通过lastValue函数, 维持了当前cityId的记录, 一旦新的事件到达, 将发生指定cityId的撤回事件.

最终"实时计算场景例子"中, data Stream所对应的, 伪Stream SQL应该表达为

-- 城市人口事件源 `event_source`
create table `event_source` (
    `person_id`			VARCHAR,
    `city_id`				VARCHAR,
    `event_time`		TIMESTAMP
)

-- 实时可撤回流表 `retractable_event_source`
(
    select
        `person_id`, lastValue(`city_id`, `event_time`)
    from `event_source`
    group by `person_id`
)
as `retractable_event_source`;

-- 实时city count视图 `dynamic_grouping_city_count`
(
    select
        count(`person_id`)
    from `retractable_event_source`)
    group by `city_id`
)
as `dynamic_grouping_city_count`;

# Parallel Aggregate/Reduce

根据Data Stream API, 目前提供的信息, keyState应该是单个网络节点的本地储存.

即是话,所有key都会汇聚到一个网络节点处理, 所以我简单的认为, Flink是一个基于顺序事件流, 背后是一个弱化版本的并行计算Stream(同Key串行, 不同Key并行).

为什么这么说? 可以先看一下熟悉的JDK Stream API

// 简单版
reduce(accumulator = (count, value) -> count + val);

// 幺元版
reduce(
    identity = 0L,
    accumulator = (count, value) -> count + val
);

// 幺元 + 并行版
reduce(
    identity = 0L,
    accumulator = (count, value) -> count + val,
    combiner = (countL, countR) -> countL + countR
);

count(`personId`) group by = `cityId`这一句SQL应该也可以将同一个Key 分开不同的计算节点接受并行流累加, 再合并

Stream
    .of({personId, cityId, eventTime})
    .collect(Collectors.groupingBy(
        keySelector = { it.cityId },
        downstream = Collectors.counting()
    ))

所以理论上, 并行化的 reduce会存在多个并行的中间State, 这时候, 就无法通过runTimeContext.keyState获得状态了.

见: Flink"作弊检测"案例

除非如同volatile一样, 能够在网络中对中间State的可见性进行状态对齐...不过这样做, 更加会得不偿失, 所以就没有必要进行State的并行化了.

Partial-Final优化 (opens new window)优化中, 看到对于热点问题, 一些开发者以及Blink官方都整合了如上的热点KeyBy之前打散并行化再汇聚的方案, 与这一个并行化Reduce也有异曲同工之妙