算法小白菜spark

Flink--WaterMark理解和实践

2019-08-06  本文已影响16人  李小李的路

Watermark作用

Window的划分

window划分 第一个 第二个 第三个
3s [00:00:00~00:00:03) [00:00:03~00:00:06) [00:00:06~00:00:09)
5s [00:00:00~00:00:05) [00:00:05~00:00:10) [00:00:10~00:00:15)
10s [00:00:00~00:00:10) [00:00:10~00:00:20) [00:00:20~00:00:30)
1min [00:00:00~00:01:00) [00:01:00~00:02:00) [00:02:00~00:03:00)

Watermark分配方式

Watermark默认更新时间

// --------------------------------------------------------------------------------------------
    //  Time characteristic
    // --------------------------------------------------------------------------------------------

    /**
     * Sets the time characteristic for all streams created from this environment, e.g.,
     * processing time, event time, or ingestion time.
     *
     * <p>Choosing processing time sets the automatic watermark interval to 0. Any other
     * characteristic (ingestion time or event time) sets a default watermark update interval
     * of 200 ms. If that default does not suit your application, override it afterwards with
     * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     *
     * @param characteristic The time characteristic; must not be null.
     */
    @PublicEvolving
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
        // Processing time gets interval 0; every other characteristic gets the 200 ms default.
        final boolean usesProcessingTime = characteristic == TimeCharacteristic.ProcessingTime;
        getConfig().setAutoWatermarkInterval(usesProcessingTime ? 0 : 200);
    }

Periodic Watermarks跟踪

package github.yahuili1128.watermark;

import github.yahuili1128.pojo.MockUpModel;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.TreeSet;

import static github.yahuili1128.connector.SourceKafka010.getMockUpkafka010;

/**
 * @Description : 从kafka中读取数据,练习watermark
 * @Author : LiYahui
 * @Date : 2019-08-06 11:45
 * @Version : V1.0
 */
/**
 * Practice job for periodic watermarks: takes the {@link MockUpModel} stream returned by
 * {@code getMockUpkafka010}, keeps the records whose gender is "male", assigns event-time
 * timestamps and watermarks, groups the records into 10-second tumbling event-time windows
 * and prints one summary line per fired window.
 */
public class PeriodicWatermarkTest {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        SingleOutputStreamOperator<MockUpModel> mockUpkafka010 = getMockUpkafka010(env).name("kafka source");
        // Constant-first equals: a record with a null gender is filtered out instead of throwing an NPE.
        SingleOutputStreamOperator<String> result = mockUpkafka010.filter(line -> "male".equals(line.gender))
                .assignTimestampsAndWatermarks(new GetWateramrk()).keyBy(line -> line.gender)
                .window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowTest()).name("watermark print");

        result.print();

        env.execute(PeriodicWatermarkTest.class.getSimpleName());
    }

    /**
     * Window function that emits a single debug line describing a fired window: the key, the
     * number of elements, the earliest and latest element timestamps, the window bounds and
     * the sorted set of distinct element timestamps.
     */
    public static class WindowTest implements WindowFunction<MockUpModel, String, String, TimeWindow> {

        /**
         * Summarises the contents of one window.
         *
         * @param key    the key of the window (the gender value used in {@code keyBy})
         * @param window the window being evaluated
         * @param input  the elements assigned to the window
         * @param out    collector that receives the summary line
         */
        @Override
        public void apply(String key, TimeWindow window, Iterable<MockUpModel> input, Collector<String> out)
                throws Exception {
            // Sorted, de-duplicated timestamps; "size" counts every element, duplicates included.
            TreeSet<Long> set = new TreeSet<>();
            int size = 0;
            // Single pass: count the elements while collecting their timestamps.
            for (MockUpModel element : input) {
                set.add(element.timestamp);
                size++;
            }
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            // (key, element count, earliest element time, latest element time, window start, window end)
            String first = sdf.format(set.first());
            String last = sdf.format(set.last());
            String start = sdf.format(window.getStart());
            String end = sdf.format(window.getEnd());
            // Debug output only.
            out.collect("event.key:" + key + ",window中元素个数:" + size + ",window第一个元素时间戳:" + first + ",window最后一个元素时间戳:"
                    + last + ",window开始时间戳:" + start + ",window结束时间戳:" + end + ",窗口内所有的时间戳:" + set.toString());

        }
    }


    /**
     * Periodic watermark assigner: tracks the largest event timestamp seen so far and reports
     * a watermark that lags behind it by {@link #maxOutOfOrderness} milliseconds.
     */
    public static class GetWateramrk implements AssignerWithPeriodicWatermarks<MockUpModel> {
        // Maximum tolerated out-of-orderness: 5 s (5000 ms).
        private final long maxOutOfOrderness = 5000L;
        // Largest event timestamp observed so far; starts at 0, so the first watermark is -5000.
        private long currentMaxTimestamp;
        // Watermark produced by the latest getCurrentWatermark() call; null until that method has run once.
        private Watermark watermark;

        // Formats timestamps for the debug/learning output below.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        /**
         * Returns the current watermark: the largest timestamp seen minus the allowed lateness.
         * The result is also remembered so {@link #extractTimestamp} can print it.
         */
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            watermark = new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness);
            return watermark;
        }

        /**
         * Extracts the event timestamp of an element, updates the running maximum and prints
         * a debug line.
         *
         * @param element                  the incoming record
         * @param previousElementTimestamp timestamp previously attached to the record (unused)
         * @return the timestamp carried by the record
         */
        @Override
        public long extractTimestamp(MockUpModel element, long previousElementTimestamp) {
            // Timestamp carried by the event itself.
            long timestamp = element.getTimestamp();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            // Print all timing information. The watermark is appended by string concatenation
            // rather than watermark.toString(): the field is still null if an element arrives
            // before getCurrentWatermark() has been called, and toString() would then throw an NPE.
            System.out.println("--->>> event.key:" + element.gender + " | event中timestamp:" + timestamp + "| " + sdf
                    .format(timestamp) + "| currentMaxTimestamp:" + currentMaxTimestamp + "| " + sdf
                    .format(currentMaxTimestamp) + "| watermark" + watermark);
            // Return the event's own timestamp.
            return timestamp;
        }
    }

}

--->>> event.key:male | event中timestamp:1565082033747| 2019-08-06 17:00:33.747| currentMaxTimestamp:1565082033747| 2019-08-06 17:00:33.747| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082036758| 2019-08-06 17:00:36.758| currentMaxTimestamp:1565082036758| 2019-08-06 17:00:36.758| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082038778| 2019-08-06 17:00:38.778| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082037791| 2019-08-06 17:00:37.791| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082038803| 2019-08-06 17:00:38.803| currentMaxTimestamp:1565082038803| 2019-08-06 17:00:38.803| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082039815| 2019-08-06 17:00:39.815| currentMaxTimestamp:1565082039815| 2019-08-06 17:00:39.815| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082041839| 2019-08-06 17:00:41.839| currentMaxTimestamp:1565082041839| 2019-08-06 17:00:41.839| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082044850| 2019-08-06 17:00:44.850| currentMaxTimestamp:1565082044850| 2019-08-06 17:00:44.850| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082046869| 2019-08-06 17:00:46.869| currentMaxTimestamp:1565082046869| 2019-08-06 17:00:46.869| watermarkWatermark @ 1565082039850
8> event.key:male,window中元素个数:6,window第一个元素时间戳:2019-08-06 17:00:33.747,window最后一个元素时间戳:2019-08-06 17:00:39.815,window开始时间戳:2019-08-06 17:00:30.000,window结束时间戳:2019-08-06 17:00:40.000,窗口内所有的时间戳:[1565082033747, 1565082036758, 1565082037791, 1565082038778, 1565082038803, 1565082039815]
--->>> event.key:male | event中timestamp:1565082047880| 2019-08-06 17:00:47.880| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082041869
--->>> event.key:male | event中timestamp:1565082046890| 2019-08-06 17:00:46.890| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082042880
--->>> event.key:male | event中timestamp:1565082049900| 2019-08-06 17:00:49.900| currentMaxTimestamp:1565082049900| 2019-08-06 17:00:49.900| watermarkWatermark @ 1565082042880
--->>> event.key:male | event中timestamp:1565082051921| 2019-08-06 17:00:51.921| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082044900
--->>> event.key:male | event中timestamp:1565082050934| 2019-08-06 17:00:50.934| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082046921
--->>> event.key:male | event中timestamp:1565082051945| 2019-08-06 17:00:51.945| currentMaxTimestamp:1565082051945| 2019-08-06 17:00:51.945| watermarkWatermark @ 1565082046921
--->>> event.key:male | event中timestamp:1565082052955| 2019-08-06 17:00:52.955| currentMaxTimestamp:1565082052955| 2019-08-06 17:00:52.955| watermarkWatermark @ 1565082046945
--->>> event.key:male | event中timestamp:1565082055965| 2019-08-06 17:00:55.965| currentMaxTimestamp:1565082055965| 2019-08-06 17:00:55.965| watermarkWatermark @ 1565082047955
8> event.key:male,window中元素个数:6,window第一个元素时间戳:2019-08-06 17:00:41.839,window最后一个元素时间戳:2019-08-06 17:00:49.900,window开始时间戳:2019-08-06 17:00:40.000,window结束时间戳:2019-08-06 17:00:50.000,窗口内所有的时间戳:[1565082041839, 1565082044850, 1565082046869, 1565082046890, 1565082047880, 1565082049900]
--->>> event.key:male | event中timestamp:1565082056983| 2019-08-06 17:00:56.983| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082050965
--->>> event.key:male | event中timestamp:1565082055993| 2019-08-06 17:00:55.993| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082051983
--->>> event.key:male | event中timestamp:1565082059005| 2019-08-06 17:00:59.005| currentMaxTimestamp:1565082059005| 2019-08-06 17:00:59.005| watermarkWatermark @ 1565082051983
--->>> event.key:male | event中timestamp:1565082061028| 2019-08-06 17:01:01.028| currentMaxTimestamp:1565082061028| 2019-08-06 17:01:01.028| watermarkWatermark @ 1565082054005
--->>> event.key:male | event中timestamp:1565082062036| 2019-08-06 17:01:02.036| currentMaxTimestamp:1565082062036| 2019-08-06 17:01:02.036| watermarkWatermark @ 1565082056028

结论


上一篇下一篇

猜你喜欢

热点阅读