Flink

Flink-1.12(六)Flink时间语义及Watermark

2021-06-25  本文已影响0人  _大叔_

时间语义

flink 有三种时间:EventTime,表示数据最初的触发时间;IngestionTime,数据进入Flink的时间,是DataSource拿到数据的时间;ProcessingTime,执行操作算子的本地系统时间,与机器相关。

flink1.12 中默认时间语义是 EventTime,在实际处理数据,大多也都是以 EventTime 为主。因为数据可能受于网络的影响,或其他因素导致乱序数据的产生。

Watermark

那对于乱序数据我们就可以使用 Watermark,我们来举个例子说明 flink 中的 Watermark 是什么:比如定点9点上车,但是往往有人9.01才来,那 Watermark 的做法就是把自己的时间调慢,也就是8.59分,9.01的人来了 对于我来说还是9点。如果有人9.02来了,那其实可以配合 window 的延迟,我先输出数据开车,你来弯道超车,然后我重新计算输出新数据,但非有人,9.03来呢,那我不管你了,我开车上高速,你可以坐下一辆车(侧输出流)。

Watermark 的时间不宜设置太大,因为拿到数据时间可能是准确的,但是拿到数据就会很慢。相当于9点发车,但我想等人到齐(数据),我设置到了 8.30,我需要在等 30分钟才能发车,此时我到站 比别人还晚30分钟。

以下演示 Watermark 时间乱序处理,和配合window的函数处理

Watermark 在分区中会计算最小事件时钟,保证下游数据收到每个事件时钟,并根据自己的时钟得出是否计算结果并关闭窗口。Watermark 是以广播的形式把事件时钟传递给每个下游。

watermark 的特性主要有以下几点:

代码演示
public class EventData{

    private Integer id;
    private Long eventTime;
    private String data;
    private Integer num;

    public EventData(){

    };

    public Integer getNum() {
        return num;
    }

    public void setNum(Integer num) {
        this.num = num;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Long getEventTime() {
        return eventTime;
    }

    public void setEventTime(Long eventTime) {
        this.eventTime = eventTime;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "EventData{" +
                "id=" + id +
                ", eventTime=" + eventTime +
                ", data='" + data + '\'' +
                ", num=" + num +
                '}';
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
        // 数据转换
        DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
            @Override
            public EventData map(String value) throws Exception {
                String[] strs = value.split(",");
                EventData eventData = new EventData();
                eventData.setId(Integer.valueOf(strs[0]));
                eventData.setEventTime(Long.valueOf(strs[1]));
                eventData.setData(strs[2]);
                eventData.setNum(Integer.valueOf(strs[3]));
                return eventData;

            }
        }).assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<EventData>(Time.seconds(5)) {
                //提取时间戳
                @Override
                public long extractTimestamp(EventData element) {
                    return element.getEventTime() * 1000L;
                }
            }
        );
        stream.print("WM: ");
        // 基于事件时间的开窗聚合,统计15秒内数据的最小ID值
        stream.keyBy(new KeySelector<EventData, Object>() {
                    @Override
                    public Object getKey(EventData value) throws Exception {
                        return value.getData();
                    }
                })
                .timeWindow(Time.seconds(15))
//                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .sum("num")
                .print("result: ");
        env.execute("test");
    }

setAutoWatermarkInterval 是设置 Watermark 的生成时间,默认是 0,也就是来一个数据我对这个数据生成一个 Watermark时钟,用于去比较窗口函数的时间来触发计算。可以手动设置,在庞大的数据量,每生成一个 Watermark 有些费性能。以下为隔100ms生成一次。

env.getConfig().setAutoWatermarkInterval(100);

测试数据如下:

6,1623051400,test data,1
6,1623051401,test data,1
6,1623051402,test data,1
6,1623051405,test data,3
6,1623051406,test data,3
6,1623051409,test data,3
6,1623051410,test data,5

你会发现当你的 timestamp 为 1623051410,会触发窗口计算,但输出的num=3,因为 Watermark 创建窗口是会自动创建,会根据你的第一个数据的 timestamp 以及 timeWindow 窗口的值计算窗口的 startTime,计算方式

# startTime
timestamp - (timestamp - offset + windowSize) % windowSize;
# endTime
startTime + windowSize

其中默认情况 offset = 0,根据以上计算得出如下

startTime     timestamp
1623051390000 1623051400000
1623051390000 1623051401000
1623051390000 1623051402000
1623051405000 1623051405000
1623051405000 1623051406000
1623051405000 1623051409000
1623051405000 1623051410000

会看到 400000(timestamp) 创建的 startTime 是 390000,405000(timestamp) 创建的 startTime 是 405000,他们被分为了两个窗口。而他们的 405000-390000=15000=15s,刚好就是我们的 windowSize,也就是说 405000 的触发点在 420000 ,但我们还给 Watermark 延迟了 5s,也就是说正确的关闭第二个窗口的 timestamp = 1623051425000,得出结论

Watermark1=[390,405)  Watermark2=[405,420)

以上结论并行度为1

上一篇 下一篇

猜你喜欢

热点阅读