Flink Study Notes: Time Definitions and the Window Mechanism

2019-12-05  飞不高的老鸟

Flink Recap

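    From the earlier notes we know that Flink is a high-throughput, low-latency stream processing framework. Flink defines time rigorously, and the choice of time notion strongly affects both processing latency and the accuracy of the results. So how can we get accurate, consistent results under the different time notions in Flink?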

Flink Time Definitions

    Flink provides three notions of time: Event Time, Ingestion Time, and Processing Time. The figure below shows the point at which each one is taken:

[Figure: the points at which Flink's three notions of time are taken]

    When coding against Flink, the time notion is selected on the execution environment with env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime):

public static void main(String []args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // set the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> dataSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = dataSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] values = value.split(" ");
                return new Tuple2<String, Integer>(values[0], 1);
            }
        }).keyBy(0)
          .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
              @Override
              public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                  return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
              }
          });

        reduce.print();
        env.execute("test");
    }

    In day-to-day business processing, Ingestion Time and Processing Time are used comparatively rarely, so the rest of this note focuses on how Event Time is used.

Flink Window Mechanism

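    Flink offers count windows and time windows. Time windows come as Tumbling Time Windows, Sliding Time Windows, and Session Windows. There is also the Global Window, which is not time-based: it puts all elements of a key into one window and only fires through a custom trigger (count windows are built on it).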
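    To make the difference between tumbling and sliding windows concrete, here is a minimal plain-Java sketch with no Flink dependency. The class and method names (WindowAssignSketch, tumbling, sliding) are made up for illustration, and it assumes non-negative timestamps and no offset. It only mimics the assignment idea and is not Flink's own assigner code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: shows which windows a timestamp falls into.
final class WindowAssignSketch {

    // Start of the window of length `size` containing `timestamp` (assumes timestamp >= 0).
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    // Tumbling: every timestamp belongs to exactly one window [start, start + size).
    static List<long[]> tumbling(long timestamp, long size) {
        long start = windowStart(timestamp, size);
        List<long[]> windows = new ArrayList<>();
        windows.add(new long[] {start, start + size});
        return windows;
    }

    // Sliding: a timestamp belongs to every window whose start lies in (timestamp - size, timestamp].
    static List<long[]> sliding(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : tumbling(7_000L, 5_000L)) {
            System.out.println("tumbling [" + w[0] + ", " + w[1] + ")");
        }
        for (long[] w : sliding(7_000L, 10_000L, 5_000L)) {
            System.out.println("sliding  [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

    With a 5 s tumbling window, the timestamp 7000 lands in [5000, 10000) only. With a 10 s window sliding every 5 s, it lands in both [5000, 15000) and [0, 10000), so the same element is counted twice.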

[Figure: the window types]

    For example, a count window can be used as follows:
public class TimeAndWindow {

    public static void main(String []args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // create the data source
        DataStream<String> dataStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                String[] values = value.split(" ");
                return new Tuple2<String, Integer>(values[0], 1);
            }
        }).keyBy(0)
          .countWindow(2)   // use a count window
          .sum(1);

        sum.print();

        env.execute("test");
    }
}
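
    What countWindow(2) followed by sum(1) does to the stream can be mimicked without Flink. The sketch below is plain Java with a made-up class name (CountWindowSketch). It assumes, as in the program above, that every word contributes the value 1, and that a window is cleared once it fires. It is an illustration of the behaviour, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a per-key count window of a fixed size.
final class CountWindowSketch {

    // Returns one "key=sum" entry each time a key has collected `size` elements.
    static List<String> run(List<String> words, int size) {
        Map<String, Integer> pending = new HashMap<>(); // elements buffered per key
        List<String> fired = new ArrayList<>();
        for (String word : words) {
            int count = pending.merge(word, 1, Integer::sum);
            if (count == size) {
                fired.add(word + "=" + count); // each element carries 1, so the sum equals the count
                pending.remove(word);          // start a fresh window for this key
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // prints [a=2, b=2, a=2]; the single "c" never fills a window
        System.out.println(run(Arrays.asList("a", "b", "a", "a", "b", "a", "c"), 2));
    }
}
```

    Note that a key which never reaches the window size (here "c") produces no output at all, which matches what you see when typing a word only once into the socket source.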

// KeyedStream.timeWindow(Time size), from the Flink source
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(TumblingProcessingTimeWindows.of(size));
        } else {
            return window(TumblingEventTimeWindows.of(size));
        }
    }

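    As the timeWindow source above shows, Flink picks a different underlying implementation for ProcessingTime and for EventTime. Let us look at how the two differ. With ProcessingTime, Flink reads the current timestamp from the context. With EventTime, it relies on the timestamp carried by the event itself, so we must first call DataStream.assignTimestampsAndWatermarks(...) to assign timestamps, otherwise an exception is thrown. ProcessingTime needs no such call.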

// assignWindows implementation for ProcessingTime
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    final long now = context.getCurrentProcessingTime();
    long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
    return Collections.singletonList(new TimeWindow(start, start + size));
}


// assignWindows implementation for EventTime
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}
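
    Both implementations above delegate to TimeWindow.getWindowStartWithOffset(timestamp, offset, size). As a rough sketch of the arithmetic behind it (my own reconstruction in plain Java, with the made-up class name WindowStartSketch; the exact Flink code may differ between versions), the window start is the timestamp rounded down to a multiple of the size, shifted by the offset:

```java
// Hypothetical re-creation of the window-start arithmetic; not copied from Flink.
final class WindowStartSketch {

    static final long MINUTE = 60_000L;
    static final long HOUR = 60 * MINUTE;
    static final long DAY = 24 * HOUR;

    // Start of the `size`-long window containing `timestamp`, with window boundaries shifted by `offset`.
    static long windowStart(long timestamp, long offset, long size) {
        long remainder = Math.floorMod(timestamp - offset, size);
        return timestamp - remainder;
    }

    // Mirrors the event-time check above: a record without a timestamp cannot be assigned.
    static long[] assign(long timestamp, long offset, long size) {
        if (timestamp == Long.MIN_VALUE) {
            throw new IllegalStateException("record has no timestamp");
        }
        long start = windowStart(timestamp, offset, size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        // Hourly windows starting at minute 15: 01:20 falls into the window starting at 01:15.
        System.out.println(windowStart(80 * MINUTE, 15 * MINUTE, HOUR) / MINUTE); // 75

        // Daily windows aligned to local midnight in UTC+08:00 (offset of -8 hours):
        // 16:30 UTC is 00:30 local time, so the window starts at 16:00 UTC.
        System.out.println(windowStart(16 * HOUR + 30 * MINUTE, -8 * HOUR, DAY) / HOUR); // 16
    }
}
```

    The two calls in main reuse the examples from the TumblingEventTimeWindows Javadoc: of(Time.hours(1), Time.minutes(15)) and of(Time.days(1), Time.hours(-8)).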

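    The figure below shows Flink's window mechanism and how its components work together. Each record arriving from the source is assigned to its Window by the WindowAssigner. Once a Window is triggered, its contents pass through the Evictor (skipped if none is set) and are then handed to the UserFunction.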


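[Figure: internals of the time window mechanism]

    The tumbling event-time assigner exposes two factory methods in the Flink source; the second takes an offset that shifts where each window starts: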
/**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp.
     *
     * @param size The size of the generated windows.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
    }

    /**
     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp and offset.
     *
     * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
     * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
     * time windows start at 0:15:00,1:15:00,2:15:00,etc.
     *
     * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
     * such as China which is using UTC+08:00,and you want a time window with size of one day,
     * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
     * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
     *
     * @param size The size of the generated windows.
     * @param offset The offset which window start would be shifted by.
     * @return The time policy.
     */
    public static TumblingEventTimeWindows of(Time size, Time offset) {
        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
    }

// DataStream.windowAll(...) and the AllWindowedStream constructor it calls, from the Flink source
@PublicEvolving
    public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? super T, W> assigner) {
        return new AllWindowedStream<>(this, assigner);
    }
    
@PublicEvolving
    public AllWindowedStream(DataStream<T> input,
            WindowAssigner<? super T, W> windowAssigner) {
        this.input = input.keyBy(new NullByteKeySelector<T>());
        this.windowAssigner = windowAssigner;
        this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
    }

The Watermark Mechanism

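    Watermarks are Flink's mechanism for coping with out-of-order data under EventTime. A watermark delays the firing of a window so that, to a certain extent, late-arriving data can still enter the window it belongs to, which in turn means that repeated runs over the same batch of data produce the same result. Watermarks are usually generated right after the source: the closer to the source, the more accurate they are. A watermark is itself an artificially inserted event carrying a timestamp; when it arrives, we assume that all data with earlier timestamps has already entered the corresponding windows.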

[Figure: watermarks in an out-of-order stream]
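
    The effect of delaying the watermark can be tried out with a small plain-Java simulation. Everything here is an assumption for illustration: the class name WatermarkSketch, the rule that the watermark trails the largest timestamp seen by a fixed maxDelay, and the rule that a window [start, end) fires once the watermark reaches end - 1. It sketches the idea and does not reproduce Flink's runtime.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of one event-time window with a delayed watermark.
final class WatermarkSketch {

    // Returns the timestamps that entered the window [windowStart, windowEnd) before it fired.
    static List<Long> collect(long[] timestamps, long windowStart, long windowEnd, long maxDelay) {
        List<Long> inWindow = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (ts >= windowStart && ts < windowEnd) {
                inWindow.add(ts); // the element arrives before the window has fired
            }
            maxSeen = Math.max(maxSeen, ts);
            long watermark = maxSeen - maxDelay; // watermark lags behind the newest event
            if (watermark >= windowEnd - 1) {
                break; // window fires; anything arriving later is too late
            }
        }
        return inWindow;
    }

    public static void main(String[] args) {
        long[] arrivals = {1000L, 4000L, 11000L, 8000L, 13000L, 9000L}; // 8000 and 9000 arrive out of order
        System.out.println(collect(arrivals, 0L, 10000L, 0L));    // [1000, 4000]
        System.out.println(collect(arrivals, 0L, 10000L, 3000L)); // [1000, 4000, 8000]
    }
}
```

    With no delay the window fires as soon as 11000 arrives and misses 8000. With a 3 s delay it waits long enough to include 8000, but 9000 still comes too late, and the result is only produced once 13000 has been seen. That is the accuracy versus latency trade-off in miniature.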

    Flink offers two ways to generate watermarks: 1. AssignerWithPeriodicWatermarks (periodic); 2. AssignerWithPunctuatedWatermarks (event-driven: it can emit a watermark along with each new event timestamp).

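// AssignerWithPeriodicWatermarks: watermark generation
// (assumes each input line looks like "<key> <timestampMillis>")
assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
            // largest event timestamp seen so far
            private long currTime = 0L;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                // called periodically by Flink; the watermark follows the largest timestamp seen
                return new Watermark(currTime);
            }

            @Override
            public long extractTimestamp(String element, long previousElementTimestamp) {
                String[] values = element.split(" ");
                long timestamp = Long.parseLong(values[1]);
                // remember the newest timestamp so the watermark can advance
                currTime = Math.max(currTime, timestamp);
                return timestamp;
            }
        })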
        
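// AssignerWithPunctuatedWatermarks: watermark generation
assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<String>() {
                    @Nullable
                    @Override
                    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
                        // called right after extractTimestamp for every element; returning null emits nothing.
                        // Illustrative choice: emit a watermark for each element's timestamp.
                        return new Watermark(extractedTimestamp);
                    }

                    @Override
                    public long extractTimestamp(String element, long previousElementTimestamp) {
                        // assumes lines of the form "<key> <timestampMillis>", as in the periodic example
                        return Long.parseLong(element.split(" ")[1]);
                    }
                })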

Use cases:

Watermark source code

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return {@code null} to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method. If a null value is returned, or the timestamp
     * of the returned watermark is smaller than that of the last emitted one, then no
     * new watermark will be generated.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark getCurrentWatermark();
}

public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Asks this implementation if it wants to emit a watermark. This method is called right after
     * the {@link #extractTimestamp(Object, long)} method.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If a null value is returned, or the timestamp of the returned
     * watermark is smaller than that of the last emitted one, then no new watermark will
     * be generated.
     *
     * <p>For an example how to use this method, see the documentation of
     * {@link AssignerWithPunctuatedWatermarks this class}.
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}

public interface TimestampAssigner<T> extends Function {

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch.
     *
     * <p>The method is passed the previously assigned timestamp of the element.
     * That previous timestamp may have been assigned from a previous assigner,
     * by ingestion time. If the element did not carry a timestamp before, this value is
     * {@code Long.MIN_VALUE}.
     *
     * @param element The element that the timestamp will be assigned to.
     * @param previousElementTimestamp The previous internal timestamp of the element,
     *                                 or a negative value, if no timestamp has been assigned yet.
     * @return The new timestamp.
     */
    long extractTimestamp(T element, long previousElementTimestamp);
}

    Both AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks extend TimestampAssigner, which is how they obtain the timestamp they need.
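
    The Javadoc of both interfaces states the same contract: a returned watermark is emitted only if it is non-null and larger than the previously emitted one. A minimal plain-Java sketch of that filter follows. The class AscendingWatermarkSketch and its offer method are hypothetical names used only for this illustration; they are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical filter that enforces ascending watermarks, as described in the Javadoc above.
final class AscendingWatermarkSketch {

    private long lastEmitted = Long.MIN_VALUE;
    private final List<Long> emitted = new ArrayList<>();

    // Emits the candidate only if it is non-null and strictly larger than the last emitted watermark.
    boolean offer(Long candidate) {
        if (candidate == null || candidate <= lastEmitted) {
            return false; // no progress in event time, nothing is emitted
        }
        lastEmitted = candidate;
        emitted.add(candidate);
        return true;
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch sketch = new AscendingWatermarkSketch();
        Long[] candidates = {5L, null, 5L, 3L, 9L};
        for (Long c : candidates) {
            sketch.offer(c);
        }
        System.out.println(sketch.emitted()); // [5, 9]
    }
}
```

    This is why an assigner may return the same or even a smaller value without harm: such candidates are simply dropped, and downstream operators only ever see increasing watermarks.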

Summary

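    This note gave a brief introduction to Flink's time notions and its window mechanism; the two are in fact fully decoupled. Watermarks go a long way toward handling out-of-order streams, at the cost of some added latency.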
