聊聊flink的InternalTimeServiceManager

2019-01-18  本文已影响24人  go4it

本文主要研究一下flink的InternalTimeServiceManager

InternalTimeServiceManager

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java

/**
 * Registry of {@link InternalTimerServiceImpl} instances addressed by name.
 *
 * <p>Each timer service is created lazily on first request and is backed by two priority queues
 * obtained from the configured {@link PriorityQueueSetFactory}: one for processing-time timers and
 * one for event-time timers. The manager also forwards watermarks to all registered services and
 * offers per-key-group snapshot/restore entry points.
 *
 * @param <K> type of the keys the managed timers are scoped to.
 */
@Internal
public class InternalTimeServiceManager<K> {

    /** Common prefix of all state names used for timer queues. */
    @VisibleForTesting
    static final String TIMER_STATE_PREFIX = "_timer_state";

    /** State-name prefix for processing-time timer queues; the service name is appended to it. */
    @VisibleForTesting
    static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_";

    /** State-name prefix for event-time timer queues; the service name is appended to it. */
    @VisibleForTesting
    static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_";

    private final KeyGroupRange localKeyGroupRange;
    private final KeyContext keyContext;

    private final PriorityQueueSetFactory priorityQueueSetFactory;
    private final ProcessingTimeService processingTimeService;

    /** All timer services created so far, keyed by the name they were requested under. */
    private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

    /** Guards {@link #snapshotStateForKeyGroup(DataOutputView, int)}; see that method. */
    private final boolean useLegacySynchronousSnapshots;

    /**
     * Creates a manager with an initially empty set of timer services.
     *
     * <p>All reference arguments are checked with {@code Preconditions.checkNotNull}.
     */
    InternalTimeServiceManager(
        KeyGroupRange localKeyGroupRange,
        KeyContext keyContext,
        PriorityQueueSetFactory priorityQueueSetFactory,
        ProcessingTimeService processingTimeService,
        boolean useLegacySynchronousSnapshots) {

        this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);
        this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory);
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots;

        this.timerServices = new HashMap<>();
    }

    /**
     * Returns the timer service registered under {@code name}, creating it if necessary, and
     * starts it with the serializers taken from {@code timerSerializer} and the given callback.
     *
     * @param name name identifying the timer service.
     * @param timerSerializer serializer supplying the key and namespace serializers.
     * @param triggerable target passed on to {@code startTimerService}.
     * @param <N> namespace type of the timers.
     * @return the (possibly newly created) timer service.
     */
    @SuppressWarnings("unchecked")
    public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {

        final InternalTimerServiceImpl<K, N> service = registerOrGetTimerService(name, timerSerializer);

        service.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);

        return service;
    }

    /**
     * Looks up the timer service for {@code name}; if none exists yet, builds one with a fresh
     * processing-time queue and a fresh event-time queue and remembers it under that name.
     */
    @SuppressWarnings("unchecked")
    <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
        // The map stores services with a wildcard namespace type, hence the unchecked cast.
        InternalTimerServiceImpl<K, N> existing = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
        if (existing != null) {
            return existing;
        }

        InternalTimerServiceImpl<K, N> created = new InternalTimerServiceImpl<>(
            localKeyGroupRange,
            keyContext,
            processingTimeService,
            createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
            createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));

        timerServices.put(name, created);
        return created;
    }

    /** Returns a read-only view of all timer services registered so far. */
    Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() {
        return Collections.unmodifiableMap(timerServices);
    }

    /** Asks the {@link PriorityQueueSetFactory} for a timer queue with the given state name. */
    private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
        return priorityQueueSetFactory.create(name, timerSerializer);
    }

    /**
     * Forwards the timestamp of {@code watermark} to every registered timer service.
     *
     * @throws Exception whatever a timer service throws while advancing.
     */
    public void advanceWatermark(Watermark watermark) throws Exception {
        for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) {
            timerService.advanceWatermark(watermark.getTimestamp());
        }
    }

    //////////////////              Fault Tolerance Methods             ///////////////////

    /**
     * Writes the timer state of one key group to {@code stream} through an
     * {@link InternalTimerServiceSerializationProxy}.
     *
     * <p>Only permitted when legacy synchronous snapshots are enabled; this is enforced up front
     * with {@code Preconditions.checkState}.
     */
    public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException {
        Preconditions.checkState(useLegacySynchronousSnapshots);
        new InternalTimerServiceSerializationProxy<K>(this, keyGroupIdx).write(stream);
    }

    /**
     * Reads the timer state of one key group from {@code stream} through an
     * {@link InternalTimerServiceSerializationProxy} created with the given class loader.
     */
    public void restoreStateForKeyGroup(
            InputStream stream,
            int keyGroupIdx,
            ClassLoader userCodeClassLoader) throws IOException {

        new InternalTimerServiceSerializationProxy<K>(this, userCodeClassLoader, keyGroupIdx).read(stream);
    }

    ////////////////////            Methods used ONLY IN TESTS              ////////////////////

    /** Sums the processing-time timer counts reported by all registered timer services. */
    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int total = 0;
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            total += service.numProcessingTimeTimers();
        }
        return total;
    }

    /** Sums the event-time timer counts reported by all registered timer services. */
    @VisibleForTesting
    public int numEventTimeTimers() {
        int total = 0;
        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
            total += service.numEventTimeTimers();
        }
        return total;
    }
}

PriorityQueueSetFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java

/**
 * Factory for {@link KeyGroupedInternalPriorityQueue} instances.
 */
public interface PriorityQueueSetFactory {

    /**
     * Creates a {@link KeyGroupedInternalPriorityQueue} for elements of type {@code T}.
     *
     * @param stateName name under which the queue is created.
     * @param byteOrderedElementSerializer serializer for the queue elements
     *     (NOTE(review): the parameter name suggests the serialized bytes must preserve element
     *     order — confirm against the implementations).
     * @param <T> element type; must be a heap element that is priority-comparable and keyed.
     * @return the queue for the given state name; never {@code null}.
     */
    @Nonnull
    <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}

HeapPriorityQueueElement

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java

/**
 * Contract for objects that can be stored in a {@link HeapPriorityQueue}: each element carries the
 * index of its own slot in the queue's internal array, which the owning queue reads and updates.
 */
@Internal
public interface HeapPriorityQueueElement {

    /**
     * The index that indicates that a {@link HeapPriorityQueueElement} object is not contained in and managed by any
     * {@link HeapPriorityQueue}. We do not strictly enforce that internal indexes must be reset to this value when
     * elements are removed from a {@link HeapPriorityQueue}.
     */
    int NOT_CONTAINED = Integer.MIN_VALUE;

    /**
     * Returns the current index of this object in the internal array of {@link HeapPriorityQueue}.
     */
    int getInternalIndex();

    /**
     * Sets the current index of this object in the {@link HeapPriorityQueue} and should only be called by the owning
     * {@link HeapPriorityQueue}.
     *
     * @param newIndex the new index in the timer heap.
     */
    void setInternalIndex(int newIndex);
}

PriorityComparable

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java

/**
 * Implemented by objects whose priority can be compared against objects of type {@code T}.
 *
 * @param <T> type of the object this one is compared to.
 */
public interface PriorityComparable<T> {

    /**
     * Compares the priority of this object with the priority of {@code other}.
     *
     * @param other the object to compare against; must not be {@code null}.
     * @return an int describing the relative priority
     *     (NOTE(review): presumably follows the {@code Comparable#compareTo} sign convention — confirm).
     */
    int comparePriorityTo(@Nonnull T other);
}

Keyed

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java

/**
 * Implemented by objects that expose a key.
 *
 * @param <K> type of the key.
 */
public interface Keyed<K> {

    /**
     * Returns the key of this object.
     */
    K getKey();
}

InternalTimer

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java

/**
 * A timer described by a timestamp, a key and a namespace. Timers are {@link Keyed} by their key
 * and {@link PriorityComparable} against any other {@link InternalTimer}.
 *
 * @param <K> type of the key the timer is bound to.
 * @param <N> type of the namespace the timer is bound to.
 */
@Internal
public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> {

    /** Key extractor that delegates to {@link #getKey()} of the given timer. */
    KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = timer -> timer.getKey();

    /** Comparator that orders two timers by applying {@link Long#compare(long, long)} to their timestamps. */
    PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR =
        (first, second) -> Long.compare(first.getTimestamp(), second.getTimestamp());

    /**
     * Returns the timestamp of the timer, i.e. the point in time at which the timer will fire.
     */
    long getTimestamp();

    /**
     * Returns the key this timer is bound to.
     */
    @Nonnull
    @Override
    K getKey();

    /**
     * Returns the namespace this timer is bound to.
     */
    @Nonnull
    N getNamespace();
}

TimerHeapInternalTimer

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java

/**
 * {@link InternalTimer} implementation that can live inside a heap-based priority queue: besides
 * timestamp, key and namespace it records its own heap index (see {@link HeapPriorityQueueElement}).
 *
 * <p>Equality and hash code are based on timestamp, key and namespace only; the heap index does
 * not take part in either.
 *
 * @param <K> type of the key the timer is scoped to.
 * @param <N> type of the namespace the timer is scoped to.
 */
@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {

    /** Key this timer belongs to. */
    @Nonnull
    private final K key;

    /** Namespace this timer belongs to. */
    @Nonnull
    private final N namespace;

    /** Point in time at which the timer expires. */
    private final long timestamp;

    /** Slot of this timer in the owning heap, or {@link #NOT_CONTAINED}. */
    private transient int timerHeapIndex;

    /** Creates a timer that is initially marked as not contained in any heap. */
    TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) {
        this.timerHeapIndex = NOT_CONTAINED;
        this.namespace = namespace;
        this.key = key;
        this.timestamp = timestamp;
    }

    @Override
    public long getTimestamp() {
        return this.timestamp;
    }

    @Nonnull
    @Override
    public K getKey() {
        return this.key;
    }

    @Nonnull
    @Override
    public N getNamespace() {
        return this.namespace;
    }

    /**
     * Two timers are equal when timestamp, key and namespace all match. Any {@link InternalTimer}
     * is accepted as the other side, not only instances of this class.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof InternalTimer)) {
            return false;
        }

        final InternalTimer<?, ?> that = (InternalTimer<?, ?>) o;
        if (timestamp != that.getTimestamp()) {
            return false;
        }
        return key.equals(that.getKey()) && namespace.equals(that.getNamespace());
    }

    @Override
    public int getInternalIndex() {
        return this.timerHeapIndex;
    }

    @Override
    public void setInternalIndex(int newIndex) {
        timerHeapIndex = newIndex;
    }

    /** Marks this timer as no longer contained in any heap. */
    void removedFromTimerQueue() {
        timerHeapIndex = NOT_CONTAINED;
    }

    /** Combines timestamp, key and namespace, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        // Long.hashCode folds the upper and lower 32 bits together with XOR.
        int hash = Long.hashCode(timestamp);
        hash = 31 * hash + key.hashCode();
        return 31 * hash + namespace.hashCode();
    }

    @Override
    public String toString() {
        return "Timer{timestamp=" + timestamp
            + ", key=" + key
            + ", namespace=" + namespace
            + '}';
    }

    /** Compares this timer with {@code other} by timestamp via {@link Long#compare(long, long)}. */
    @Override
    public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) {
        return Long.compare(this.timestamp, other.getTimestamp());
    }
}

HeapPriorityQueueSetFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java

/**
 * {@link PriorityQueueSetFactory} that produces {@link HeapPriorityQueueSet} instances, all
 * configured with the key-group range, total key-group count and minimum capacity given at
 * construction time.
 */
public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {

    /** Key-group range handed to every queue created by this factory. */
    @Nonnull
    private final KeyGroupRange keyGroupRange;

    /** Total number of key groups handed to every queue created by this factory. */
    @Nonnegative
    private final int totalKeyGroups;

    /** Minimum capacity handed to every queue created by this factory. */
    @Nonnegative
    private final int minimumCapacity;

    /**
     * Creates a factory; the arguments are stored as-is and passed on to each created queue.
     *
     * @param keyGroupRange key-group range for the created queues.
     * @param totalKeyGroups total number of key groups.
     * @param minimumCapacity minimum capacity for the created queues.
     */
    public HeapPriorityQueueSetFactory(
        @Nonnull KeyGroupRange keyGroupRange,
        @Nonnegative int totalKeyGroups,
        @Nonnegative int minimumCapacity) {

        this.keyGroupRange = keyGroupRange;
        this.totalKeyGroups = totalKeyGroups;
        this.minimumCapacity = minimumCapacity;
    }

    /**
     * Creates a new {@link HeapPriorityQueueSet} that orders elements through their own
     * {@link PriorityComparable} implementation and extracts keys through their {@link Keyed}
     * implementation.
     *
     * <p>Neither {@code stateName} nor {@code byteOrderedElementSerializer} is read by this
     * implementation; every call returns a new queue regardless of the name.
     *
     * @param stateName unused here.
     * @param byteOrderedElementSerializer unused here.
     * @return a newly constructed queue set.
     */
    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create(
        @Nonnull String stateName,
        @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {

        return new HeapPriorityQueueSet<>(
            PriorityComparator.forPriorityComparableObjects(),
            KeyExtractorFunction.forKeyedObjects(),
            minimumCapacity,
            keyGroupRange,
            totalKeyGroups);
    }
}

小结

doc

上一篇下一篇

猜你喜欢

热点阅读