Learning Flink, Part 12: Watermarks in Event Time

2019-03-21  AlanKim

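When using event time in Flink, you usually need to define your own Timestamp Extractor / Watermark Emitter by implementing one of two interfaces, AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks. Which one you choose depends on whether your job needs watermarks generated periodically, or a watermark generated whenever a particular element (or a particular field value within an element) appears in the stream.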
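To make the difference between the two styles concrete, here is a minimal stand-alone sketch. It is not Flink code: the class `WatermarkStyles`, the `Event` type, and its `marker` flag are hypothetical, and "every N elements" is only a stand-in for the wall-clock timer that drives periodic watermarks in a real job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Stand-alone sketch (hypothetical names, no Flink dependency) contrasting the two watermark styles. */
final class WatermarkStyles {

    /** Hypothetical event: an event-time timestamp plus a flag marking "special" elements. */
    static final class Event {
        final long timestamp;
        final boolean marker;

        Event(long timestamp, boolean marker) {
            this.timestamp = timestamp;
            this.marker = marker;
        }
    }

    /**
     * Periodic style: every element only updates the maximum timestamp seen so far;
     * a watermark is produced when the "timer" fires (assumption: every everyN elements).
     */
    static List<Long> periodic(List<Event> events, int everyN) {
        List<Long> watermarks = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (int i = 0; i < events.size(); i++) {
            maxTimestamp = Math.max(maxTimestamp, events.get(i).timestamp);
            if ((i + 1) % everyN == 0) {
                watermarks.add(maxTimestamp);
            }
        }
        return watermarks;
    }

    /**
     * Punctuated style: each element is inspected, and a watermark is produced
     * only for marker elements that move the watermark forward.
     */
    static List<Long> punctuated(List<Event> events) {
        List<Long> watermarks = new ArrayList<>();
        long last = Long.MIN_VALUE;
        for (Event e : events) {
            if (e.marker && e.timestamp > last) {
                last = e.timestamp;
                watermarks.add(last);
            }
        }
        return watermarks;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(1L, false), new Event(2L, true), new Event(3L, false),
                new Event(4L, false), new Event(5L, true), new Event(6L, false));
        System.out.println("periodic:   " + periodic(events, 3));
        System.out.println("punctuated: " + punctuated(events));
    }
}
```

In the periodic variant the watermark depends only on when the timer fires, while in the punctuated variant it depends on the data itself.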

Flink also ships with a few ready-made Timestamp Extractors / Watermark Emitters. Let's look at them in detail:

AscendingTimestampExtractor: an assigner for ascending timestamps

As usual, let's read the code first.

package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;

/**
 * A timestamp assigner and watermark generator for streams where timestamps are monotonously
 * ascending. In this case, the local watermarks for the streams are easy to generate, because
 * they strictly follow the timestamps.
 *
 * @param <T> The type of the elements that this function can extract timestamps from
 */
@PublicEvolving
public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current timestamp. */
    private long currentTimestamp = Long.MIN_VALUE;

    /** Handler that is called when timestamp monotony is violated. */
    private MonotonyViolationHandler violationHandler = new LoggingHandler();


    /**
     * Extracts the timestamp from the given element. The timestamp must be monotonically increasing.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractAscendingTimestamp(T element);

    /**
     * Sets the handler for violations to the ascending timestamp order.
     *
     * @param handler The violation handler to use.
     * @return This extractor.
     */
    public AscendingTimestampExtractor<T> withViolationHandler(MonotonyViolationHandler handler) {
        this.violationHandler = requireNonNull(handler);
        return this;
    }

    // ------------------------------------------------------------------------

    @Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
        final long newTimestamp = extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }

    @Override
    public final Watermark getCurrentWatermark() {
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }

    // ------------------------------------------------------------------------
    //  Handling violations of monotonous timestamps
    // ------------------------------------------------------------------------

    /**
     * Interface for handlers that handle violations of the monotonous ascending timestamps
     * property.
     */
    public interface MonotonyViolationHandler extends java.io.Serializable {

        /**
         * Called when the property of monotonously ascending timestamps is violated, i.e.,
         * when {@code elementTimestamp < lastTimestamp}.
         *
         * @param elementTimestamp The timestamp of the current element.
         * @param lastTimestamp The last timestamp.
         */
        void handleViolation(long elementTimestamp, long lastTimestamp);
    }

    /**
     * Handler that does nothing when timestamp monotony is violated.
     */
    public static final class IgnoringHandler implements MonotonyViolationHandler {
        private static final long serialVersionUID = 1L;

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {}
    }

    /**
     * Handler that fails the program when timestamp monotony is violated.
     */
    public static final class FailingHandler implements MonotonyViolationHandler {
        private static final long serialVersionUID = 1L;

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {
            throw new RuntimeException("Ascending timestamps condition violated. Element timestamp "
                    + elementTimestamp + " is smaller than last timestamp " + lastTimestamp);
        }
    }

    /**
     * Handler that only logs violations of timestamp monotony, on WARN log level.
     */
    public static final class LoggingHandler implements MonotonyViolationHandler {
        private static final long serialVersionUID = 1L;

        private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);

        @Override
        public void handleViolation(long elementTimestamp, long lastTimestamp) {
            LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);
        }
    }
}

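1. It is an abstract class that implements the AssignerWithPeriodicWatermarks interface, so watermarks are generated periodically. As with any periodic assigner, the interval is configured through ExecutionConfig (setAutoWatermarkInterval).

2. Element timestamps must be ascending within each parallel source task. When several parallel sources are combined, elements from different tasks may still interleave out of order; Flink's watermark merging handles that case.

3. It already implements the extractTimestamp and getCurrentWatermark methods defined by AssignerWithPeriodicWatermarks. extractTimestamp returns the timestamp obtained from the abstract method extractAscendingTimestamp; it updates currentTimestamp when the new value is not smaller, and otherwise calls the violation handler. getCurrentWatermark returns currentTimestamp - 1, or Long.MIN_VALUE if no element has been seen yet.

4. To use this class you implement extractAscendingTimestamp, which supplies the ascending timestamp.

5. MonotonyViolationHandler has three implementations: one ignores the violation, one fails by throwing an exception, and one only logs a warning (LoggingHandler, the default). Only the one that throws interrupts processing.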
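The behaviour described in these notes can be tried out without a Flink dependency. The following is a minimal stand-alone sketch, not Flink code: the class `AscendingSketch` and its method names are hypothetical, and violations are collected in a list instead of going through a handler. Only the comparison and the `currentTimestamp - 1` rule are taken from the listing above.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone sketch (hypothetical class, not part of Flink) of the ascending-timestamp logic. */
final class AscendingSketch {

    private long currentTimestamp = Long.MIN_VALUE;
    private final List<String> violations = new ArrayList<>();

    /**
     * Mirrors extractTimestamp: the tracked timestamp only advances on non-decreasing input,
     * but the element always keeps its own timestamp.
     */
    long onElement(long newTimestamp) {
        if (newTimestamp >= currentTimestamp) {
            currentTimestamp = newTimestamp;
        } else {
            // Stand-in for violationHandler.handleViolation(...): just record the violation.
            violations.add(newTimestamp + " < " + currentTimestamp);
        }
        return newTimestamp;
    }

    /** Mirrors getCurrentWatermark: one millisecond behind the largest timestamp seen so far. */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    List<String> violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingSketch sketch = new AscendingSketch();
        for (long ts : new long[] {1000L, 1005L, 1003L, 1010L}) {
            sketch.onElement(ts);
            System.out.println("element=" + ts + " watermark=" + sketch.currentWatermark());
        }
        System.out.println("violations=" + sketch.violations());
    }
}
```

The out-of-order element 1003 is reported as a violation, and the watermark stays at 1004 rather than moving backwards.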

Usage is as follows:

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

        @Override
        public long extractAscendingTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

BoundedOutOfOrdernessTimestampExtractor: a timestamp assigner that allows a fixed amount of lateness

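The constructor of this class takes a duration, for example Time.seconds(30). The watermark then lags 30 seconds behind the largest timestamp seen so far, so elements that are up to 30 seconds out of order in event time can still be processed in their time window.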

Again, the code first.

package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with
 * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can
 * help reduce the number of elements that are ignored due to lateness when computing the final result for a
 * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time
 * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp.
 * */
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum seen timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}

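1. As you can see, it also implements AssignerWithPeriodicWatermarks, so watermarks are again generated periodically.

2. It has a constructor with one parameter, Time maxOutOfOrderness, the maximum amount by which an element may lag behind the largest timestamp seen so far. Elements that lag further than this fall behind the watermark; they count as late and are typically ignored by downstream window operators once their window has closed.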
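The watermark arithmetic in these notes can also be tried out in isolation. This is a minimal stand-alone sketch, not Flink code: the class `BoundedOutOfOrdernessSketch` and the helper `isLate` are hypothetical, durations are plain milliseconds instead of `Time`, and the watermark is polled after every element, whereas a real job polls it on a timer.

```java
/** Stand-alone sketch (hypothetical class, not part of Flink) of the bounded-out-of-orderness logic. */
final class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Offset the start value so the subtraction below cannot underflow before the first element.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Mirrors extractTimestamp: track the maximum, return the element's own timestamp. */
    long onElement(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Mirrors getCurrentWatermark: max timestamp minus the bound, never moving backwards. */
    long currentWatermark() {
        long potential = currentMaxTimestamp - maxOutOfOrderness;
        if (potential >= lastEmittedWatermark) {
            lastEmittedWatermark = potential;
        }
        return lastEmittedWatermark;
    }

    /** Hypothetical helper: an element is late if the last watermark has already passed its timestamp. */
    boolean isLate(long timestamp) {
        return timestamp <= lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(10_000L);
        for (long ts : new long[] {20_000L, 35_000L, 27_000L, 24_000L}) {
            boolean late = sketch.isLate(ts);
            sketch.onElement(ts);
            System.out.println("element=" + ts + " late=" + late
                    + " watermark=" + sketch.currentWatermark());
        }
    }
}
```

With a bound of 10 seconds, the element at 27000 is still on time after 35000 has been seen (the watermark is 25000), while the element at 24000 lies behind the watermark and counts as late.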

It is used like this:

DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

        @Override
        public long extractTimestamp(MyEvent element) {
            return element.getCreationTime();
        }
});

References:

https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamp_extractors.html

In the heading "Assigners allowing a fixed amount of lateness" on that page, "amount" refers to a duration, not to a number of elements.
