Apache Flink——Watermark 水位线
前言
在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。
基本概念是什么
- Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算。
- start_time、end_time:当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间。
- event-time: 事件发生时间,是事件发生所在设备的当地时间,比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。
- Watermarks:可以把他理解为一个水位线,等于evevtTime - delay(比如规定为20分钟),一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。
推迟窗口触发的时间,实现方式:通过当前窗口中最大的eventTime-延迟时间所得到的Watermark与窗口原始触发时间进行对比,当Watermark大于窗口原始触发时间时则触发窗口执行!我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。
![](https://img.haomeiwen.com/i13587608/ecfd8c6f9bd12abe.png)
那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。
Watermark是一种衡量Event Time进展的机制。 Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现。 数据流中的Watermark用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。 Watermark可以理解成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间等于maxEventTime – t,那么这个窗口被触发执行。 有序流的Watermarker如下图所示:(Watermark设置为0)
![](https://img.haomeiwen.com/i13587608/5f72bbfd36ac7eba.png)
乱序流的Watermarker如下图所示:(Watermark设置为2)
![](https://img.haomeiwen.com/i13587608/aaea264af9f9d262.png)
当Flink接收到数据时,会按照一定的规则去生成Watermark,这条Watermark就等于当前所有到达数据中的maxEventTime - 延迟时长,也就是说,Watermark是由数据携带的,一旦数据携带的Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于Watermark是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。
上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为5s的事件对应的Watermark是3s,时间戳为9s的事件的Watermark是7s,如果我们的窗口1是1s-3s,窗口2是4s-6s,那么时间戳为5s的事件到达时的Watermarker恰好触发窗口1,时间戳为9s的事件到达时的Watermark触发窗口2。
Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时刻为准在窗口范围内的所有所有数据都会收入窗中。只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。
一、时间语义
时间本身就有着“流”的特性,它可以用来判断事件发生的先后以及间隔;所以如果我们想要划定窗口来收集数据,一般就需要基于时间。对于批处理来说,这似乎没什么讨论的必要,因为数据都收集好了,想怎么划分窗口都可以;而对于流处理来说,如果想处理更加实时,就必须对时间有更加精细的控制。
1.1 Flink 中的时间语义
对于一台机器而言,“时间”自然就是指系统时间。但Flink 是一个分布式处理系统,分布式架构最大的特点,就是节点彼此独立、互不影响,这带来了更高的吞吐量和容错性;但有利必有弊,最大的问题也来源于此。
在分布式系统中,节点“各自为政”,是没有统一时钟的,数据和控制信息都通过网络进
行传输。因为网络传输会有延迟,而且这延迟是不确定的,所以 JobManager 作为管理者统一向所有 TaskManager 发送同步时钟信号, 发出的同步信号无法同时到达所有节点;想要拥有一个全局统一的时钟,在分布式系统里是做不到的。
另一个麻烦的问题是,在流式处理的过程中,数据是在不同的节点间不停流动的,这同样也会有网络传输的延迟。这样一来,当上下游任务需要跨节点传输数据时,它们对于“时间”的理解也会有所不同。例如,上游任务在 8 点 59 分 59 秒发出一条数据,到下游要做窗口计算时已经是 9 点零 1 秒了,那这条数据到底该不该被收到 8 点~9 点的窗口呢?
所以,当我们希望对数据按照时间窗口来进行收集计算时,“时间”到底以谁为标准就非常重要了。
![](https://img.haomeiwen.com/i13587608/5d811c6ed7c14a94.png)
在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被 Flink 系统中的 Source 算子读取消费,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。
这里有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后。
1.1.1 处理时间(Processing Time)
处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。
如果我们以它作为衡量标准,那么数据属于哪个窗口就很明显了:只看窗口任务处理这条数据时,当前的系统时间即可。
这种方法非常简单粗暴,不需要各个节点之间进行协调同步,也不需要考虑数据在流中的位置,简单来说就是“我的地盘听我的”。所以处理时间是最简单的时间语义。
1.1.2 事件时间(Event Time)
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。
数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”(Timestamp)。
在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数据本身产生的时间。
由于流处理中数据是源源不断产生的,一般来说,先产生的数据也会先被处理,所以当任务不停地接到数据时,它们的时间戳也基本上是不断增长的,就可以代表时间的推进。
这里有个前提,就是“先产生的数据先被处理”,这要求我们可以保证数据到达的顺序。但是由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。
实际项目开发中大多数都是使用的事件时间
二、水位线(Watermark)
在实际应用中,一般会采用事件时间语义。而水位线,就是基于事件时间提出的概念。一个数据产生的时刻,就是流处理中事件触发的时间点,这就是“事件时间”,一般都会以时间戳的形式作为一个字段记录在数据里。这个时间就像商品的“生产日期”一样,一旦产生就是固定的,印在包装袋上,不会因为运输辗转而变化。如果我们想要统计一段时间内的数据,需要划分时间窗口,这时只要判断一下时间戳就可以知道数据属于哪个窗口了。
明确了一个数据的所属窗口,还不能直接进行计算。因为窗口处理的是有界数据,我们需要等窗口的数据都到齐了,才能计算出最终的统计结果。
对于时间窗口来说这很明显:到了窗口的结束时间,自然就应该收集到了所有数据,就可以触发计算输出结果了。
2.1 水位线的定义
在事件时间的语义下,不依赖系统时间,而是基于数据自带的时间戳去定义一个时钟,用来表示当前时间的进展。
在数据流中加入一个时钟标记,记录当前的事件时间,这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了,这种类似于水流中用来做标志的记号,在Flink中被称为水位线。
![](https://img.haomeiwen.com/i13587608/27f3bdfb68a3439f.png)
有序流中的水位线
![](https://img.haomeiwen.com/i13587608/0806184d31b4d950.png)
在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中。在实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条时间就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的时间差会非常小(比如几毫秒),往往对处理计算没什么影响,故为了提高效率,一般会采取每隔一段时间生成一个水位线(对应于时间戳)。这个每隔的时间周期指的是处理时间(系统时间)
乱序流中的水位线
![](https://img.haomeiwen.com/i13587608/6116e75b9b178b4a.png)
在分布式系统中,数据在节点间的传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的乱序。为了从乱序流中插入水位线,我们就需要定义一个规则:插入新的水位线时,先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。
![](https://img.haomeiwen.com/i13587608/db33c4e5dbbacac3.png)
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线,这时只需要保存一下之前所有数据中最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线。
![](https://img.haomeiwen.com/i13587608/a16bb12faa2ad365.png)
但也有一个问题,我们无法正确处理“迟到”的数据。为了让窗口能正确收集到迟到的数据,我们可以等上几秒,也就是用当前已有数据的最大时间戳减去几秒,就是要插入的水位线的时间戳。
![](https://img.haomeiwen.com/i13587608/67a26fcc80235eb1.png)
水位线的特性
水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
- 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
- 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
- 水位线是基于数据的时间戳生成
- 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
- 水位线可以通过设置延迟,来保证正确处理乱序数据
- 一个水位线t,表示在当前流中事件时间已经达到了时间戳t,代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ <=t 的数据
水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成堆乱序数据的正确处理。
2.2 水位线的生成
计算处理更快、实时性更强、计算准确性尽可能得到保障,我们就需要设置合理的水位线。
水位线生成策略
在Flink的DataStream API中,有一个单独用于生成水位线的方法:assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy)
上述方法需要传入一个watermarkStrategy参数,这就是所谓的水位线生成策略
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,
WatermarkGeneratorSupplier<T>{
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
-
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
-
WatermarkGenerator:主要负责按照既定方式,基于时间戳生成水位线。在WatermarkGenerator接口中有两个方法:onEvent,onPeriodicEmit。
- onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做出各种操作。
- onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的…setAutoWatermarkInterval()方法来设置,默认为200ms。
public interface WatermarkGenerator<T> {
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
void onPeriodicEmit(WatermarkOutput output);
}
env.getConfig().setAutoWatermarkInterval(100);
2.3 Flink内置水位线生成器
WatermarkStrategy 这个接口是一个生成水位线策略的抽象,让我们可以灵活地实现自己的需求;但看起来有些复杂,如果想要自己实现应该还是比较麻烦的。Flink 提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。
这两个生成器可以通过调用 WatermarkStrategy 的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。
2.3.1 有序流
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用
WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
import com.yibo.flink.datastream.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: huangyibo
* @Date: 2022/7/3 1:42
* @Description: 有序流的Watermark生成
*/
public class WaterMark {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置生成水位线的时间间隔
env.getConfig().setAutoWatermarkInterval(100);
List<Event> list = new ArrayList<>();
list.add(new Event("Mary","./home",1000L));
list.add(new Event("Bobo","./cart",2000L));
list.add(new Event("Alice","./cart",3000L));
list.add(new Event("Bobo","./prod?id=1",4000L));
list.add(new Event("Bobo","./prod?id=2",4500L));
SingleOutputStreamOperator<Event> streamOperator = env.fromCollection(list)
//有序流的Watermark生成
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.getTimestamp();
}
}));
streamOperator.print();
env.execute();
}
}
上面代码中我们调用.withTimestampAssigner()方法,将数据中的 timestamp 字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是我们处理计算的事件时间。
这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。
2.3.2 乱序流
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed
Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy.forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
import com.yibo.flink.datastream.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: huangyibo
* @Date: 2022/7/3 1:42
* @Description: 乱序流的Watermark生成
*/
public class WaterMark {
public static void main(String[] args) throws Exception {
//创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//设置生成水位线的时间间隔
env.getConfig().setAutoWatermarkInterval(100);
List<Event> list = new ArrayList<>();
list.add(new Event("Mary","./home",1000L));
list.add(new Event("Bobo","./cart",2000L));
list.add(new Event("Alice","./cart",3000L));
list.add(new Event("Bobo","./prod?id=1",4000L));
list.add(new Event("Bobo","./prod?id=2",4500L));
//乱序流的Watermark生成
SingleOutputStreamOperator<Event> streamOperator = env.fromCollection(list)
// 插入水位线的逻辑
.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为 2s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
// 抽取时间戳的逻辑
.withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
);
streamOperator.print();
env.execute();
}
}
事实上,有序流的水位线生成器本质上和乱序流是一样的,相当于延迟设为 0 的乱序流水位线生成器,两者完全等同:
WatermarkStrategy.forMonotonousTimestamps()
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
注意:乱序流中生成的水位线真正的时间戳,其实是当前最大时间戳 - 延迟时间 - 1,这里单位是毫秒。
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
2.4 自定义水位线策略
一般来说,Flink 内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑
可能非常复杂,这时对水位线生成的逻辑也有更高的要求,我们就必须自定义实现水位线策略WatermarkStrategy 了。
在 WatermarkStrategy 中,时间戳分配器 TimestampAssigner 都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于 WatermarkGenerator 的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。
WatermarkGenerator接口中有两个方法:onEvent()、onPeriodicEmit(),前者是在每个时间到来时调用,后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。
2.4.1 周期性水位线生成器(Periodic Generator)
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CustomWatermarkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();
env.execute();
}
public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
}
};
}
@Override
public WatermarkGenerator<Event>
createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new CustomPeriodicGenerator();
}
}
public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
private Long delayTime = 5000L; // 延迟时间
private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput
output) {
// 每来一条数据就调用一次
maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 发射水位线,默认 200ms 调用一次
output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
}
}
们在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;这个方法
由系统框架周期性地调用,默认 200ms 一次。所以水位线的时间戳是依赖当前已有数据的最大时间戳的(这里的实现与内置生成器类似,也是减去延迟时间再减 1),但具体什么时候生成与数据无关。
2.4.2 断点式水位线生成器(Punctuated Generator)
断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,
就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。
public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
@Override
public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
// 只有在遇到特定的 itemId 时,才发出水位线
if (r.user.equals("Mary")) {
output.emitWatermark(new Watermark(r.timestamp - 1));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线
}
}
在 onEvent()中判断当前事件的 user 字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。
2.5 在自定义数据源中发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自 定义数据源中发送了水位线以后,就不能再在程序中使用 assignTimestampsAndWatermarks 方 法 来 生 成 水 位 线 了 。 在 自 定 义 数 据 源 中 生 成 水 位 线 和 在 程 序 中 使 用 assignTimestampsAndWatermarks 方法生成水位线二者只能取其一。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.util.Calendar;
import java.util.Random;
public class EmitWatermarkInSourceFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new ClickSourceWithWatermark()).print();
env.execute();
}
// 泛型是数据源中的类型
public static class ClickSourceWithWatermark implements SourceFunction<Event> {
private boolean running = true;
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
Random random = new Random();
String[] userArr = {"Mary", "Bob", "Alice"};
String[] urlArr = {"./home", "./cart", "./prod?id=1"};
while (running) {
long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
String username = userArr[random.nextInt(userArr.length)];
String url = urlArr[random.nextInt(urlArr.length)];
Event event = new Event(username, url, currTs);
// 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
sourceContext.collectWithTimestamp(event, event.timestamp);
// 发送水位线
sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
}
在自定义水位线中生成水位线相比 assignTimestampsAndWatermarks 方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写 Flink 的测试程序,测试 Flink 的各种各样的特性。
2.6 水位线的传递
在实际应用中往往上下游都有多个并行子任务,为了统一推进事件时间的进展,我们要求上游任务处理完水位线、时钟改变之后,要把当前的水位线广播给所有的下游任务。这样,后续任务就不需要依赖原始数据中的时间戳,也可以知道当前事件时间了。
![](https://img.haomeiwen.com/i13587608/3c6f80ea3c91ea23.png)
如图 所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:
-
1、上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线” (Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
-
2、当有一个新的水位线(第一分区的 4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的 3,于是当前任务时钟就推进到了 3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
-
3、再次收到新的水位线(第二分区的 7)后,执行同样的处理流程。首先将第二个分区时钟更新为 7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
-
4、同样道理,当又一次收到新的水位线(第三分区的 6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的 4,所以当前任务的时钟推进到 4,并发出时间戳为 4 的水位线,广播到下游各个分区任务。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。对于有多条流合并之后进行处理的场景,水位线传递的规则是类似的。
2.7 水位线的总结
水位线在事件时间的世界里面,承担了时钟的角色,是唯一的时间尺度。
水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒
在数据流开始之前,Flink 会插入一个大小是负无穷大(在 Java 中是-Long.MAX_VALUE)的水位线,而在数据流结束时,Flink 会插入一个正无穷大(Long.MAX_VALUE)的水位线,保证所有的窗口闭合以及所有的定时器都被触发。
对于离线数据集,Flink 也会将其作为流读入,也就是一条数据一条数据的读取。在这种
情况下,Flink 对于离线数据集,只会插入两次水位线,也就是在最开始处插入负无穷大的水位线,在结束位置插入一个正无穷大的水位线。因为只需要插入两次水位线,就可以保证计算的正确,无需在数据流的中间插入水位线了。
水位线的重要性在于它的逻辑时钟特性,而逻辑时钟这个概念可以说是分布式系统里面最为重要的概念之一了,理解透彻了对理解各种分布式系统非常有帮助。具体可以参考 Leslie Lamport 的论文。
参考:
https://blog.csdn.net/weixin_47491957/article/details/124400164