Time & Watermark
2021-04-25
ZYvette
Time
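Event Time, Ingestion Time, Processing Time
Event time is the time at which an event occurred, processing time is the time at which the message is processed (wall-clock time), and ingestion time is the time at which the event enters the system.
In Flink, the time characteristic can be set as shown below. Since Flink 1.12, setStreamTimeCharacteristic is deprecated and event time is the default.
// Use processing time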
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
watermark
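- A watermark is a timestamp emitted at intervals that serves as a benchmark for deciding whether data is late: a watermark with timestamp t declares that no more elements with a timestamp of t or earlier are expected, so any such element that arrives afterwards is late.
- Because data can always be delayed, watermarks alone cannot fully solve the lateness problem. In practice, an allowed lateness can be configured so that late data still triggers processing.
- Watermarks are used for processing event-time data.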
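The lateness rule and the allowed-lateness idea from the list above can be sketched in plain Java without any Flink dependency. This is a simplified model, not Flink's implementation: the class and method names (LatenessSketch, isLate, stillAccepted) are hypothetical, and the window rule assumes a window whose largest timestamp is windowEnd - 1.

```java
// Simplified model of watermark-based lateness; all names here are hypothetical.
class LatenessSketch {

    /** An element is late if its timestamp is not greater than the current watermark. */
    static boolean isLate(long eventTimestamp, long currentWatermark) {
        return eventTimestamp <= currentWatermark;
    }

    /**
     * Assumed rule: a window covering [start, windowEnd) still accepts late elements
     * while the watermark is below (windowEnd - 1) + allowedLateness.
     */
    static boolean stillAccepted(long windowEnd, long allowedLateness, long currentWatermark) {
        long cleanupTime = (windowEnd - 1) + allowedLateness;
        return currentWatermark < cleanupTime;
    }

    public static void main(String[] args) {
        long watermark = 12_000L;
        // An event stamped 9_500 arrives after the watermark reached 12_000: it is late.
        System.out.println(isLate(9_500L, watermark));
        // Its window [0, 10_000) with 5 s of allowed lateness can still take it.
        System.out.println(stillAccepted(10_000L, 5_000L, watermark));
    }
}
```

With an allowed lateness of zero, the same late element would be rejected as soon as the watermark passes the end of the window.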
Watermark generation
watermark strategies
new WatermarkStrategy<String>() {
    @Override
    public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<String>() {
            @Override
            public void onEvent(String s, long l, WatermarkOutput watermarkOutput) {
                // called for every event
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                // called periodically to update the watermark
            }
        };
    }

    @Override
    public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return new TimestampAssigner<String>() {
            @Override
            public long extractTimestamp(String s, long l) {
                // placeholder: return the event's own timestamp here
                return 0;
            }
        };
    }
}
Types of WatermarkGenerator:
- Periodic WatermarkGenerator: emits watermarks periodically; usually both onEvent and onPeriodicEmit are implemented.
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // track the highest timestamp seen so far
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}
/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}
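To see what the bounded-out-of-orderness rule produces, the arithmetic of BoundedOutOfOrdernessGenerator above can be traced in plain Java. This is only an illustrative sketch: the class name BoundedOutOfOrdernessTrace and the method watermarksFor are hypothetical, the input timestamps are made up, and it assumes a periodic emit happens right after every event, which a real job does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;

// Traces "max timestamp seen - bound - 1" for a sequence of event timestamps.
class BoundedOutOfOrdernessTrace {

    /** Returns the watermark that would be emitted after each event (assumed emit per event). */
    static List<Long> watermarksFor(long[] eventTimestamps, long maxOutOfOrderness) {
        List<Long> watermarks = new ArrayList<>();
        long currentMaxTimestamp = 0L; // same starting value as the generator above
        for (long ts : eventTimestamps) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            watermarks.add(currentMaxTimestamp - maxOutOfOrderness - 1);
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // The third event (11_000) is out of order, so the watermark does not move.
        long[] timestamps = {10_000L, 12_000L, 11_000L, 15_000L};
        System.out.println(watermarksFor(timestamps, 3_500L));
        // prints [6499, 8499, 8499, 11499]
    }
}
```

The out-of-order event at 11 000 is still ahead of the watermark at 8 499, so it is not late. Only events more than 3.5 seconds behind the highest timestamp seen so far would be.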
- Punctuated WatermarkGenerator: emits watermarks in reaction to marker events; only onEvent does any work.
public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}
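Idle sources
- Handling idle sources
If no data arrives within a configured waiting period, the source is marked idle and downstream operators stop waiting for its watermark. This prevents the watermark from stalling when data is skewed and some inputs receive no data.
- forBoundedOutOfOrderness
Generates watermarks that lag behind the highest timestamp seen so far by a fixed delay.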
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
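Watermark merging
- Watermarks and Kafka
A Kafka topic usually has multiple partitions. When the watermark strategy is applied on the Kafka source itself, Flink generates a watermark per partition and takes the minimum across them.
- How operators merge watermarks
Before forwarding a watermark downstream, an operator first fully processes everything that the watermark triggers (for example timers and window firings). The operator's current watermark is the minimum of the watermarks of its inputs.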
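The minimum rule can be sketched in plain Java, together with the idle-source behaviour described earlier. This is a rough model rather than Flink's code: WatermarkMergeSketch and combine are hypothetical names, and returning an empty result when every input is idle is an assumption made for this sketch.

```java
import java.util.OptionalLong;

// Models an operator (or a multi-partition source) combining per-input watermarks.
class WatermarkMergeSketch {

    /**
     * Returns the minimum watermark over all inputs that are not idle.
     * Assumption for this sketch: if every input is idle, there is no combined watermark.
     */
    static OptionalLong combine(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, inputWatermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] partitionWatermarks = {5_000L, 3_000L, 7_000L};
        // All partitions active: the slowest one (3_000) decides.
        System.out.println(combine(partitionWatermarks, new boolean[]{false, false, false}));
        // Partition 1 marked idle: the combined watermark can advance to 5_000.
        System.out.println(combine(partitionWatermarks, new boolean[]{false, true, false}));
    }
}
```

Taking the minimum is the conservative choice: the combined watermark never claims more progress than the slowest active input has actually made.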
References: https://zhuanlan.zhihu.com/p/158951593
https://cloud.tencent.com/developer/article/1629585