Flink 1.2Time & WaterMark

2021-06-13  本文已影响0人  caster

1.Event:流处理中处理的数据代表的实际业务事件。

2.Time:确定事件的时间概念,用于flink基于时间的操作(window等)。

三种时间语义
  1. Processing Time
    Processing Time 是指事件被操作算子处理时机器的系统时间。不需要流和机器之间的协调,它提供了最好的性能和最低的延迟。但是,在分布式和异步的环境下,Processing Time 不能提供确定性,因为它容易受到事件到达系统的速度(例如从消息队列)、事件在系统内操作流动的速度以及中断的影响。
  2. Event Time
    Event Time 是事件发生的时间。事件(数据)自带的时间字段,与flink无关,需要程序指定如何从数据中构造time水印。此方式处理事件可以完美得到想要的结果,但是无序到达事件会导致程序延时处理,且不能无限等待迟到数据会导致结果一致性问题。正常业务需求关注的都是event time,event time会存在迟到数据的情况,需要引入Watermark迟到数据处理等概念。
  3. Ingestion Time
    Ingestion Time 是事件进入Flink Source的时间。在源操作处,每个事件将源的当前时间作为时间戳,其介于event 和 processing之间。

1.12之后默认使用event time。

3.Watermark(水印)

Watermark是一种特殊的时间戳,也是一种被插入到数据流的特殊的数据(StreamElement),用于表示延迟多久计算当前窗口的聚合操作。可以防止Event乱序到达flink时,计算无限等待迟到的数据。
延迟处理示例:

  1. [1s,5s)的window窗口,设置延迟时间为为3s,则8s的数据到达后会在stream中插入watermark5,就会触发[1s,5s)的window窗口计算;
  2. 如果还存再延迟数据,则使用allowedLateness(),每次迟到数据来了之后重新计算并输出;
  3. 如果还存在,则sideout输出后再进行后续处理。

插入的watremark为单调递增的,与数据时间戳相关。
数据到达后分配到对应的窗口,watermark到达后触发对应的窗口计算。
watermark的传递:
watermark在重分区操作后(keyby,rebanlance等)会广播到全部下游分区。
下游分区收到上游多个分区watermark,会存储每个上游分区的watermark,以最小的watermark为准进行操作,如果最小watermark更新,则向后续下游广播。

3.1.watermark使用示例

处理每条数据时都使用到目前为止event time最大值,设置最大等待时间为3s,水印则为大当前最大event time-3s,window(10s)会等待3s触发上一个窗口计算:(旧版本方式)

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
    private final long maxOutOfOrderness = 3000; // 3.0 seconds
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        // 以迄今为止收到的最大时间戳来生成 watermark
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
event time max event time watermark 操作
10000 10000 7000 水印为当前最大值-3s
11000 11000 8000
12000 12000 9000
18000 18000 15000
13000 18000 15000 最大值和水印不变
19000 19000 16000
20000 20000 17000
23000 23000 20000 触发窗口[10,20)计算
25000 25000 22000 不会再触发窗口[10,20)计算

当23s的数据到达,生成的watermark时间戳为20s,触发计算10~20s窗口,但是只会计算[10,20)之间的数据,并不会把触发计算的下一窗口数据计算进去。达到等待迟到数据3s的效果,已经触发过计算的窗口不会再次被触发。
注:不是对eventTime要求特别严格的数据,尽量不要采用eventTime方式来处理,会有丢数据的风险。

在新版本1.12-中,使用WatermarkStrategy工厂方式构造构造watermark和time语义

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> 

TimestampAssignerSupplier负责定义时间语义
WatermarkGeneratorSupplier负责设置watermark
watermark类型:

旧版本设置watermark方法和类型
间断性:每一条数据后插入一条watermark,数据稀疏
周期性:每隔一段时间生成一条watermark,数据稠密
watermark成生器父类如下:
//基于event或者周期生成watermark,代替了旧版本中的:
//AssignerWithPunctuatedWatermarks :
//AssignerWithPeriodicWatermarks:周期性
public interface WatermarkGenerator<T> {
    //每条数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    //周期性调用,由ExecutionConfig.setAutoWatermarkInterval()方法指定周期时间间隔。
    void onPeriodicEmit(WatermarkOutput output);
}

常用的watermark生成器:
BoundedOutOfOrdernessWatermarks(周期性)

//
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    //到目前为止最大的event time
    private long maxTimestamp;

    //watermark最大无序延迟时间
    private final long outOfOrdernessMillis;

    //通过最大无序时间界限生成watermark生成器
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }
    //每来一条数据更新当前最大的event time
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }
    //周期性生成watermark到数据流中
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

3.2.延时到达的数据处理

由于设置watermark会等待延时数据到达的时间不能过长,当数据迟到过久到达并不会再次触发计算。则需要处理延时到达数据:

  1. 继续触发窗口计算:
    通过allowedLateness()设置,只要满足watermark < window_end_time + allowedLateness,延迟数据进入窗口就会触发窗口计算。即当前到达的数据
    的最大时间-设置最大等待时间生成的水印,没有超过当前窗口右侧值allowedLateness范围是,会重新触发计算。
  2. 重定向到其他地方:
    ds.getSideOutput()
上一篇下一篇

猜你喜欢

热点阅读