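Flink Time Types and the Watermark Mechanism
1.1 Background
Requirement: every 5 seconds, count how many times each word appeared in the last 10 seconds.
1.1.1 TimeWindow implementation
/**
* Every 5 seconds, count word occurrences over the last 10 seconds
*/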
public class TimeWindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
for(String word:fields){
out.collect(Tuple2.of(word,1));
}
}
}).keyBy(0).timeWindow(Time.seconds(10),Time.seconds(5))
.sum(1)
.print().setParallelism(1);
env.execute("TimeWindowWordCount");
}
}
1.1.2 ProcessWindowFunction
/**
* Every 5 seconds, count word occurrences over the last 10 seconds
*/
public class TimeWindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
for(String word:fields){
out.collect(Tuple2.of(word,1));
}
}
}).keyBy(0).timeWindow(Time.seconds(10),Time.seconds(5))
.process(new MySumProcessWindowFunction()) // for each key: iterate over its values and emit (key, sum)
.print().setParallelism(1);
env.execute("word count..");
}
/**
* IN, OUT, KEY, W extends Window
*/
public static class MySumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Integer>,Tuple2<String,Integer>
,Tuple,TimeWindow>{
FastDateFormat dataformat=FastDateFormat.getInstance("HH:mm:ss");
@Override
public void process(Tuple key, Context context,
Iterable<Tuple2<String, Integer>> elements,
Collector<Tuple2<String, Integer>> out) throws Exception {
System.out.println("Current system time: "+dataformat.format(System.currentTimeMillis()));
System.out.println("Window processing time: "+dataformat.format(context.currentProcessingTime()));
System.out.println("Window start time: "+dataformat.format(context.window().getStart()));
int sum=0;
for (Tuple2<String,Integer> ele:elements){
// Add the count carried by each element (always 1 here)
sum += ele.f1;
}
out.collect(Tuple2.of(key.getField(0),sum));
System.out.println("Window end time: "+dataformat.format(context.window().getEnd()));
System.out.println("=====================================================");
}
}
}
With a 10-second window that slides every 5 seconds, Flink aligns the window boundaries to the 5-second intervals below; each 10-second window spans two adjacent intervals:
[00:00:00, 00:00:05) [00:00:05, 00:00:10)
[00:00:10, 00:00:15) [00:00:15, 00:00:20)
[00:00:20, 00:00:25) [00:00:25, 00:00:30)
[00:00:30, 00:00:35) [00:00:35, 00:00:40)
[00:00:40, 00:00:45) [00:00:45, 00:00:50)
[00:00:50, 00:00:55) [00:00:55, 00:01:00)
[00:01:00, 00:01:05) ...
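To make the window alignment concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a timestamp could be mapped to the sliding windows that contain it. The class and method names are hypothetical, and the sketch assumes non-negative timestamps and a zero window offset; it illustrates the idea rather than reproducing Flink's own assigner.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    /**
     * Returns the start times (ms) of every sliding window that contains the timestamp,
     * latest window first. Windows are aligned to multiples of slideMs.
     */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the latest window that begins at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element at second 13 with size = 10 s and slide = 5 s.
        for (long start : windowStarts(13_000L, 10_000L, 5_000L)) {
            System.out.println("[" + start / 1000 + "s, " + (start + 10_000L) / 1000 + "s)");
        }
    }
}
```

An element at second 13 therefore lands in two windows, [10s, 20s) and [5s, 15s), which is why every element is counted twice across consecutive results.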
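1.1.3 Types of time
Time in stream data falls into three categories:
Event Time: when the event was produced, usually described by a timestamp carried in the event.
Ingestion Time: when the event (log, record, message) entered Flink (not considered here).
Processing Time: the system time at the moment the event is processed.
By default, the examples here use Processing Time.
Example: the original log line is
2020-04-11 10:00:01,134 INFO executor.Executor: Finished task in state 0.0
This record entered Flink at 2020-04-11 20:00:00,102
and reached the window operator at 2020-04-11 20:00:01,100
- 2020-04-11 10:00:01,134 is the Event Time
- 2020-04-11 20:00:00,102 is the Ingestion Time
- 2020-04-11 20:00:01,100 is the Processing Time
Question:
If we want to count the error logs for failed API calls in each minute, which of these times gives a meaningful result?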
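One way to approach the question: a per-minute error count arguably only makes sense if each log line is bucketed by the timestamp it carries, that is, by event time. The plain-Java sketch below parses the sample log line and derives its one-minute bucket. The class name, method names, and the assumption that the timestamp occupies the first 23 characters of the line are illustrative only.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

class EventTimeBucketSketch {

    // Assumed log timestamp layout, e.g. "2020-04-11 10:00:01,134".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    /** Parses the leading timestamp of a log line (its first 23 characters). */
    static LocalDateTime eventTime(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, 23), LOG_TIME);
    }

    /** The one-minute bucket the log line belongs to when counting by event time. */
    static LocalDateTime minuteBucket(String logLine) {
        return eventTime(logLine).truncatedTo(ChronoUnit.MINUTES);
    }

    public static void main(String[] args) {
        String line = "2020-04-11 10:00:01,134 INFO executor.Executor: Finished task in state 0.0";
        System.out.println(minuteBucket(line));
    }
}
```

Bucketing by ingestion or processing time would instead place this record in the 20:00 minute, ten hours after the failure actually happened.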
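1.2 Processing Time Window (in-order events)
Requirement: every 5 seconds, count word occurrences over the last 10 seconds.
A custom source simulates the input: 2 events are sent back to back at second 13, and 1 more event is sent at second 16.
/**
* Requirement: every 5 seconds, count word occurrences over the last 10 seconds
*/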
public class WindowWordCountBySource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStream = env.addSource(new TestSource());
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
for (String word:fields){
out.collect(Tuple2.of(word,1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(10),Time.seconds(5))
.process(new SumProcessFunction()).print().setParallelism(1);
env.execute("WindowWordCountAndTime");
}
public static class TestSource implements
SourceFunction<String>{
FastDateFormat dateformat = FastDateFormat.getInstance("HH:mm:ss");
@Override
public void run(SourceContext<String> cxt) throws Exception {
String currTime = String.valueOf(System.currentTimeMillis());
System.out.println(currTime);
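// Busy-wait until the wall clock is within 100 ms of a 10-second boundary
// (second 0, 10, 20, 30, 40 or 50), so that the send times below
// line up with the window boundaries.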
while(Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100){
currTime=String.valueOf(System.currentTimeMillis());
continue;
}
System.out.println("Started sending events at: "+dateformat.format(System.currentTimeMillis()));
TimeUnit.SECONDS.sleep(3);
cxt.collect("hadoop");
cxt.collect("hadoop");
TimeUnit.SECONDS.sleep(3);
cxt.collect("hadoop");
TimeUnit.SECONDS.sleep(300000);
}
@Override
public void cancel() {
}
}
/**
* IN
* OUT
* KEY
* W extends Window
*
*/
public static class SumProcessFunction
extends ProcessWindowFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,Tuple,TimeWindow>{
FastDateFormat dataformat=FastDateFormat.getInstance("HH:mm:ss");
@Override
public void process(Tuple tuple, Context context,
Iterable<Tuple2<String, Integer>> allElements,
Collector<Tuple2<String, Integer>> out) {
int count=0;
for (Tuple2<String,Integer> e:allElements){
count++;
}
out.collect(Tuple2.of(tuple.getField(0),count));
}
}
}
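1.3 Processing Time Window (out-of-order events)
A custom source simulates the input: 2 events are produced at second 13, but only one of them is actually sent at second 13; the other is delayed for some reason and is not sent until second 19. 1 more event is sent at second 16.
/**
* Requirement: every 5 seconds, count word occurrences over the last 10 seconds
* Out-of-order input
*/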
public class WindowWordCountBySource2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataStream = env.addSource(new TestSource());
dataStream.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line,
Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
for (String word:fields){
out.collect(Tuple2.of(word,1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(10),Time.seconds(5))
.process(new SumProcessFunction()).print().setParallelism(1);
env.execute("WindowWordCountAndTime");
}
public static class TestSource implements
SourceFunction<String>{
FastDateFormat dateformat = FastDateFormat.getInstance("HH:mm:ss");
@Override
public void run(SourceContext<String> cxt) throws Exception {
String currTime = String.valueOf(System.currentTimeMillis());
while(Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100){
currTime=String.valueOf(System.currentTimeMillis());
continue;
}
System.out.println("Started sending events at: "+dateformat.format(System.currentTimeMillis()));
// second 13
TimeUnit.SECONDS.sleep(3);
// Both events are produced at second 13, but the second copy is only sent at second 19.
String event="hadoop";
cxt.collect(event);
// cxt.collect(event);
// second 16
TimeUnit.SECONDS.sleep(3);
cxt.collect("hadoop");
// second 19
TimeUnit.SECONDS.sleep(3);
cxt.collect(event);
TimeUnit.SECONDS.sleep(300990);
}
@Override
public void cancel() {
}
}
/**
* IN
* OUT
* KEY
* W extends Window
*
*/
public static class SumProcessFunction
extends ProcessWindowFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,Tuple,TimeWindow>{
FastDateFormat dataformat=FastDateFormat.getInstance("HH:mm:ss");
@Override
public void process(Tuple tuple, Context context,
Iterable<Tuple2<String, Integer>> allElements,
Collector<Tuple2<String, Integer>> out) {
int count=0;
for (Tuple2<String,Integer> e:allElements){
count++;
}
out.collect(Tuple2.of(tuple.getField(0),count));
}
}
}
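1.4 Handling out-of-order events with Event Time
Switch the job to Event Time:
/**
* Requirement: every 5 seconds, count word occurrences over the last 10 seconds
*/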
public class WindowWordCountByEventTime {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Step 1: set the time characteristic (the default is processing time)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> dataStream = env.addSource(new TestSource());
dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
// (key, event timestamp)
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
// Assign timestamps and watermarks based on the event-time field
}).assignTimestampsAndWatermarks(new EventTimeExtractor())
.keyBy(0)
.timeWindow(Time.seconds(10),Time.seconds(5))
.process(new SumProcessFunction()).print().setParallelism(1);
env.execute("WindowWordCountAndTime");
}
public static class TestSource implements
SourceFunction<String>{
FastDateFormat dateformat = FastDateFormat.getInstance("HH:mm:ss");
@Override
public void run(SourceContext<String> cxt) throws Exception {
String currTime = String.valueOf(System.currentTimeMillis());
while(Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100){
currTime=String.valueOf(System.currentTimeMillis());
continue;
}
System.out.println("Started sending events at: "+dateformat.format(System.currentTimeMillis()));
TimeUnit.SECONDS.sleep(3);
// second 13
String event="hadoop,"+System.currentTimeMillis(); // the payload carries its event time
cxt.collect(event);
// cxt.collect(event);
TimeUnit.SECONDS.sleep(3); // second 16
cxt.collect("hadoop,"+System.currentTimeMillis());
TimeUnit.SECONDS.sleep(3);
// second 19: the delayed copy still carries the second-13 timestamp
cxt.collect(event);
TimeUnit.SECONDS.sleep(3000);
}
@Override
public void cancel() {
}
}
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks<Tuple2<String,Long>>{
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
// Extract the event timestamp from the element
@Override
public long extractTimestamp(Tuple2<String, Long> element, long l) {
return element.f1;
}
}
/**
* IN
* OUT
* KEY
* W extends Window
*
*/
public static class SumProcessFunction
extends ProcessWindowFunction<Tuple2<String,Long>,Tuple2<String,Integer>,Tuple,TimeWindow>{
FastDateFormat dataformat=FastDateFormat.getInstance("HH:mm:ss");
@Override
public void process(Tuple tuple, Context context,
Iterable<Tuple2<String, Long>> allElements,
Collector<Tuple2<String, Integer>> out) {
int count=0;
for (Tuple2<String,Long> e:allElements){
count++;
}
out.collect(Tuple2.of(tuple.getField(0),count));
}
}
}
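The result of the third window is now correct, but the problem is not fully solved: the first window has already fired by the time the delayed event arrives, so it still misses that event. The watermark mechanism addresses this.
1.5 Handling out-of-order events with watermarks
/**
* Requirement: every 5 seconds, count word occurrences over the last 10 seconds
*
* Introduces a watermark delay to solve the problem
*/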
public class WindowWordCountByWaterMark {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Step 1: set the time characteristic (the default is processing time)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> dataStream = env.addSource(new TestSource());
dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
}).assignTimestampsAndWatermarks(new EventTimeExtractor())
.keyBy(0)
.timeWindow(Time.seconds(10),Time.seconds(5))
.process(new SumProcessFunction()).print().setParallelism(1);
env.execute("WindowWordCountAndTime");
}
public static class TestSource implements
SourceFunction<String>{
FastDateFormat dateformat = FastDateFormat.getInstance("HH:mm:ss");
@Override
public void run(SourceContext<String> cxt) throws Exception {
String currTime = String.valueOf(System.currentTimeMillis());
while(Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100){
currTime=String.valueOf(System.currentTimeMillis());
continue;
}
System.out.println("Started sending events at: "+dateformat.format(System.currentTimeMillis()));
TimeUnit.SECONDS.sleep(3);
String event="hadoop,"+System.currentTimeMillis();
cxt.collect(event);
TimeUnit.SECONDS.sleep(3);
cxt.collect("hadoop,"+System.currentTimeMillis());
TimeUnit.SECONDS.sleep(3);
cxt.collect(event);
TimeUnit.SECONDS.sleep(3000);
}
@Override
public void cancel() {
}
}
private static class EventTimeExtractor implements AssignerWithPeriodicWatermarks<Tuple2<String,Long>>{
// Hold the watermark 5 s behind the wall clock to tolerate out-of-order events
@Nullable
@Override
public Watermark getCurrentWatermark() {
//System.out.println("water mark:......"+System.currentTimeMillis());
return new Watermark(System.currentTimeMillis() - 5000);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long l) {
return element.f1;
}
}
/**
* IN
* OUT
* KEY
* W extends Window
*
*/
public static class SumProcessFunction
extends ProcessWindowFunction<Tuple2<String,Long>,Tuple2<String,Integer>,Tuple,TimeWindow>{
FastDateFormat dataformat=FastDateFormat.getInstance("HH:mm:ss");
@Override
public void process(Tuple tuple, Context context,
Iterable<Tuple2<String, Long>> allElements,
Collector<Tuple2<String, Integer>> out) {
int count=0;
for (Tuple2<String,Long> e:allElements){
count++;
}
out.collect(Tuple2.of(tuple.getField(0),count));
}
}
}
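The effect of the 5-second watermark delay can be checked with a small plain-Java simulation (no Flink dependency). It is a simplified model under stated assumptions: the watermark is taken to be the wall-clock second minus the delay, a window fires once the watermark reaches its end, and an event is counted only if it arrived before that moment. The class and method names are hypothetical.

```java
class WatermarkDelaySketch {

    /**
     * Counts the events that a window [startSec, endSec) holds when it fires.
     * Each event is {eventSec, arrivalSec}. The watermark is modeled as
     * (wall-clock second - delaySec), so the window fires at endSec + delaySec.
     */
    static int countAtFire(long[][] events, long startSec, long endSec, long delaySec) {
        long fireSec = endSec + delaySec;
        int count = 0;
        for (long[] e : events) {
            boolean inWindow = e[0] >= startSec && e[0] < endSec;
            boolean arrivedInTime = e[1] < fireSec;
            if (inWindow && arrivedInTime) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Produced at seconds 13, 16, 13; delivered at seconds 13, 16, 19.
        long[][] events = {{13, 13}, {16, 16}, {13, 19}};
        long[][] windows = {{5, 15}, {10, 20}, {15, 25}};
        for (long delay : new long[] {0, 5}) {
            for (long[] w : windows) {
                System.out.println("delay=" + delay + "s window=[" + w[0] + "," + w[1] + ") count="
                        + countAtFire(events, w[0], w[1], delay));
            }
        }
    }
}
```

Under this model, a delay of 0 gives counts of 1, 3, 1 for the three windows, while a delay of 5 seconds gives 2, 3, 1: the first window now waits long enough to see the event that was produced at second 13 but delivered at second 19.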
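1.6 The watermark mechanism
1.6.2 Definition of a watermark
In stream processing, an event travels from where it is produced, through the source, and on to the operators, and that journey takes time. In most cases the data reaching an operator arrives in the order in which the events were produced, but network delays and similar causes can still introduce disorder. This is especially true with Kafka, where ordering across multiple partitions is not guaranteed. A window computation cannot wait indefinitely, so there must be a mechanism that guarantees the window is triggered once a specific time has passed. That mechanism is the watermark (literally, a water-level line), which exists to handle out-of-order events.
Figure: watermarks in an in-order stream
Figure: watermarks in an out-of-order stream
Figure: watermarks in a stream with multiple parallel sources
1.6.3 A small requirement
Every 3 seconds, collect and print all events with the same key from the previous 3 seconds.
Maximum allowed out-of-orderness: 10 s (the maxOufOfOrderness value used in the code below)
/**
* Requirement: one window every 3 seconds, merging the events that share a key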
*
* hadoop,1461756862000
* hadoop,1461756866000
* hadoop,1461756872000
* hadoop,1461756873000
* hadoop,1461756874000
* hadoop,1461756876000
* hadoop,1461756877000
*
*
* window + watermark: observe how the windows get triggered
*
* This handles out-of-order events
*
*
*/
public class WindowWordCountByWaterMark2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Step 1: set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//checkpoint
DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
// Step 2: extract the event time from the data
}).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(3))
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
env.execute("WindowWordCountByWaterMark2");
}
/**
* IN, OUT, KEY, W
* IN: type of the input elements
* OUT: type of the output elements
* KEY: type of the key (with the positional keyBy(0), Flink represents the key as a Tuple)
* W: type of the window
*/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,String,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
/**
* Called when a window fires
* @param tuple the key
* @param context the operator's context
* @param elements all elements in the window
* @param out collector for emitting results
*/
@Override
public void process(Tuple tuple, Context context,
Iterable<Tuple2<String, Long>> elements,
Collector<String> out) {
System.out.println("Processing time: " + dateFormat.format(context.currentProcessingTime()));
System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
List<String> list = new ArrayList<>();
for (Tuple2<String, Long> ele : elements) {
list.add(ele.toString() + "|" + dateFormat.format(ele.f1));
}
out.collect(list.toString());
System.out.println("window end time : " + dateFormat.format(context.window().getEnd()));
}
}
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
// Tracks the largest event time seen so far.
private long currentMaxEventTime=0L;
private long maxOufOfOrderness=10000; // maximum out-of-orderness: 10 s
@Nullable
@Override
public Watermark getCurrentWatermark() {
// Watermark logic: the largest event time seen so far
// minus the maximum out-of-orderness
return new Watermark(currentMaxEventTime - maxOufOfOrderness);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long timeStamp) {
Long currentElementTime = element.f1;
currentMaxEventTime = Math.max(currentMaxEventTime,currentElementTime);
//
System.out.println("event = " + element
+ "|" + dateFormat.format(element.f1) // Event Time
+ "|" + dateFormat.format(currentMaxEventTime) // Max Event Time
+ "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermark
return currentElementTime;
}
}
}
With 3-second tumbling windows, Flink divides time as follows:
[00:00:00, 00:00:03) [00:00:03, 00:00:06)
[00:00:06, 00:00:09) [00:00:09, 00:00:12)
[00:00:12, 00:00:15) [00:00:15, 00:00:18)
[00:00:18, 00:00:21) [00:00:21, 00:00:24)
[00:00:24, 00:00:27) [00:00:27, 00:00:30)
[00:00:30, 00:00:33) [00:00:33, 00:00:36)
[00:00:36, 00:00:39) [00:00:39, 00:00:42)
[00:00:42, 00:00:45) ...
Key | Event Time | currentMaxTimestamp | currentWatermark | window start time | window end time |
---|---|---|---|---|---|
000001 | 19:34:22 | 19:34:22 | 19:34:12 | | |
000001 | 19:34:26 | 19:34:26 | 19:34:16 | | |
000001 | 19:34:32 | 19:34:32 | 19:34:22 | | |
000001 | 19:34:33 | 19:34:33 | 19:34:23 | | |
000001 | 19:34:34 | 19:34:34 | 19:34:24 | [19:34:21 | 19:34:24) |
000001 | 19:34:36 | 19:34:36 | 19:34:26 | | |
000001 | 19:34:37 | 19:34:37 | 19:34:27 | [19:34:24 | 19:34:27) |
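Summary: when a window fires
- 1. watermark >= window_end_time
- 2. the interval [window_start_time, window_end_time) contains data; note that the interval is closed on the left and open on the right, and that it is evaluated in event time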
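The two conditions above can be replayed outside Flink with a small plain-Java sketch. It follows the simplified rule stated in this section (fire when watermark >= window end), uses the same 3-second tumbling windows and 10-second out-of-orderness as the example, and ignores late data. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class WindowTriggerSketch {

    /**
     * Replays event timestamps (ms) through tumbling windows and returns the start
     * of each window in the order it fires.
     */
    static List<Long> firedWindowStarts(long[] eventTimes, long windowMs, long maxOutOfOrdernessMs) {
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> element count
        List<Long> fired = new ArrayList<>();
        long currentMaxEventTime = 0L;
        for (long ts : eventTimes) {
            // Assign the element to its tumbling window.
            openWindows.merge(ts - ts % windowMs, 1, Integer::sum);
            // Watermark = largest event time seen so far minus the allowed out-of-orderness.
            currentMaxEventTime = Math.max(currentMaxEventTime, ts);
            long watermark = currentMaxEventTime - maxOutOfOrdernessMs;
            // Fire every non-empty window whose end time the watermark has reached.
            while (!openWindows.isEmpty() && openWindows.firstKey() + windowMs <= watermark) {
                fired.add(openWindows.pollFirstEntry().getKey());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] events = {
            1461756862000L, 1461756866000L, 1461756872000L, 1461756873000L,
            1461756874000L, 1461756876000L, 1461756877000L
        };
        System.out.println(firedWindowStarts(events, 3_000L, 10_000L));
    }
}
```

With the seven sample timestamps, only two windows fire: the one starting at 1461756861000 (19:34:21) after the 19:34:34 event, and the one starting at 1461756864000 (19:34:24) after the 19:34:37 event, matching the table.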
1.6.5 Watermark + window for out-of-order events
/**
* Requirement: one window every 3 seconds, word count
*
* Events that arrive too late are dropped by default
*
* 000001,1461756879000
* 000001,1461756871000
* 000001,1461756883000
*/
public class WindowWordCountByWaterMark3 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Step 1: set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
// Step 2: extract the event time from the data
}).assignTimestampsAndWatermarks(new EventTimeExtractor())
.keyBy(0)
.timeWindow(Time.seconds(3))
// .allowedLateness(Time.seconds(2)) // allow events to be up to 2 seconds late
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
env.execute("WindowWordCountByWaterMark3");
}
/**
* IN, OUT, KEY, W
* IN: type of the input elements
* OUT: type of the output elements
* KEY: type of the key (with the positional keyBy(0), Flink represents the key as a Tuple)
* W: type of the window
*/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,String,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
/**
* Called when a window fires
* @param tuple the key
* @param context the operator's context
* @param elements all elements in the window
* @param out collector for emitting results
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector<String> out) {
System.out.println("Processing time: " + dateFormat.format(context.currentProcessingTime()));
System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
List<String> list = new ArrayList<>();
for (Tuple2<String, Long> ele : elements) {
list.add(ele.toString() + "|" + dateFormat.format(ele.f1));
}
out.collect(list.toString());
System.out.println("window end time : " + dateFormat.format(context.window().getEnd()));
}
}
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
private long currentMaxEventTime=0L;
private long maxOufOfOrderness=10000; // maximum out-of-orderness: 10 s
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxEventTime - maxOufOfOrderness);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long timeStamp) {
Long currentElementTime = element.f1;
currentMaxEventTime = Math.max(currentMaxEventTime,currentElementTime);
System.out.println("event = " + element
+ "|" + dateFormat.format(element.f1) // Event Time
+ "|" + dateFormat.format(currentMaxEventTime) // Max Event Time
+ "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermark
return currentElementTime;
}
}
}
Key | Event Time | currentMaxTimestamp | currentWatermark | window start time | window end time |
---|---|---|---|---|---|
000001 | 19:34:39 | 19:34:39 | 19:34:29 | | |
000001 | 19:34:31 | 19:34:39 | 19:34:29 | | |
000001 | 19:34:43 | 19:34:43 | 19:34:33 | [19:34:30 | 19:34:33) |
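1.6.6 Events that arrive too late
1. Drop them; this is the default behavior
2. allowedLateness: specify how long late data is still accepted (we usually do not choose this)
3. sideOutputLateData: collect the late data (so the real-time results can be corrected afterwards)
Dropping
Restart the program and test.
Input data: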
000001,1461756870000
000001,1461756883000
000001,1461756870000
000001,1461756871000
000001,1461756872000
Key | Event Time | currentMaxTimestamp | currentWatermark | window start time | window end time |
---|---|---|---|---|---|
000001 | 19:34:30 | 19:34:30 | 19:34:20 | ||
000001 | 19:34:43 | 19:34:43 | 19:34:33 | [19:34:30 | 19:34:33) |
000001 | 19:34:30 | 19:34:43 | 19:34:33 | [19:34:30 | 19:34:33) |
000001 | 19:34:31 | 19:34:43 | 19:34:33 | [19:34:30 | 19:34:33) |
000001 | 19:34:32 | 19:34:43 | 19:34:33 | [19:34:30 | 19:34:33) |
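The three events that arrive after the window [19:34:30, 19:34:33) has fired are too late and are dropped.
Specifying an allowed lateness
).assignTimestampsAndWatermarks(new EventTimeExtractor())
.keyBy(0)
.timeWindow(Time.seconds(3))
.allowedLateness(Time.seconds(2)) // allow events to be up to 2 seconds late
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
Input data: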
000001,1461756870000
000001,1461756883000
000001,1461756870000
000001,1461756871000
000001,1461756872000
000001,1461756884000
000001,1461756870000
000001,1461756871000
000001,1461756872000
000001,1461756885000
000001,1461756870000
000001,1461756870000
000001,1461756872000
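- 1. With an allowed lateness of 2 seconds, the window fires for the first time when watermark >= window_end_time
- 2. It fires again (possibly several times) whenever a late event for that window arrives while watermark < window_end_time + allowedLateness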
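The two conditions can be written as a small decision function. The plain-Java sketch below (no Flink dependency, hypothetical class and method names) classifies an element by comparing the watermark at its arrival with the end of its window, using the simplified conditions stated above.

```java
class AllowedLatenessSketch {

    enum Outcome { BUFFERED, LATE_REFIRE, DROPPED }

    /**
     * Classifies an element that belongs to a window ending at windowEndMs,
     * given the watermark at the moment the element arrives.
     */
    static Outcome classify(long watermarkMs, long windowEndMs, long allowedLatenessMs) {
        if (watermarkMs < windowEndMs) {
            return Outcome.BUFFERED;      // window has not fired yet
        }
        if (watermarkMs < windowEndMs + allowedLatenessMs) {
            return Outcome.LATE_REFIRE;   // window already fired but still accepts late data
        }
        return Outcome.DROPPED;           // past the lateness bound
    }

    public static void main(String[] args) {
        long windowEnd = 1461756873000L; // end of [19:34:30, 19:34:33)
        long lateness = 2_000L;
        // Watermarks after the events at 19:34:43, 19:34:44 and 19:34:45 (max event time - 10 s).
        for (long watermark : new long[] {1461756873000L, 1461756874000L, 1461756875000L}) {
            System.out.println(watermark + " -> " + classify(watermark, windowEnd, lateness));
        }
    }
}
```

Applied to the input data above, the late events that follow the 19:34:43 and 19:34:44 records re-trigger the window, while those that follow the 19:34:45 record are dropped because the watermark has reached window_end_time + 2 s.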
Collecting late data
/**
* Requirement: one window every 3 seconds, word count
*
* Specify the allowed lateness
* Collect the late data
*/
public class WindowWordCountByWaterMark4 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Step 1: set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Emit a watermark every 1 s
env.getConfig().setAutoWatermarkInterval(1000);
DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
// Declare the tag that identifies the side output for late data
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
SingleOutputStreamOperator<String> result = dataStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
}
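// Step 2: extract the event time from the data
}).assignTimestampsAndWatermarks(new EventTimeExtractor())
.keyBy(0)
.timeWindow(Time.seconds(3))
// .allowedLateness(Time.seconds(2)) // allow events to be up to 2 seconds late
.sideOutputLateData(outputTag) // keep the events that arrive too late
.process(new SumProcessWindowFunction());
// Print the regular results
result.print();
/**
* Handling the late data:
* 1. Write it to a separate destination for late records (a Kafka topic, disk, a database)
*
* There is usually not much late data, and when there is little of it the results are barely affected.
*
* Most real-time jobs are only meant to show the trend anyway.
*/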
result.getSideOutput(outputTag).map(new MapFunction<Tuple2<String,Long>, String>() {
@Override
public String map(Tuple2<String, Long> stringLongTuple2) throws Exception {
return "Late data: "+stringLongTuple2.toString();
}
}).print();
env.execute("WindowWordCountByWaterMark4");
}
/**
* IN, OUT, KEY, W
* IN: type of the input elements
* OUT: type of the output elements
* KEY: type of the key (with the positional keyBy(0), Flink represents the key as a Tuple)
* W: type of the window
*/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,String,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
/**
* Called when a window fires
* @param tuple the key
* @param context the operator's context
* @param elements all elements in the window
* @param out collector for emitting results
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector<String> out) {
System.out.println("Processing time: " + dateFormat.format(context.currentProcessingTime()));
System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
List<String> list = new ArrayList<>();
for (Tuple2<String, Long> ele : elements) {
list.add(ele.toString() + "|" + dateFormat.format(ele.f1));
}
out.collect(list.toString());
System.out.println("window end time : " + dateFormat.format(context.window().getEnd()));
}
}
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
private long currentMaxEventTime=0L;
private long maxOufOfOrderness=10000; // maximum out-of-orderness: 10 s
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxEventTime - maxOufOfOrderness);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long timeStamp) {
Long currentElementTime = element.f1;
currentMaxEventTime = Math.max(currentMaxEventTime,currentElementTime);
System.out.println("event = " + element
+ "|" + dateFormat.format(element.f1) // Event Time
+ "|" + dateFormat.format(currentMaxEventTime) // Max Event Time
+ "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermark
return currentElementTime;
}
}
}
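1.7 Watermarks with multiple parallel subtasks
A window operator may receive watermarks from several upstream subtasks; it uses the smallest one.
/**
* Requirement: one window every 3 seconds, word count
*
* Multiple watermarks when the parallelism is greater than 1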
*
* 000001,1461756870000
* 000001,1461756883000
* 000001,1461756888000
*
*/
public class WindowWordCountByWaterMark5 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Step 1: set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(2);
DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-date"){};
SingleOutputStreamOperator<String> result = dataStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
}
// Step 2: extract the event time from the data
}).assignTimestampsAndWatermarks(new EventTimeExtractor())
.keyBy(0)
.timeWindow(Time.seconds(3))
// .allowedLateness(Time.seconds(2)) // allow events to be up to 2 seconds late
.sideOutputLateData(outputTag) // keep the events that arrive too late
.process(new SumProcessWindowFunction());
// Print the regular results
result.print();
result.getSideOutput(outputTag).map(new MapFunction<Tuple2<String,Long>, String>() {
@Override
public String map(Tuple2<String, Long> stringLongTuple2) throws Exception {
return "Late data: "+stringLongTuple2.toString();
}
}).print();
env.execute("WindowWordCountByWaterMark5");
}
/**
* IN, OUT, KEY, W
* IN: type of the input elements
* OUT: type of the output elements
* KEY: type of the key (with the positional keyBy(0), Flink represents the key as a Tuple)
* W: type of the window
*/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,String,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
/**
* Called when a window fires
* @param tuple the key
* @param context the operator's context
* @param elements all elements in the window
* @param out collector for emitting results
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector<String> out) {
System.out.println("Processing time: " + dateFormat.format(context.currentProcessingTime()));
System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
List<String> list = new ArrayList<>();
for (Tuple2<String, Long> ele : elements) {
list.add(ele.toString() + "|" + dateFormat.format(ele.f1));
}
out.collect(list.toString());
System.out.println("window end time : " + dateFormat.format(context.window().getEnd()));
}
}
private static class EventTimeExtractor
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
private long currentMaxEventTime=0L;
private long maxOufOfOrderness=10000; // maximum out-of-orderness: 10 s
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxEventTime - maxOufOfOrderness);
}
@Override
public long extractTimestamp(Tuple2<String, Long> element, long timeStamp) {
Long currentElementTime = element.f1;
currentMaxEventTime = Math.max(currentMaxEventTime,currentElementTime);
long id = Thread.currentThread().getId();
System.out.println("thread id="+id+" event = " + element
+ "|" + dateFormat.format(element.f1) // Event Time
+ "|" + dateFormat.format(currentMaxEventTime) // Max Event Time
+ "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermark
return currentElementTime;
}
}
}
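- The thread with ID 58 has produced two watermarks, 20 and 38. The later one replaces the earlier one, so the watermark of thread 58 is 38.
- The watermark of thread 57 is 33 and the watermark of thread 58 is 38. The window operator takes the smaller of the two, 33, as its watermark. This triggers the window 30-33, which contains the record (000001,1461756870000).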
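The "take the smallest watermark" rule can be sketched in plain Java (no Flink dependency). The class below is a hypothetical illustration, not Flink's implementation: it keeps the latest watermark per upstream channel and reports the minimum once every channel has reported. It uses the same seconds-within-the-minute shorthand as the description above (20, 38, 33).

```java
import java.util.HashMap;
import java.util.Map;

class MinWatermarkSketch {

    private final Map<Integer, Long> latestPerChannel = new HashMap<>();
    private final int channelCount;

    MinWatermarkSketch(int channelCount) {
        this.channelCount = channelCount;
    }

    /**
     * Records a watermark from one upstream channel and returns the operator's watermark:
     * the minimum across all channels, or Long.MIN_VALUE until every channel has reported.
     */
    long onWatermark(int channelId, long watermark) {
        // Within a channel, a newer (larger) watermark replaces the older one.
        latestPerChannel.merge(channelId, watermark, Long::max);
        if (latestPerChannel.size() < channelCount) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long w : latestPerChannel.values()) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        MinWatermarkSketch operator = new MinWatermarkSketch(2);
        operator.onWatermark(58, 20);
        operator.onWatermark(58, 38);
        System.out.println(operator.onWatermark(57, 33));
    }
}
```

Until thread 57 reports, the operator has no usable watermark in this model; once it reports 33, the minimum of 38 and 33 is 33, which reaches the end of the window 30-33 and triggers it.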