Time & Watermark

2021-04-25  ZYvette

Time

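Event Time, Ingestion Time, Processing Time

Event time is the time at which an event actually occurred. Processing time is the time at which the message is processed (wall-clock time on the machine doing the processing). Ingestion time is the time at which the event enters the system.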
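To make the difference concrete, here is a minimal plain-Java sketch (no Flink dependency) that assigns one event to a one-minute tumbling window under each notion of time. The class name, the helper `windowStart`, and the timestamps are all hypothetical and chosen only for illustration.

```java
// Plain-Java sketch, not Flink code; every name here is hypothetical.
final class TimeNotionsSketch {

    /** Start of the tumbling window of length sizeMs containing timestampMs (non-negative timestamps only). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long windowSizeMs = 60_000L;      // one-minute windows
        long eventTimeMs = 59_000L;       // when the event happened
        long ingestionTimeMs = 59_800L;   // when it entered the system
        long processingTimeMs = 61_000L;  // wall-clock time when the operator handled it

        System.out.println("event time window start:      " + windowStart(eventTimeMs, windowSizeMs));
        System.out.println("ingestion time window start:  " + windowStart(ingestionTimeMs, windowSizeMs));
        System.out.println("processing time window start: " + windowStart(processingTimeMs, windowSizeMs));
    }
}
```

With these numbers the event belongs to the window starting at 0 under event time and ingestion time, but to the window starting at 60000 under processing time, because it was handled two seconds after it occurred.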

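In Flink, the time characteristic can be set as shown below. Since Flink 1.12, event time is the default and setStreamTimeCharacteristic is deprecated, so this call applies mainly to older versions.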

// Use processing time
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

Watermark


Watermark generation

Watermark strategies

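// Skeleton of a custom WatermarkStrategy.
// The types used here come from org.apache.flink.api.common.eventtime.
WatermarkStrategy<String> strategy = new WatermarkStrategy<String>() {
    @Override
    public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<String>() {
            @Override
            public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
                // Called for every event: inspect it and optionally emit a watermark.
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Called periodically: emit the updated watermark here.
            }
        };
    }

    @Override
    public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return new TimestampAssigner<String>() {
            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                // Placeholder: return the element's event time in milliseconds.
                return 0;
            }
        };
    }
};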

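Types of WatermarkGenerator: the first two generators below are periodic (they emit from onPeriodicEmit), and the last one is punctuated (it emits in reaction to specific events).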

public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}
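The arithmetic of the bounded-out-of-orderness generator can be traced without a Flink cluster. The following is a minimal plain-Java sketch under stated assumptions: the class `BoundedOutOfOrdernessSketch` and its `isLate` helper are hypothetical, and the initial value of the maximum timestamp is chosen here only so that the subtraction cannot underflow.

```java
// Plain-Java sketch of the "max timestamp minus bound minus 1" rule; not Flink code.
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Assumption of this sketch: start high enough that currentWatermark() cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Track the highest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    /** Watermark = highest timestamp seen, minus the out-of-orderness bound, minus 1. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs - 1;
    }

    /** An event is late if its timestamp is not greater than the current watermark. */
    boolean isLate(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(3_500L);
        for (long ts : new long[] {10_000L, 12_000L, 11_000L}) {
            generator.onEvent(ts);
        }
        System.out.println("watermark = " + generator.currentWatermark());
    }
}
```

After events with timestamps 10000, 12000 and 11000 and a bound of 3500 ms, the watermark is 12000 - 3500 - 1 = 8499, so an event stamped 8500 is still on time while one stamped 8499 counts as late.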

Idle sources

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withIdleness(Duration.ofMinutes(1));
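Why idleness matters can be shown with a small plain-Java sketch: when watermarks from several inputs are combined by taking the minimum, one silent input holds everything back unless it is marked idle and skipped. The class `IdleSourceSketch`, both method names, and the choice to return `Long.MIN_VALUE` when every input is idle are assumptions of this sketch, not Flink's internals.

```java
// Plain-Java sketch of idle-input handling; not Flink code, all names hypothetical.
final class IdleSourceSketch {

    /** An input counts as idle once no event has arrived for at least idleTimeoutMs. */
    static boolean isIdle(long lastEventAtMs, long nowMs, long idleTimeoutMs) {
        return nowMs - lastEventAtMs >= idleTimeoutMs;
    }

    /**
     * Minimum watermark over all inputs that are not idle.
     * Returns Long.MIN_VALUE if every input is idle (an assumption of this sketch).
     */
    static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] watermarks = {50_000L, 1_000L}; // the second input stopped producing long ago
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false}));
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, true}));
    }
}
```

With both inputs active, the combined watermark is stuck at 1000. Once the second input is treated as idle, the combined watermark follows the active input and becomes 50000.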

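Watermark merging

Before forwarding a watermark downstream, an operator first finishes all the processing that the watermark triggers (for example, firing timers). An operator with two inputs uses the minimum of the two input watermarks as its current watermark.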
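The rule above can be sketched in plain Java as a two-input operator that tracks one watermark per input, advances its own watermark to the minimum of the two, and fires due timers before it would forward the new watermark. The class `TwoInputWatermarkSketch`, its methods, and the representation of timers as a sorted array are hypothetical simplifications, not Flink's implementation.

```java
// Plain-Java sketch of two-input watermark merging; not Flink code, all names hypothetical.
final class TwoInputWatermarkSketch {

    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final long[] timers; // event-time timers, sorted ascending
    private long currentWatermark = Long.MIN_VALUE;
    private int nextTimer = 0;   // index of the first timer that has not fired yet

    TwoInputWatermarkSketch(long[] sortedTimerTimestamps) {
        this.timers = sortedTimerTimestamps.clone();
    }

    /** Returns true if the operator's watermark advanced and would be forwarded downstream. */
    boolean onWatermark(int input, long watermark) {
        // Watermarks on a single input never move backwards.
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long candidate = Math.min(inputWatermarks[0], inputWatermarks[1]);
        if (candidate <= currentWatermark) {
            return false; // the slower input still holds the operator back
        }
        currentWatermark = candidate;
        // Fire every timer due at or before the new watermark before forwarding it.
        while (nextTimer < timers.length && timers[nextTimer] <= currentWatermark) {
            nextTimer++;
        }
        return true;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    int firedTimers() {
        return nextTimer;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch(new long[] {5L, 15L});
        op.onWatermark(0, 10L); // input 1 has not reported yet, so nothing advances
        op.onWatermark(1, 7L);  // operator watermark becomes min(10, 7)
        System.out.println(op.currentWatermark() + " / timers fired: " + op.firedTimers());
    }
}
```

In this sketch a watermark of 10 on the first input alone changes nothing. Only when the second input reports 7 does the operator advance to 7 and fire the timer registered for 5.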

References: https://zhuanlan.zhihu.com/p/158951593
https://cloud.tencent.com/developer/article/1629585
