玩转大数据JavaFlink学习指南

Flink 源码之 Window Slice

2022-05-25  本文已影响0人  AlienPaul

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

前言

Window slice是Flink对SQL window聚合的一种优化方式。我们回忆下window的三种窗口,其中如果使用hopping window或cumulative window,不难发现window和window之间是有重叠的。如何避免重复计算窗口间重叠的部分很明显是优化的一个方向。为了最大化复用重叠部分的数据,Flink引入了切片(slice)的概念。一个window可以拆分成1个或多个slice的组合(必须恰好组合成window,不能存在slice跨过window边界的情况),对于整个window的计算粒度可以分解为对slice的计算。Flink将这些slice缓存,从而实现了slice复用,避免了window重叠部分重复计算。以上是window slice优化的思路。对于滚动窗口而言,一个窗口就是一个切片(窗口间无重叠,切片之间不共享);而对滑动/累积窗口而言,一个窗口可能包含多个切片,一个切片也可能位于多个窗口中(切片之间会共享)。

接下来我们开始分析整个优化过程的核心逻辑。

SqlWindowTableFunction

Flink支持window table valued function(TVF)。TVF对应的window计算function的基类为SqlWindowTableFunction它有3个子类:

分别对应了累计窗口,滑动窗口和滚动窗口TVF。

这些方法在SQL优化规则中(ProjectWindowTableFunctionTransposeRule和)生成。LogicalProject转换为LogicalTableFunctionScan。SQL优化的过程不是本篇重点,后续单独开篇分析。

SliceAssigner

SliceAssigner用途是将元素指定给某个slice(根据元素的时间,判断位于哪个slice中)。StreamExecWindowAggregateBase根据window的执行计划,创建对应的assigner。

在Slice优化过程中,通常使用slice/window的结束时间戳来表示slice/window。

SliceAssigner有两个子接口:

分别代表了slice共享和slice不共享的assigner。

SliceAssigner继承图

接下来分析它们的源代码:

@Internal
public interface SliceAssigner extends Serializable {

    /**
     * Returns the end timestamp of a slice that the given element should belong.
     *
     * @param element the element to which slice should belong to.
     * @param clock the service to get current processing time.
     */
    // 获取元素所属的slice,返回这个slice的结束时间戳
    long assignSliceEnd(RowData element, ClockService clock);

    /**
     * Returns the last window which the slice belongs to. The window and and slices are both
     * identified by the end timestamp.
     */
    // 获取slice所属的window,返回这个window的结束时间戳
    // 如果slice属于多个window(共享slice),则返回这个slice所属的最后一个window
    long getLastWindowEnd(long sliceEnd);

    /** Returns the corresponding window start timestamp of the given window end timestamp. */
    // 给出window的结束时间,返回这个window的开始时间
    long getWindowStart(long windowEnd);

    /**
     * Returns an iterator of slices to expire when the given window is emitted. The window and
     * slices are both identified by the end timestamp.
     *
     * @param windowEnd the end timestamp of window emitted.
     */
    // 当window数据发送到下游之后,返回需要过期处理的slice的iterator(有些共享的slice再也用不到了)
    // window和slice都用结束时间戳来表示
    Iterable<Long> expiredSlices(long windowEnd);

    /**
     * Returns the interval of slice ends, i.e. the step size to advance of the slice end when a new
     * slice assigned.
     */
    // 返回slice之间的时间间隔。比如说分配下一个slice的时候,新的slice时间需要前进多少
    long getSliceEndInterval();

    /**
     * Returns {@code true} if elements are assigned to windows based on event time, {@code false}
     * based on processing time.
     */
    // 返回时候使用 event time。如果返回false说明使用的是processing time
    boolean isEventTime();
}

SliceSharedAssigner

SliceSharedAssigner在window之间共享slice。一个window被分割为多个slice。当发送window数据的时候,需要合并slice,形成window计算结果。

源代码如下:

@Internal
public interface SliceSharedAssigner extends SliceAssigner {

    /**
     * Determines which slices (if any) should be merged.
     *
     * @param sliceEnd the triggered slice, identified by end timestamp
     * @param callback a callback that can be invoked to signal which slices should be merged.
     */
    // 合并slice。sliceEnd为触发合并操作的slice,callback为合并slice的操作
    void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception;

    /**
     * Returns the optional end timestamp of next window which should be triggered. Empty if no
     * following window to trigger for now.
     *
     * <p>The purpose of this method is avoid register too many timers for each hopping and
     * cumulative slice, e.g. HOP(1day, 10s) needs register 8640 timers for every slice. In order to
     * improve this, we only register one timer for the next window. For hopping windows we don't
     * register next window if current window is empty (i.e. no records in current window). That
     * means we will have one more unnecessary window triggered for hopping windows if no elements
     * arrives for a key for a long time. We will skip to emit window result for the triggered empty
     * window, see {@link SliceSharedWindowAggProcessor#fireWindow(Long)}.
     *
     * @param windowEnd the current triggered window, identified by end timestamp
     * @param isWindowEmpty a supplier that can be invoked to get whether the triggered window is
     *     empty (i.e. no records in the window).
     */
    // 返回下一个需要触发计算的window
    // windowEnd为当前触发计算的window的结束时间戳
    // isWindowEmpty为触发计算的window内是否有数据,是一个supplier类型,在需要计算的时候再返回结果
    Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty);

    // ------------------------------------------------------------------------

    /**
     * Callback to be used in {@link #mergeSlices(long, MergeCallback)} for specifying which slices
     * should be merged.
     */
    // 合并slice的回调函数,确定哪些slice需要合并,执行合并操作
    interface MergeCallback {

        /**
         * Specifies that states of the given slices should be merged into the result slice.
         *
         * @param mergeResult The resulting merged slice, {@code null} if it represents a non-state
         *     namespace.
         * @param toBeMerged The list of slices that should be merged into one slice.
         */
        // mergeResult 合并后的slice(结束时间戳表示)
        // toBeMerged 需要合并的slice(同样是结束时间戳表示)
        void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception;
    }
}

SliceUnsharedAssigner

SliceUnsharedAssigner表示分配的slice不需要在多个window之间共享,因此1个window只会被分割成1个slice。发送window数据的时候不需要合并多个slice。该接口没有任何专属的方法。代码如下所示:

@Internal
public interface SliceUnsharedAssigner extends SliceAssigner {}

AbstractSliceAssigner

这是具备SlicAssigner基础功能的抽象类。所有SliceAssigner的实现类都继承自该类。我们查看下基本功能有哪些。

private abstract static class AbstractSliceAssigner implements SliceAssigner {
    private static final long serialVersionUID = 1L;

    // rowtime字段位于RowData的第几列
    protected final int rowtimeIndex;
    // 是否使用event time
    protected final boolean isEventTime;
    // 时区ID
    protected final ZoneId shiftTimeZone;

    protected AbstractSliceAssigner(int rowtimeIndex, ZoneId shiftTimeZone) {
        this.rowtimeIndex = rowtimeIndex;
        this.shiftTimeZone = shiftTimeZone;
        this.isEventTime = rowtimeIndex >= 0;
    }

    // 引出一个新的抽象方法。传入的参数为数据对应的timestamp
    // 详细请见下面的方法分析
    public abstract long assignSliceEnd(long timestamp);

    @Override
    public final long assignSliceEnd(RowData element, ClockService clock) {
        // 这个方法提取出element对应的时间
        final long timestamp;
        if (rowtimeIndex >= 0) {
            // 如果是event time模式
            // 提取出这行数据对应的rowtime,转换时间为UTC
            // Precision for row timestamp is always 3
            TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
            timestamp = toUtcTimestampMills(rowTime.getMillisecond(), shiftTimeZone);
        } else {
            // 如果是processing time 模式,转换当前时间为UTC
            // in processing time mode
            timestamp = toUtcTimestampMills(clock.currentProcessingTime(), shiftTimeZone);
        }
        // 根据提取出的时间,分配slice
        return assignSliceEnd(timestamp);
    }

    @Override
    public final boolean isEventTime() {
        return isEventTime;
    }
}

下面我们逐个分析SliceAssigner的具体实现。

TumblingSliceAssigner

用于tumbling window(滚动窗口)。TumblingSliceAssigner的slice不共享,一个window划分中一个slice。它的代码如下:

public static final class TumblingSliceAssigner extends AbstractSliceAssigner
        implements SliceUnsharedAssigner {
    private static final long serialVersionUID = 1L;

    /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
    public TumblingSliceAssigner withOffset(Duration offset) {
        return new TumblingSliceAssigner(rowtimeIndex, shiftTimeZone, size, offset.toMillis());
    }

    // 窗口大小(时间跨度)
    private final long size;
    // 窗口起始时间偏移量
    private final long offset;
    // 用来保存需要清除数据(过期)的slice的end
    private final ReusableListIterable reuseExpiredList = new ReusableListIterable();

    private TumblingSliceAssigner(
            int rowtimeIndex, ZoneId shiftTimeZone, long size, long offset) {
        super(rowtimeIndex, shiftTimeZone);
        checkArgument(
                size > 0,
                String.format(
                        "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
                        size));
        checkArgument(
                Math.abs(offset) < size,
                String.format(
                        "Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.",
                        size, offset));
        this.size = size;
        this.offset = offset;
    }

    @Override
    public long assignSliceEnd(long timestamp) {
        // 获取window的开始时间
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        // 开始时间+窗口大小即window的结束时间
        // slice不共享,window即slice
        return start + size;
    }

    @Override
    public long getLastWindowEnd(long sliceEnd) {
        // 对于Tumbling Window,每个slice对应着一个window,因此sliceEnd就是windowEnd
        return sliceEnd;
    }

    public long getWindowStart(long windowEnd) {
        // 结束 - 大小 = 开始
        return windowEnd - size;
    }

    @Override
    public Iterable<Long> expiredSlices(long windowEnd) {
        // 清空reuseExpiredList,只加入windowEnd
        // 每次都过期当前window对应的slice,因为Tumbling window的slice不共享
        reuseExpiredList.reset(windowEnd);
        return reuseExpiredList;
    }

    @Override
    public long getSliceEndInterval() {
        // slice间隔时间为window的size
        return size;
    }
}

CumulativeSliceAssigner

用于Cumulaive window。Cumulative window有一个最大长度maxSize,超过这个长度之后这一批数据的累计统计结束,开始新一批的统计。还有一个步进step,即在同一批数据累计过程中,每过多久输出一次中间结果。maxSize必须是step的整数倍。所以说Cumulaive window每个step划分为一个slice。Slice需要共享。

public static final class CumulativeSliceAssigner extends AbstractSliceAssigner
        implements SliceSharedAssigner {
    private static final long serialVersionUID = 1L;

    /** Creates a new {@link CumulativeSliceAssigner} with a new specified offset. */
    public CumulativeSliceAssigner withOffset(Duration offset) {
        return new CumulativeSliceAssigner(
                rowtimeIndex, shiftTimeZone, maxSize, step, offset.toMillis());
    }

    // window的最大长度
    private final long maxSize;
    // 每隔多久输出一次累计值
    private final long step;
    private final long offset;
    // 记录需要合并的slice
    private final ReusableListIterable reuseToBeMergedList = new ReusableListIterable();
    // 记录需要过期的slice
    private final ReusableListIterable reuseExpiredList = new ReusableListIterable();

    protected CumulativeSliceAssigner(
            int rowtimeIndex, ZoneId shiftTimeZone, long maxSize, long step, long offset) {
        super(rowtimeIndex, shiftTimeZone);
        if (maxSize <= 0 || step <= 0) {
            throw new IllegalArgumentException(
                    String.format(
                            "Cumulative Window parameters must satisfy maxSize > 0 and step > 0, but got maxSize %dms and step %dms.",
                            maxSize, step));
        }
        if (maxSize % step != 0) {
            throw new IllegalArgumentException(
                    String.format(
                            "Cumulative Window requires maxSize must be an integral multiple of step, but got maxSize %dms and step %dms.",
                            maxSize, step));
        }

        this.maxSize = maxSize;
        this.step = step;
        this.offset = offset;
    }

    @Override
    public long assignSliceEnd(long timestamp) {
        // 计算window的开始时间
        // 注意这里头传入的windowSize实际上是step而不是maxSize。这里将每个slice作为一个window来计算window start
        // 因此,实际上计算出来的是slice start
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, step);
        // slice start + step即slice end
        return start + step;
    }

    @Override
    public long getLastWindowEnd(long sliceEnd) {
        long windowStart = getWindowStart(sliceEnd);
        return windowStart + maxSize;
    }

    @Override
    public long getWindowStart(long windowEnd) {
        // 返回window的开始时间
        return TimeWindow.getWindowStartWithOffset(windowEnd - 1, offset, maxSize);
    }

    @Override
    public Iterable<Long> expiredSlices(long windowEnd) {
        // 获取window开始时间
        long windowStart = getWindowStart(windowEnd);
        // 获取第一个slice结束时间
        long firstSliceEnd = windowStart + step;
        // 获取属于这个窗口的最后一个slice的结束时间
        long lastSliceEnd = windowStart + maxSize;
        if (windowEnd == firstSliceEnd) {
            // we share state in the first slice, skip cleanup for the first slice
            // 如果是第一个slice,不清除任何slice
            reuseExpiredList.clear();
        } else if (windowEnd == lastSliceEnd) {
            // when this is the last slice,
            // we need to cleanup the shared state (i.e. first slice) and the current slice
            // 如果到达了window最后的slice,需要清除第一个slice和当前的slice
            // 为什么不用清除所有的slice,是因为下面mergeSlices方法将后面slice的计算结果合并到了第一个slice中
            reuseExpiredList.reset(windowEnd, firstSliceEnd);
        } else {
            // clean up current slice
            // 其他情况,清除当前的slice即可
            reuseExpiredList.reset(windowEnd);
        }
        return reuseExpiredList;
    }

    @Override
    public long getSliceEndInterval() {
        // 间隔时间即slice步长
        return step;
    }

    @Override
    public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
        // 该方法将sliceEnd对应的slice内容合并到window第一个slice中
        // 获取window开始时间
        long windowStart = getWindowStart(sliceEnd);
        // 第一个slice的结束时间
        long firstSliceEnd = windowStart + step;
        if (sliceEnd == firstSliceEnd) {
            // if this is the first slice, there is nothing to merge
            // 如果相等,说明这是window中的第一个slice,不需要合并
            reuseToBeMergedList.clear();
        } else {
            // otherwise, merge the current slice state into the first slice state
            // 否则,返回当前slice
            reuseToBeMergedList.reset(sliceEnd);
        }
        // 将当前slice合并到第一个slice中
        callback.merge(firstSliceEnd, reuseToBeMergedList);
    }

    @Override
    public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
        // 下一个window的结束时间为这个window的结束时间+步长
        long nextWindowEnd = windowEnd + step;
        // 获取下一个累计结束时候的window结束时间
        long maxWindowEnd = getWindowStart(windowEnd) + maxSize;
        if (nextWindowEnd > maxWindowEnd) {
            return Optional.empty();
        } else {
            return Optional.of(nextWindowEnd);
        }
    }
}

HoppingSliceAssigner

用于Hopping window。Hopping window为滑动窗口。具有两个重要属性:滑动距离(slide)和window跨度(size)。要求size必须为slide的整数倍。Hopping window之间是一定有重叠的,因此slice需要共享。为了确保slice跨度尽可能的大(减少合并次数)和尽可能复用(不跨window边界),slice的跨度值选取size和slide的最大公约数。

public static final class HoppingSliceAssigner extends AbstractSliceAssigner
        implements SliceSharedAssigner {
    private static final long serialVersionUID = 1L;

    /** Creates a new {@link HoppingSliceAssigner} with a new specified offset. */
    public HoppingSliceAssigner withOffset(Duration offset) {
        return new HoppingSliceAssigner(
                rowtimeIndex, shiftTimeZone, size, slide, offset.toMillis());
    }

    // window大小
    private final long size;
    // 滑动距离
    private final long slide;
    // 初始偏移
    private final long offset;
    // slice大小
    private final long sliceSize;
    // 每个window有多少个slice
    private final int numSlicesPerWindow;
    private final ReusableListIterable reuseExpiredList = new ReusableListIterable();

    protected HoppingSliceAssigner(
            int rowtimeIndex, ZoneId shiftTimeZone, long size, long slide, long offset) {
        super(rowtimeIndex, shiftTimeZone);
        if (size <= 0 || slide <= 0) {
            throw new IllegalArgumentException(
                    String.format(
                            "Hopping Window must satisfy slide > 0 and size > 0, but got slide %dms and size %dms.",
                            slide, size));
        }
        if (size % slide != 0) {
            throw new IllegalArgumentException(
                    String.format(
                            "Slicing Hopping Window requires size must be an integral multiple of slide, but got size %dms and slide %dms.",
                            size, slide));
        }
        this.size = size;
        this.slide = slide;
        this.offset = offset;
        // slice大小为window size和滑动距离的最大公约数
        this.sliceSize = ArithmeticUtils.gcd(size, slide);
        // 每个window拥有的slice数量为window大小 / slice大小
        this.numSlicesPerWindow = MathUtils.checkedDownCast(size / sliceSize);
    }

    @Override
    public long assignSliceEnd(long timestamp) {
        // 计算slice start
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, sliceSize);
        // sliceStart + sliceSize = sliceEnd
        return start + sliceSize;
    }

    @Override
    public long getLastWindowEnd(long sliceEnd) {
        // slice所属的最后一个window end为slice start + window size
        return sliceEnd - sliceSize + size;
    }

    @Override
    public long getWindowStart(long windowEnd) {
        return windowEnd - size;
    }

    @Override
    public Iterable<Long> expiredSlices(long windowEnd) {
        // we need to cleanup the first slice of the window
        // 每次都清理掉当前window中的第一个slice,每次都只有这一个过期
        long windowStart = getWindowStart(windowEnd);
        long firstSliceEnd = windowStart + sliceSize;
        reuseExpiredList.reset(firstSliceEnd);
        return reuseExpiredList;
    }

    @Override
    public long getSliceEndInterval() {
        // slice size就是slice间隔时间
        return sliceSize;
    }

    @Override
    public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
        // the iterable to list all the slices of the triggered window
        // 返回window中所有的slice
        Iterable<Long> toBeMerged =
                new HoppingSlicesIterable(sliceEnd, sliceSize, numSlicesPerWindow);
        // null namespace means use heap data views, instead of state data views
        // mergeResult为null意味着计算结果不在state中存储
        callback.merge(null, toBeMerged);
    }

    @Override
    public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
        // 如果下一个需要触发的window内容不为空就返回下一个window
        if (isWindowEmpty.get()) {
            return Optional.empty();
        } else {
            return Optional.of(windowEnd + sliceSize);
        }
    }
}

WindowedSliceAssigner

适用于window开始时间和结束时间在element中携带这类场景。代码较为简单不过多分析。

public static final class WindowedSliceAssigner implements SliceUnsharedAssigner {
    private static final long serialVersionUID = 1L;

    // element中第几个字段表示的是window end
    private final int windowEndIndex;
    // 包装的assigner
    private final SliceAssigner innerAssigner;
    private final ReusableListIterable reuseExpiredList = new ReusableListIterable();

    public WindowedSliceAssigner(int windowEndIndex, SliceAssigner innerAssigner) {
        checkArgument(
                windowEndIndex >= 0,
                "Windowed slice assigner must have a positive window end index.");
        this.windowEndIndex = windowEndIndex;
        this.innerAssigner = innerAssigner;
    }

    @Override
    public long assignSliceEnd(RowData element, ClockService clock) {
        // 获取element中携带的window end
        return element.getTimestamp(windowEndIndex, 3).getMillisecond();
    }

    @Override
    public long getLastWindowEnd(long sliceEnd) {
        // we shouldn't use innerAssigner.getLastWindowEnd here,
        // because WindowedSliceAssigner is slice unshared, an attached window can't be
        // shared with other windows and the last window should be itself.
        // 由于slice不共享,直接返回sliceEnd
        return sliceEnd;
    }

    @Override
    public long getWindowStart(long windowEnd) {
        return innerAssigner.getWindowStart(windowEnd);
    }

    @Override
    public Iterable<Long> expiredSlices(long windowEnd) {
        reuseExpiredList.reset(windowEnd);
        return reuseExpiredList;
    }

    @Override
    public long getSliceEndInterval() {
        return innerAssigner.getSliceEndInterval();
    }

    @Override
    public boolean isEventTime() {
        // it always works in event-time mode if input row has been attached windows
        // 只适用于event time
        return true;
    }
}

slice的分配逻辑分析完了,接下来开始slice window数据的计算过程分析。

SlicingWindowProcessor

Cumulaive window用于处理slice window的数据。具有子类AbstractWindowAggProcessor。该类拥有两个实现类:

SlicingWindowProcessor继承图

AbstractWindowAggProcessor

我们重点分析每个元素到来的时候AbstractWindowAggProcessor的处理逻辑。这个逻辑位于processElement方法。代码如下:

// 返回true说明这个元素可以被丢弃,false则不可丢弃
@Override
public boolean processElement(RowData key, RowData element) throws Exception {
    // 返回这个元素所属的slice
    long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
    if (!isEventTime) {
        // always register processing time for every element when processing time mode
        // 如果使用processing time,在slice结束的时候注册一个processing time timer,用于触发计算
        windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
    }

    if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
        // 如果slice已触发计算,说明这个元素来迟了
        // the assigned slice has been triggered, which means current element is late,
        // but maybe not need to drop
        // 获取这个slice所属的最后一个window
        long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
        if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
            // the last window has been triggered, so the element can be dropped now
            // 如果这个window已触发计算,这个元素可以丢弃
            return true;
        } else {
            // sliceStateMergeTarget返回保存合并状态的slice,即把一系列slice合并到哪个slice中
            // 缓存slice数据,可以认为是一个multiKeyMap(多个key的map)。key,合并后的sliceEnd对应一系列的element
            windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
            // we need to register a timer for the next unfired window,
            // because this may the first time we see elements under the key
            long unfiredFirstWindow = sliceEnd;
            while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
                // unfiredFirstWindow一直加windowInterval(sliceAssigner.getSliceEndInterval()),直到它大于currentProgress
                // 最终结果是下一个未触发计算的slice
                unfiredFirstWindow += windowInterval;
            }
            // 注册一个event time timer
            windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
            return false;
        }
    } else {
        // the assigned slice hasn't been triggered, accumulate into the assigned slice
        // 如果slice没触发计算,将元素加入slice缓存
        windowBuffer.addElement(key, sliceEnd, element);
        return false;
    }
}

SliceUnsharedWindowAggProcessor

非共享slice window聚合处理器。代码和分析如下:

public final class SliceUnsharedWindowAggProcessor extends AbstractWindowAggProcessor {
    private static final long serialVersionUID = 1L;

    public SliceUnsharedWindowAggProcessor(
            GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
            WindowBuffer.Factory windowBufferFactory,
            SliceUnsharedAssigner sliceAssigner,
            TypeSerializer<RowData> accSerializer,
            ZoneId shiftTimeZone) {
        super(genAggsHandler, windowBufferFactory, sliceAssigner, accSerializer, shiftTimeZone);
    }

    // 触发计算
    @Override
    public void fireWindow(Long windowEnd) throws Exception {
        // 返回window计算出的累计值
        RowData acc = windowState.value(windowEnd);
        if (acc == null) {
            // 如果没有在创建一个累加器
            acc = aggregator.createAccumulators();
        }
        // 设置当前window的累加器
        aggregator.setAccumulators(windowEnd, acc);
        // 获取累计结果
        RowData aggResult = aggregator.getValue(windowEnd);
        // 输出结果到下游
        collect(aggResult);
    }

    @Override
    protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
        // 非共享slice模式不存在merge情况,即merge前后为同一个slice。直接返回slice
        return sliceToMerge;
    }
}

SliceSharedWindowAggProcessor

共享slice window聚合处理器。该处理器需要处理slice合并的具体逻辑。代码和分析如下:

public final class SliceSharedWindowAggProcessor extends AbstractWindowAggProcessor
        implements SliceSharedAssigner.MergeCallback {
    private static final long serialVersionUID = 1L;

    private final SliceSharedAssigner sliceSharedAssigner;
    private final WindowIsEmptySupplier emptySupplier;
    // 实现了SliceSharedAssigner.MergeCallback,处理merge的结果
    private final SliceMergeTargetHelper mergeTargetHelper;

    public SliceSharedWindowAggProcessor(
            GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
            WindowBuffer.Factory bufferFactory,
            SliceSharedAssigner sliceAssigner,
            TypeSerializer<RowData> accSerializer,
            int indexOfCountStar,
            ZoneId shiftTimeZone) {
        super(genAggsHandler, bufferFactory, sliceAssigner, accSerializer, shiftTimeZone);
        this.sliceSharedAssigner = sliceAssigner;
        this.mergeTargetHelper = new SliceMergeTargetHelper();
        this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar, sliceAssigner);
    }

    @Override
    public void fireWindow(Long windowEnd) throws Exception {
        // 合并slice,指定自己为MergeCallback
        // SliceSharedWindowAggProcessor实现了MergeCallback的merge方法
        sliceSharedAssigner.mergeSlices(windowEnd, this);
        // we have set accumulator in the merge() method
        // 获取window统计结果
        RowData aggResult = aggregator.getValue(windowEnd);
        // 如果window内容不为空,收集计算聚合结果到下游
        if (!isWindowEmpty()) {
            // for hopping windows, the triggered window may be an empty window
            // (see register next window below), for such window, we shouldn't emit it
            // 因为hopping window中可能不存在数据
            collect(aggResult);
        }

        // we should register next window timer here,
        // because slices are shared, maybe no elements arrived for the next slices
        // 获取下一个需要触发的window,设置定时器
        Optional<Long> nextWindowEndOptional =
                sliceSharedAssigner.nextTriggerWindow(windowEnd, emptySupplier);
        if (nextWindowEndOptional.isPresent()) {
            long nextWindowEnd = nextWindowEndOptional.get();
            if (sliceSharedAssigner.isEventTime()) {
                windowTimerService.registerEventTimeWindowTimer(nextWindowEnd);
            } else {
                windowTimerService.registerProcessingTimeWindowTimer(nextWindowEnd);
            }
        }
    }

    // slice合并的具体操作在此
    // sliceSharedAssigner.mergeSlices(windowEnd, this)调用了此方法
    @Override
    public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
        // get base accumulator
        final RowData acc;
        // mergeResult为null表示结果不在state中保存
        // Hop window的结果不在state中保存
        if (mergeResult == null) {
            // null means the merged is not on state, create a new acc
            acc = aggregator.createAccumulators();
        } else {
            // 否则从state中读取聚合结果
            RowData stateAcc = windowState.value(mergeResult);
            if (stateAcc == null) {
                acc = aggregator.createAccumulators();
            } else {
                acc = stateAcc;
            }
        }
        // set base accumulator
        aggregator.setAccumulators(mergeResult, acc);

        // merge slice accumulators
        // 聚合所有toBeMerged slice的值
        for (Long slice : toBeMerged) {
            RowData sliceAcc = windowState.value(slice);
            if (sliceAcc != null) {
                aggregator.merge(slice, sliceAcc);
            }
        }

        // set merged acc into state if the merged acc is on state
        // 如果mergeResult不为null,更新聚合结果到windowState
        if (mergeResult != null) {
            windowState.update(mergeResult, aggregator.getAccumulators());
        }
    }

    // 该方法执行merge slice操作
    // slice数据迟到的时候调用
    // 迟到数据属于哪个slice就被merge到哪个slice中
    protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
        // 将sliceToMerge作为merge结果返回
        mergeTargetHelper.setMergeTarget(null);
        sliceSharedAssigner.mergeSlices(sliceToMerge, mergeTargetHelper);

        // the mergeTarget might be null, which means the merging happens in memory instead of
        // on state, so the slice state to merge into is itself.
        if (mergeTargetHelper.getMergeTarget() != null) {
            return mergeTargetHelper.getMergeTarget();
        } else {
            return sliceToMerge;
        }
    }

    private boolean isWindowEmpty() {
        if (emptySupplier.indexOfCountStar < 0) {
            // 对于hopping window会自动添加一个count(*)字段用来判断window内是否有元素
            // 对于非hopping window,比如cumulative window,没有count(*)字段,永远不会为空
            // for non-hopping windows, the window is never empty
            return false;
        } else {
            return emptySupplier.get();
        }
    }

    // ------------------------------------------------------------------------------------------

    private final class WindowIsEmptySupplier implements Supplier<Boolean>, Serializable {
        private static final long serialVersionUID = 1L;

        // count(*) 字段的index,仅用于hopping window。用来判断窗口中是否有数据
        private final int indexOfCountStar;

        private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner assigner) {
            if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
                checkArgument(
                        indexOfCountStar >= 0,
                        "Hopping window requires a COUNT(*) in the aggregate functions.");
            }
            this.indexOfCountStar = indexOfCountStar;
        }

        @Override
        public Boolean get() {
            if (indexOfCountStar < 0) {
                return false;
            }
            try {
                // hopping window情况,如果累加器为空,或者count(*)返回0,说明没有数据,window为空
                RowData acc = aggregator.getAccumulators();
                return acc == null || acc.getLong(indexOfCountStar) == 0;
            } catch (Exception e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }

    // 将mergeResult作为mergeTarget返回
    private static final class SliceMergeTargetHelper
            implements SliceSharedAssigner.MergeCallback, Serializable {

        private static final long serialVersionUID = 1L;
        private Long mergeTarget = null;

        @Override
        public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
            this.mergeTarget = mergeResult;
        }

        public Long getMergeTarget() {
            return mergeTarget;
        }

        public void setMergeTarget(Long mergeTarget) {
            this.mergeTarget = mergeTarget;
        }
    }
}

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

上一篇 下一篇

猜你喜欢

热点阅读