Flink

2020-01-05  本文已影响0人  Ary_zz

2020-01-03

时间属性

image.png

Flink支持不同的时间语义,核心是 Processing Time 和 Event Time(Row Time)


image.png

一般情况下,当你的应用遇到某些问题要从上一个 checkpoint 或者 savepoint 进行重放,如果希望结果完全相同,就只能用 Event Time。如果接受结果不同,则可以用 Processing Time。Processing Time 的一个常见的用途是,根据现实时间来统计整个系统的吞吐量。

如果单条数据之间是乱序,我们就考虑对于整个序列进行更大程度的离散化
一个 watermark 本质上就代表了这个 watermark 所包含的 timestamp 数值,表示以后到来的数据已经再也没有小于或等于这个时间的了

Flink 支持两种 watermark 生成方式。第一种是在 SourceFunction 中产生,相当于把整个的 timestamp 分配和 watermark 生成的逻辑放在流处理应用的源头。我们可以在 SourceFunction 里面通过这两个方法产生 watermark:

在分配 timestamp 和生成 watermark 的过程,虽然在 SourceFunction 和 DataStream 中都可以指定,但是还是建议生成的工作越靠近 DataSource 越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据是否乱序。Flink 内部提供了很好的机制去保证这些 timestamp 和 watermark 被正确地传递到下游的节点。

watermark传播策略基本上遵循这三点。

传播是幂等的

watermark处理过程:

image.png

state

keyed state

operator state

Managed State:由 Flink 管理的 state,刚才举例的所有 state 均是 managed state

Raw State:Flink 仅提供 stream 可以进行存储数据,对 Flink 而言 raw state 只是一些 bytes

state 是 Checkpoint 进行持久化备份的主要角色

exactly once 需要一个input buffer将数据缓存

image.png

Flink 的 Checkpoint 机制只能保证 Flink 的计算过程可以做到 EXACTLY ONCE,端到端的 EXACTLY ONCE 需要 source 和 sink 支持

Savepoint
用户通过命令触发,由用户管理其创建与删除
标准化格式存储,允许作业升级或者配置变更
用户在恢复时需要提供用于恢复作业状态的 savepoint 路径

Externalized Checkpoint
Checkpoint 完成时,在用户给定的外部持久化存储保存
当作业 FAILED(或者 CANCELED)时,外部存储的 Checkpoint 会保留下来
用户在恢复时需要提供用于恢复的作业状态的 Checkpoint 路径

架构图


image.png

用户通过 DataStream API、DataSet API、SQL 和 Table API 编写 Flink 任务,它会生成一个 JobGraph。JobGraph 是由 source、map()、keyBy()/window()/apply() 和 Sink 等算子组成的。当 JobGraph 提交给 Flink 集群后,能够以 Local、Standalone、Yarn 和 Kubernetes 四种模式运行。

job manager

image.png

task manager

image.png

主要组件

TaskManager 被分成很多个 TaskSlot,每个任务都要运行在一个 TaskSlot 里面,TaskSlot 是调度资源里的最小单位。

standalone

image.png

作业流程解析


image.png image.png

DataStream 中物理分组方式包括:

当我们调用 DataStream#map 算法时,Flink 在底层会创建一个 Transformation 对象,这一对象就代表我们计算逻辑图中的节点。它其中就记录了我们传入的 MapFunction,也就是 UDF(User Define Function)。随着我们调用更多的方法,我们创建了更多的 DataStream 对象,每个对象在内部都有一个 Transformation 对象,这些对象根据计算依赖关系组成一个图结构,就是我们的计算图。后续 Flink 将对这个图结构进行进一步的转换,从而最终生成提交作业所需要的 JobGraph

客户端方式


image.png

取消任务

window

Window 方法接收的输入是一个WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 Window 中(一条数据可能同时分发到多个 Window 中),Flink 提供了几种通用的 WindowAssigner:tumbling window(窗口间的元素无重复),sliding window(窗口间的元素可能重复),session window 以及 global window。如果需要自己定制数据分发策略,则可以实现一个 class,继承自 WindowAssigner。

Evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,更详细的描述可以参考 org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter 两个方法。Flink 提供了如下三种通用的 evictor:

Evictor 是可选的方法,如果用户不选择,则默认没有。

Trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的 Trigger,如果默认的 Trigger 不能满足你的需求,则可以自定义一个类,继承自 Trigger 即可,我们详细描述下 Trigger 的接口以及含义:

上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:

time

指定watermark后允许的最大延迟,使用side output可以获取到这些数据


image.png

需要注意的是,设置了 allowedLateness 之后,迟到的数据也可能触发窗口,对于 Session window 来说,可能会对窗口进行合并,产生预期外的行为

Window 中的的元素同样是通过 State 进行维护,然后由 Checkpoint 机制保证 Exactly Once 语义

image.png image.png image.png image.png image.png image.png image.png image.png
image.png image.png
image.png
image.png
image.png

SQL示例

https://ververica.cn/developers/flink-sql-programming-practice/
group aggregate

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;

window aggregate

SELECT 
  toAreaId(lon, lat) AS area, 
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end, 
  COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat) and isStart
GROUP BY 
  toAreaId(lon, lat), 
  TUMBLE(rideTime, INTERVAL '5' MINUTE) 
HAVING COUNT(*) >= 5;
image.png

建议对 Group Aggregate 的作业配上 State TTL 的配置

chain operator

通过OperatorChain这个类来将多个operator链在一起形成一个新的operator
可以形成chain的条件:

  1. 上下游的并行度一致
  2. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
  3. 上下游节点都在同一个 slot group 中(下面会解释 slot group)
  4. 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
  5. 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
  6. 两个节点间数据分区方式是 forward(参考理解数据流的分区
  7. 用户没有禁用 chain

主要通过OperatorChain的ChainingOutput实现性能优化
可以调用startNewChain()开启新chain
可以调用disableChaining()表明该operator不参与chain

createChainedOperator方法里递归调用了createOutputCollector,所以chained operators实际上是从下游往上游去反向一个个创建和setup的。以word count为例,chained operators为:StreamGroupedReduce - StreamFilter - StreamSink,而实际初始化顺序则相反:StreamSink - StreamFilter - StreamGroupedReduce。

在OperatorChain类中,headOperator为StreamGroupedReduce。createOutputCollector的调用过程如下:

createOutputCollector(operatorConfig=<StreamGroupedReduce config>, ...)
 --> chainedOpConfig = <StreamFilter config>
 --> createChainedOperator(chainedOpConfig=<StreamFilter config>)
    --> createOutputCollector(<StreamFilter config>)
    --> chainedOpConfig = <StreamSink config>
        --> createChainedOperator(<StreamSink config>)
            --> createOutputCollector(<StreamSink config>)
            --> chainedOpConfig = null, 返回BroadcastingOutputCollector
            --> StreamSink.setup(<output=BroadcastingOutputCollector>)
            --> return CopyingChainingOutput
    --> output = CopyingChainingOutput
    --> StreamFilter.setup(<output=CopyingChainingOutput>)
    --> return CopyingChainingOutput
--> output = CopyingChainingOutput
--> headOperator.setup(<output=CopyingChainingOutput>)            

如果operator chain中只有一个operator,在word count的例子中,在StreamSource之后的flatMap,就是这种情况,它不能跟后面的操作chain在一起。

首先OperatorChain构造函数中的chainedConfigs会为空,因为下游没有跟它chain在一起的operator。接下来看下它的chainEntryPoint:

在createOutputCollector方法中,由于没有chained outputs,因此会直接返回RecordWriterOutput,即headOperator的output就直接交给record writer输出了。

code reading

注解

@FunctionalInterface

代码

SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。相应的,还有一个 CoLocationGroup 类用来强制将 subtasks 放到同一个 slot 中。CoLocationGroup主要用于迭代流中,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上

默认情况下,所有的operator都属于默认的共享组default,也就是说默认情况下所有的operator都是可以共享一个slot的。而当所有input operators具有相同的slot共享组时,该operator会继承这个共享组。为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:someStream.filter(...).slotSharingGroup("group1");就强制指定了filter的slot共享组为group1。

抽象类Slot定义了该槽位属于哪个TaskManager(instance)的第几个槽位(slotNumber),属于哪个Job(jobID)等信息。最简单的情况下,一个slot只持有一个task,也就是SimpleSlot的实现。复杂点的情况,一个slot能共享给多个task使用,也就是SharedSlot的实现。SharedSlot能包含其他的SharedSlot,也能包含SimpleSlot。所以一个SharedSlot能定义出一棵slots树。

关于Flink调度,有两个非常重要的原则我们必须知道:(1)同一个operator的各个subtask是不能呆在同一个SharedSlot中的,例如FlatMap[1]和FlatMap[2]是不能在同一个SharedSlot中的。(2)Flink是按照拓扑顺序从Source一个个调度到Sink的。例如WordCount(Source并行度为1,其他并行度为2),那么调度的顺序依次是:Source -> FlatMap[1] -> FlatMap[2] -> KeyAgg->Sink[1] -> KeyAgg->Sink[2]。

上一篇 下一篇

猜你喜欢

热点阅读