Flink_window窗口计算如何解决乱序,延迟,迟到数据问题

2022-09-02  本文已影响0人  Eqo

Flink四大基石

window窗口 Time时间 Status状态 Checkpoint检查点

1.窗口Window和Time时间

image.png

Flinkwindow窗口,相当于把流式数据转换成一批次一批次,批处理
分类:

窗口根据窗口大小和窗口间隔分为滚动窗口,滑动窗口

窗口根据数据流分为
分组流窗口(常用)
数据流窗口


Time时间:
Flink 时间语义
1.11版本之前:


2.窗口计算

Flink流式计算当中最重要的就是 基于事件时间的时间窗口计算
EventTime事件时间的滚动窗口 ★★★★★
EventTime事件时间的滑动窗口 ★★★★
EventTime事件时间的会话窗口 ★★★★


3.基于事件时间窗口计算

4.基于时间事件的窗口计算时,如何处理乱序,延迟和迟到数据

【一个原则:基于事件时间窗口计算,不要让数据丢失,可以单独侧边流输出,进行处理分析】
下述方案,针对滚动窗口和滑动窗口,与会话窗口没有关系
默认情况:触发窗口计算后,窗口销毁

方案一 Watermark水印

【时间方式,处理乱序数据】

方案二 Allowed Lateness 允许延迟

【空间方式,内存保存窗口计算结果,处理延迟数据】

方案三 Side Output 侧边输出

兜底操作,将以上两种方案没有处理后的乱序,延迟和迟到数据,作为侧边流输出,单独处理

5.乱序数据、延迟数据、迟到数据,如何界定?

6.基于事件时间的会话窗口对乱序、延迟和迟到数据的处理(重要)

Flink Window窗口计算中,如果是基于事件时间的会话窗口,不存在乱序、延迟和迟到数据的处理。
数据到达时,如果窗口触发计算并且销毁,直接属于下一个窗口中数据,参与计算

Flink1.11版本使用

Flink 1.11之前版本,基于事假时间窗口计算时,需要显示设置(默认时间语义为处理时间ProcessTime)时间语义,并且从1.12版本开始,基于时间窗口计算API也发生变化,与以前版本稍有不同

      // 1. 执行环境-env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //todo step1 显示设置时间时间语义
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
crossRoadEventStream
                .assignTimestampsAndWatermarks(
                // 设置最大乱序时间maxOutOfOrderness 为1min
                new BoundedOutOfOrdernessTimestampExtractor<CrossRoadEvent>(Time.minutes(1)) {

        //todo step2 指定数据中事件时间的字段,必须为long类型
        SingleOutputStreamOperator<CrossRoadEvent> timeStream = crossRoadEventStream
                .assignTimestampsAndWatermarks(
                // 设置最大乱序时间maxOutOfOrderness 为1min
                new BoundedOutOfOrdernessTimestampExtractor<CrossRoadEvent>(Time.minutes(1)) {
// 设置滚动窗口TumblingEventTimeWindows大小为10min 并且设置窗口聚合函数
        SingleOutputStreamOperator<CrossRoadReport> reportStream = timeStream
                .keyBy(CrossRoadEvent::getRoadId)
                .window(TumblingEventTimeWindows.of(Time.minutes(10)))
                //设置侧边流
                .sideOutputLateData(lateOutputTag)
                // 设置allowedLateness 当窗口计算后,保存一段时间数据,当窗口中有数据达到时,继续触发计算
                .allowedLateness(Time.minutes(2))

Flink1.13版本使用

上一篇 下一篇

猜你喜欢

热点阅读