实时大数据流式计算入门路径

从应用开发者(即使用者)的角度, 从论文来理解思路和模型入门, 或者从API入门都好, 都不要从原理入门. 因为从原理入门, 一开始就会陷入各种实现, 以及解决各种有解又无解的分布式问题里面. 比如

  • 存活&安全的冲突, 底层就是共识, 一致性等等的分布式基本问题
  • 还有内存, 网络, 算力...带来的一系列资源和性能问题 各个框架的实现原理大框架都比较一致, 但是详细的实现差别很大, 而且寿命很短(spark storm升级几代实现了), 是极其容易被抛弃的, 不如把这个问题抛给产品的人员, 调优也交给他(给钱了的)

(分布式问题有趣的地方是问题很简单, 解法很粗暴, 一解就出新问题, 无限套娃. 于是论文里各种理想假定和最终人工介入来打破循环. ---来自知乎大神"阿莱克西斯")

但是其实, 我们只要了解核心The DataFlow Model论文的一部分精髓就能够入门流计算了. 在设计流处理程序的时候, 所有问题都可以抽象成WWWH四个问题

  • What: 需要计算的结果数据是什么
  • Where: 计算的上下文环境是什么
  • When: 什么时候计算输出结果
  • How: 如何修正早期计算结果

DataFlow引申出windowing model, triggering model和incremental processing model解决问题. 注意, 它们不是实现! 于是, Goole Beam, Apache Flink, Spark Structuring Streaming...则是"可以"作为它们的实现.

在DataFlow模型中:

  1. 通过非聚合(map, flatmap, ParDo...)与聚合(GroupByKey, keyBy, aggregate...)原语的组合, 解决了What的问题. 把现实问题用原语进行抽象和组合.

  2. 通过group & window, 解决无界数据流中有界聚合的问题, 同时解决了你对于事件处理的上下文, 即Where的问题. 如统计每日单个用户交易金额...

  3. 通过Trigger & WaterMark, 触发器决定了什么时候一个窗口被计算和输出为窗格(稳定的计算结果),即When的问题. 如WaterMark解决的是eventTime与ProcessTime不对应的情况, 网络抖动和延迟, 上游buffer后一次性发送, 又或者源头人工批录入...

  4. 然后, 就需要解决When之后出现的意外情况(乱序...), 即所谓的incremental processing model需要解决的How. 论文定义了抛弃, 累积, 累积&撤回. Flink的基础实现(WaterMark, RetractableWindowOperator), Sink算子(Appendable, Retractable)...等解决.

# 流批一致是什么意思?

Google的DataFlow论文以及产品中, 认为不存在流处理与批处理, 它们只是在处理有界数据和无界数据的两种不同实现而已.

于是在DataFlow中, 无论是有界数据流还是无界数据流, 都被抽象城数据集Collection, 认为流处理是批处理的超集.

换句话说, 在"流处理"中的全局窗口, 基于proc/event time顺序的数据集处理就是"批处理".

# 为什么现在推广Streaming SQL(流批一致的表达)

(个人观点)

SQL确实非常优秀, 在DataFlow Model提出了流表相对论后, Streaming SQL成为了可能.

SQL有几个优点, 能做到有界和无界在表达上完全一致, 对于视图和计算的表达力也足够强, 入门和使用人群足够广大. 反而Streaming API在目前的表达力却远远不足于有界数据集的函数式原语处理, 如MapReduce, Java Stream...如常用的函数式distinct原语, SQL常用的Count distinct查询, 在各个Streaming API中并没有相关可撤回实现, 需要自助实现

(DataStream目前没有更高级原语, 于是需要转换为StateFul Mapping Function的变换映射思路, 其实也不会太难)

一个例子就是, 城市人口转移的统计处理. 假设有如下无界数据集合, 代表城市人口转移事件流.

eventId person city eventTime
1 君兰 广州 2020-06-29 00:00:00
2 Pony马 广州 2020-07-01 00:00:00
3 君兰 杭州 2020-07-01 00:00:00

• What: 城市人口数. 由DataFlow的操作符设计 • Where: 全局时序窗口, Over事件型窗口, 无WaterMark. Over窗口为根据person分组, 事件时间倒序组内"无界"排序 • When: 由于是全局时序窗口, 为了让数据可计算, 需要设置为间隔型/事件触发Trigger. (DataFlow全局窗口默认NeverTrigger) • How: 由于物理条件, 事件流的eventime上可能乱序, 对于这个场景, 采用Discard丢弃策略即正确

# Streaming SQL

如下, SQL的表达式无界与有界都一致. 例子1, 例子2分别是在Blink程序正确SQL表达, 背后不同Blink版本的优化策略不一样.

其中, 例子3的SQL也体现出Streaming SQL在目前阶段的实现与优化能力与理论还有差距, 而例子4是一个简单却错误的写法.

// 例子1: ANSI SQL 标准写法
select
    `city`,
    count(`person`) as `count`
from (
    select
        `person`,
        `city`,
        `event_time`
    from (
        select
            `person`,
            `city`,
            `event_time`
            -- over 开窗
            row_number() over (partition by `person` order by `event_time` desc) as `row_num`
        from `event_source`
    )
    where `row_num` = '1'
    -- 当"君兰"城市从"广州"转移到"杭州"
    -- 按照排序上一条记录变成`row_number` = 2
    -- 于是上一个记录将会被撤回, 发送(-, "君兰", "广州", "2020-06-29 00:00:00")
    -- 最终发送(+, "君兰", "杭州", "2020-07-01 00:00:00")
)
group by `city`

// 例子2: flink SQL lastvalue UDF写法
select
	  count(`person`),
	  `city`
from (
    select
        `person`,
        -- 此处函数将会触发撤回, 和重新累积, 过程如例子1
        last_value(`city`, `event_time`)
    from `event_source`
    group by `person`
)
group by `city`

// 例子3: mysql 传统写法, 全局order不被支持会报错, 必须通过开窗(over window)来排序
select
    count(`person`),
    `city`
from (
    select
        `person`,
        distinct `city`,
    from `event_source`
    group by `person`
    order by `event_time` desc
)
group by `city`

// 例子4: 错误写法, 此为取最早的第一个
select
    count(distinct `person`),
    `city`
from `event_source`
group by `city`

值得注意的是, 如同例子2, 看起来没有开窗来, 只有分组KeyState, 但是仍然受限于现实条件, KeyState存在空间与声明周期限制.

在Blink默认实现中, KeyState声明周期为3日, 此即为一个隐形的窗口. 一旦理想的TTL过期, Streaming SQL就会发生预料外的不正确性.

同时, 就算KeyState设置为3日TTL, 如果要将十几亿人口进行keyBy, 在空间上依然是一个恐怖的量级. Blink目前的介绍, 是一个理论支持Unbounded State, 通过数据库来保证.

这些都是当今Streaming处理中, 在现实框架的设计中可能需要"额外"考虑的地方.

# 缺点

SQL定义为结构化查询语言, 对于数据查询与视图非常合适, 这时候一些实时报表与大图类的应用, 一个SQL文本即可以搞定超大数据的流式计算.

然而SQL终究不是设计于数据集"处理"的语言

  • 对于复杂事件计算, 超时等场景, 表达力非常鸡肋和累人, 并且由于没有合适的语法高亮, 可读性并不高. 可以参考复杂事件处理 (opens new window)
  • 对于数据的副作用描述的能力约等于没有. 这时候只能引申出UDF等等魔法方法, 来描述流式数据的副作用.

这时候, 借助map&flatmap, reduce&aggregate, stateful function, i/o function等编程式API或许会是你更佳选择

# Table API

Table API 是一套与Streaming SQL对等的DSL API. 撤回等高阶方法就实现在Table API的TableAggregateFunction 用Scala编写比较方便, 并且结合了编程式API, 在复杂事件处理处理上有优势 结合IDE能够获得一些强类型的提示

Table result = orders
    .window(Over
        .partitionBy($("a"))
        .orderBy($("rowtime", desc))
        .preceding(UNBOUNDED_RANGE)
        .as("w"))
    .select(
        $("a"), $("b").avg().distinct().over($("w")),
        $("b").max().over($("w")),
        $("b").min().over($("w"))
    );

# Steaming API

Streaming API确实是一个比较底层的库, 很多与动态流表相关的高阶解决方案如去重, 撤回, 是没有支持的...

但是对于一些定制化计算程序, Streaming API的手脚空间就非常大了

env
  // [person:String, city:String, eventTime:Timestamp]
  .addSource(eventSource)
  .keyBy(0)
  .maxBy(2)
  // 以上不会转换为retractale operator, 仅仅将最大的继续发送
  .keyBy(1)
  .process {
      // 手写keyState和RetractTuple
  }
  .sink {
      // 手写Retract Sink
  }

由于Table API与SQL对等, 而集团DataWorks对于SQL的支持是比较好的, 所以从集团内部来讲, SQL相对友好. 同时集团在很多算子和算法的DAG生成中, 对于Table API/SQL有很多杀器优化生成.

而Streaming API更多用于解决复杂State处理, 以及集团不支持的自定义复杂Source/Sink的程序编写.

# 数据的处理顺序(若你关心)

对于数据集来说, 乱序是一个非常常见的问题, 在多源输入合并, 网络传输抖动和延迟的情况下, 乱序就会发生.

有界的数据流不会存在这样的问题, 因为有界数据流可以直接对整个数据集进行排序处理. 但是无界数据就非常难了, 因为无界数据集是不可遍历的.

DataFlow Model提出了WaterMark的概念. 此时将eventTime作为waterMark, 通过Window/Trigger结合, 能够在牺牲一定时延的情况下, 尽可能的保证eventTime与processTime落在同一水位分布上, 从而让无界数据流做出理想的正确计算.

当然, 更有可能是你的数据流根本不关心顺序, 又或者用本地处理时间ProcessTime就处理的很好, Blink默认也会有根据Period间断性的, 基于ProcessTime本地处理时间的WaterMark实现, 而你并不需要感知以上的存在.