Flink Source Code: Window Slice
Preface
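Window slicing is an optimization that Flink applies to SQL window aggregation. Recall the three window types: tumbling, hopping and cumulative. With hopping or cumulative windows, adjacent windows overlap, so avoiding repeated computation over the overlapping parts is an obvious direction for optimization. To reuse the overlapping data as much as possible, Flink introduces the concept of a slice. A window is split into one or more slices that compose it exactly (no slice may cross a window boundary), so computing a window reduces to computing its slices. Flink caches these slices and reuses them, which avoids recomputing the parts where windows overlap. That is the idea behind window slicing. For tumbling windows, one window is exactly one slice (windows do not overlap, so slices are not shared). For hopping and cumulative windows, one window may contain several slices and one slice may belong to several windows (slices are shared).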
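The decomposition can be made concrete with a small sketch. It assumes a hopping window of size 10 and slide 5 (arbitrary time units, offset 0), so every slice is 5 units long. As in Flink, slices and windows are identified by their end timestamps. The class SliceDecompositionDemo and its methods are hypothetical illustrations, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink code. Assumes offset 0 and sliceSize == slide.
final class SliceDecompositionDemo {

    // Ends of the slices that exactly cover the window ending at windowEnd.
    static List<Long> slicesOfWindow(long windowEnd, long size, long slide) {
        List<Long> sliceEnds = new ArrayList<>();
        for (long end = windowEnd - size + slide; end <= windowEnd; end += slide) {
            sliceEnds.add(end);
        }
        return sliceEnds;
    }

    // Ends of all windows that contain the slice ending at sliceEnd.
    static List<Long> windowsOfSlice(long sliceEnd, long size, long slide) {
        List<Long> windowEnds = new ArrayList<>();
        // The earliest such window ends with this slice; the latest one starts with it.
        for (long end = sliceEnd; end <= sliceEnd - slide + size; end += slide) {
            windowEnds.add(end);
        }
        return windowEnds;
    }

    public static void main(String[] args) {
        System.out.println(slicesOfWindow(10, 10, 5)); // [5, 10]
        System.out.println(windowsOfSlice(10, 10, 5)); // [10, 15]
    }
}
```

The window ending at 10 is made up of the slices ending at 5 and 10. The slice ending at 10 is reused by the windows ending at 10 and 15. For a tumbling window (size equal to slide), both lists collapse to a single element.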
Next, let's analyze the core logic of the whole optimization.
SqlWindowTableFunction
Flink supports window table-valued functions (TVFs). The base class of the window functions behind these TVFs is SqlWindowTableFunction.
It has three subclasses:
- SqlCumulateTableFunction
- SqlHopTableFunction
- SqlTumbleTableFunction
They correspond to the cumulate, hop (sliding) and tumble window TVFs, respectively.
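These functions are produced by SQL optimization rules (ProjectWindowTableFunctionTransposeRule and …), which convert a LogicalProject into a LogicalTableFunctionScan. The SQL optimization process is not the focus of this article and will be analyzed in a separate post.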
SliceAssigner
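SliceAssigner assigns an element to a slice: it decides, based on the element's time, which slice the element falls into. StreamExecWindowAggregateBase creates the matching assigner from the window definition in the execution plan.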
Throughout the slicing optimization, a slice or a window is usually identified by its end timestamp.
SliceAssigner has two sub-interfaces:
- SliceSharedAssigner
- SliceUnsharedAssigner
They represent assigners whose slices are shared and not shared, respectively.
[Figure: SliceAssigner inheritance diagram]
Next, let's analyze their source code:
@Internal
public interface SliceAssigner extends Serializable {
/**
* Returns the end timestamp of a slice that the given element should belong.
*
* @param element the element to which slice should belong to.
* @param clock the service to get current processing time.
*/
// Returns the slice the element belongs to, identified by the slice's end timestamp
long assignSliceEnd(RowData element, ClockService clock);
/**
* Returns the last window which the slice belongs to. The window and slices are both
* identified by the end timestamp.
*/
// Returns the window the slice belongs to, identified by the window's end timestamp
// If the slice belongs to several windows (shared slice), the last of those windows is returned
long getLastWindowEnd(long sliceEnd);
/** Returns the corresponding window start timestamp of the given window end timestamp. */
// Given a window's end timestamp, returns that window's start timestamp
long getWindowStart(long windowEnd);
/**
* Returns an iterator of slices to expire when the given window is emitted. The window and
* slices are both identified by the end timestamp.
*
* @param windowEnd the end timestamp of window emitted.
*/
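// After a window has been emitted downstream, returns an iterator over the slices to expire (some shared slices will never be used again)
// Both windows and slices are identified by their end timestamps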
Iterable<Long> expiredSlices(long windowEnd);
/**
* Returns the interval of slice ends, i.e. the step size to advance of the slice end when a new
* slice assigned.
*/
// Returns the interval between slice ends, i.e. how far the end timestamp advances when the next slice is assigned
long getSliceEndInterval();
/**
* Returns {@code true} if elements are assigned to windows based on event time, {@code false}
* based on processing time.
*/
// Returns whether event time is used; false means processing time is used
boolean isEventTime();
}
SliceSharedAssigner
SliceSharedAssigner shares slices between windows. A window is split into several slices, and when the window is emitted its slices must be merged to produce the window result.
The source code is as follows:
@Internal
public interface SliceSharedAssigner extends SliceAssigner {
/**
* Determines which slices (if any) should be merged.
*
* @param sliceEnd the triggered slice, identified by end timestamp
* @param callback a callback that can be invoked to signal which slices should be merged.
*/
// Merges slices. sliceEnd is the slice that triggers the merge; callback performs the merge
void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception;
/**
* Returns the optional end timestamp of next window which should be triggered. Empty if no
* following window to trigger for now.
*
* <p>The purpose of this method is avoid register too many timers for each hopping and
* cumulative slice, e.g. HOP(1day, 10s) needs register 8640 timers for every slice. In order to
* improve this, we only register one timer for the next window. For hopping windows we don't
* register next window if current window is empty (i.e. no records in current window). That
* means we will have one more unnecessary window triggered for hopping windows if no elements
* arrives for a key for a long time. We will skip to emit window result for the triggered empty
* window, see {@link SliceSharedWindowAggProcessor#fireWindow(Long)}.
*
* @param windowEnd the current triggered window, identified by end timestamp
* @param isWindowEmpty a supplier that can be invoked to get whether the triggered window is
* empty (i.e. no records in the window).
*/
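// Returns the next window that should be triggered
// windowEnd is the end timestamp of the window being triggered now
// isWindowEmpty tells whether the triggered window contains any data; as a Supplier, it is evaluated only when needed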
Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty);
// ------------------------------------------------------------------------
/**
* Callback to be used in {@link #mergeSlices(long, MergeCallback)} for specifying which slices
* should be merged.
*/
// Callback for merging slices: specifies which slices to merge and performs the merge
interface MergeCallback {
/**
* Specifies that states of the given slices should be merged into the result slice.
*
* @param mergeResult The resulting merged slice, {@code null} if it represents a non-state
* namespace.
* @param toBeMerged The list of slices that should be merged into one slice.
*/
// mergeResult: the merged slice (identified by its end timestamp)
// toBeMerged: the slices to merge (also identified by their end timestamps)
void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception;
}
}
SliceUnsharedAssigner
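SliceUnsharedAssigner means the assigned slices are not shared between windows, so each window consists of exactly one slice and no slices need to be merged when the window is emitted. The interface declares no methods of its own: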
@Internal
public interface SliceUnsharedAssigner extends SliceAssigner {}
AbstractSliceAssigner
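This abstract class provides the basic SliceAssigner functionality. The time-based assigners (tumbling, cumulative and hopping) all extend it, while WindowedSliceAssigner, covered below, implements SliceUnsharedAssigner directly. Let's look at what the basic functionality includes.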
private abstract static class AbstractSliceAssigner implements SliceAssigner {
private static final long serialVersionUID = 1L;
// Index of the rowtime field in the RowData
protected final int rowtimeIndex;
// Whether event time is used
protected final boolean isEventTime;
// ID of the time zone used to shift timestamps
protected final ZoneId shiftTimeZone;
protected AbstractSliceAssigner(int rowtimeIndex, ZoneId shiftTimeZone) {
this.rowtimeIndex = rowtimeIndex;
this.shiftTimeZone = shiftTimeZone;
this.isEventTime = rowtimeIndex >= 0;
}
// Introduces a new abstract method whose argument is the element's timestamp
// See the analysis of the subclasses below for details
public abstract long assignSliceEnd(long timestamp);
@Override
public final long assignSliceEnd(RowData element, ClockService clock) {
// This method extracts the timestamp of the element
final long timestamp;
if (rowtimeIndex >= 0) {
// Event time mode:
// read the row's rowtime and convert it with shiftTimeZone into a UTC-based timestamp
// Precision for row timestamp is always 3
TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
timestamp = toUtcTimestampMills(rowTime.getMillisecond(), shiftTimeZone);
} else {
// Processing time mode: convert the current processing time with shiftTimeZone into a UTC-based timestamp
// in processing time mode
timestamp = toUtcTimestampMills(clock.currentProcessingTime(), shiftTimeZone);
}
// Assign the slice based on the extracted timestamp
return assignSliceEnd(timestamp);
}
@Override
public final boolean isEventTime() {
return isEventTime;
}
}
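The timestamp conversion matters for day-level windows. Shifting by shiftTimeZone presumably lets a window such as a 1-day tumbling window start at local midnight rather than at UTC midnight. Below is a minimal sketch of what toUtcTimestampMills is assumed to do: it reads the instant as wall-clock time in shiftTimeZone and then reinterprets that wall-clock time as if it were UTC. ShiftTimeZoneDemo is a hypothetical name, and the method body is an assumption, not copied from Flink.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical sketch of the assumed behavior of toUtcTimestampMills; not Flink code.
final class ShiftTimeZoneDemo {

    static long toUtcTimestampMillsSketch(long epochMills, ZoneId shiftTimeZone) {
        if (ZoneOffset.UTC.equals(shiftTimeZone) || epochMills == Long.MAX_VALUE) {
            return epochMills; // nothing to shift
        }
        // Read the instant as wall-clock time in shiftTimeZone ...
        LocalDateTime local = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), shiftTimeZone);
        // ... and reinterpret that wall-clock time as if it were UTC.
        return local.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        // Epoch 0 is 08:00 wall-clock time at UTC+8, i.e. 8 hours = 28,800,000 ms.
        System.out.println(toUtcTimestampMillsSketch(0L, ZoneOffset.ofHours(8))); // 28800000
    }
}
```

A fixed ZoneOffset is used in the example so the result does not depend on historical time-zone rules.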
Next, let's analyze the concrete SliceAssigner implementations one by one.
TumblingSliceAssigner
Used for tumbling windows. The slices of TumblingSliceAssigner are not shared: each window is split into exactly one slice. Its code is as follows:
public static final class TumblingSliceAssigner extends AbstractSliceAssigner
implements SliceUnsharedAssigner {
private static final long serialVersionUID = 1L;
/** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
public TumblingSliceAssigner withOffset(Duration offset) {
return new TumblingSliceAssigner(rowtimeIndex, shiftTimeZone, size, offset.toMillis());
}
// Window size (time span)
private final long size;
// Offset of the window start
private final long offset;
// Holds the end timestamps of the slices whose data should be cleared (expired)
private final ReusableListIterable reuseExpiredList = new ReusableListIterable();
private TumblingSliceAssigner(
int rowtimeIndex, ZoneId shiftTimeZone, long size, long offset) {
super(rowtimeIndex, shiftTimeZone);
checkArgument(
size > 0,
String.format(
"Tumbling Window parameters must satisfy size > 0, but got size %dms.",
size));
checkArgument(
Math.abs(offset) < size,
String.format(
"Tumbling Window parameters must satisfy abs(offset) < size, but got size %dms and offset %dms.",
size, offset));
this.size = size;
this.offset = offset;
}
@Override
public long assignSliceEnd(long timestamp) {
// Compute the window start
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
// Window start + window size = window end
// Slices are not shared, so the window is the slice
return start + size;
}
@Override
public long getLastWindowEnd(long sliceEnd) {
// For tumbling windows each slice corresponds to exactly one window, so sliceEnd is also windowEnd
return sliceEnd;
}
public long getWindowStart(long windowEnd) {
// start = end - size
return windowEnd - size;
}
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
// Reset reuseExpiredList so that it contains only windowEnd
// The slice of the current window always expires, because tumbling windows do not share slices
reuseExpiredList.reset(windowEnd);
return reuseExpiredList;
}
@Override
public long getSliceEndInterval() {
// The slice interval equals the window size
return size;
}
}
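TumblingSliceAssigner delegates the window-start computation to TimeWindow.getWindowStartWithOffset. That computation can be sketched as follows. This is a hypothetical re-implementation for illustration that assumes the common remainder-based formula. TumblingSliceDemo is not a Flink class.

```java
// Hypothetical re-implementation for illustration; not Flink's TimeWindow class.
final class TumblingSliceDemo {

    // Assumed remainder-based formula for the start of the window containing timestamp.
    static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % keeps the sign of the dividend, so fix up negative remainders.
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    // Tumbling: the slice IS the window, so the slice end is the window end.
    static long assignSliceEnd(long timestamp, long offset, long size) {
        return windowStartWithOffset(timestamp, offset, size) + size;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 s windows
        System.out.println(assignSliceEnd(12_345L, 0L, size));     // 20000
        System.out.println(assignSliceEnd(12_345L, 3_000L, size)); // 13000
        System.out.println(assignSliceEnd(-1L, 0L, size));         // 0
    }
}
```

With offset 3000, the windows are [3000, 13000), [13000, 23000) and so on, so timestamp 12345 falls into the window ending at 13000. The negative-remainder branch keeps timestamps before the epoch in the correct window.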
CumulativeSliceAssigner
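Used for cumulative windows. A cumulative window has a maximum size, maxSize. Once maxSize is reached, the accumulation for the current batch of data ends and a new batch starts. It also has a step, which is how often an intermediate result is emitted while a batch is accumulating. maxSize must be an integral multiple of step, so each step of a cumulative window forms one slice, and slices are shared.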
public static final class CumulativeSliceAssigner extends AbstractSliceAssigner
implements SliceSharedAssigner {
private static final long serialVersionUID = 1L;
/** Creates a new {@link CumulativeSliceAssigner} with a new specified offset. */
public CumulativeSliceAssigner withOffset(Duration offset) {
return new CumulativeSliceAssigner(
rowtimeIndex, shiftTimeZone, maxSize, step, offset.toMillis());
}
// Maximum window size
private final long maxSize;
// How often the accumulated value is emitted
private final long step;
private final long offset;
// Slices to merge
private final ReusableListIterable reuseToBeMergedList = new ReusableListIterable();
// Slices to expire
private final ReusableListIterable reuseExpiredList = new ReusableListIterable();
protected CumulativeSliceAssigner(
int rowtimeIndex, ZoneId shiftTimeZone, long maxSize, long step, long offset) {
super(rowtimeIndex, shiftTimeZone);
if (maxSize <= 0 || step <= 0) {
throw new IllegalArgumentException(
String.format(
"Cumulative Window parameters must satisfy maxSize > 0 and step > 0, but got maxSize %dms and step %dms.",
maxSize, step));
}
if (maxSize % step != 0) {
throw new IllegalArgumentException(
String.format(
"Cumulative Window requires maxSize must be an integral multiple of step, but got maxSize %dms and step %dms.",
maxSize, step));
}
this.maxSize = maxSize;
this.step = step;
this.offset = offset;
}
@Override
public long assignSliceEnd(long timestamp) {
// Compute the start
// Note that the window size passed in here is step, not maxSize: each slice is treated as a window when computing the start,
// so the result is actually the slice start
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, step);
// slice start + step = slice end
return start + step;
}
@Override
public long getLastWindowEnd(long sliceEnd) {
long windowStart = getWindowStart(sliceEnd);
return windowStart + maxSize;
}
@Override
public long getWindowStart(long windowEnd) {
// Returns the window start (windowEnd - 1 is used because the end timestamp is exclusive)
return TimeWindow.getWindowStartWithOffset(windowEnd - 1, offset, maxSize);
}
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
// Get the window start
long windowStart = getWindowStart(windowEnd);
// End of the first slice
long firstSliceEnd = windowStart + step;
// End of the last slice that belongs to this window
long lastSliceEnd = windowStart + maxSize;
if (windowEnd == firstSliceEnd) {
// we share state in the first slice, skip cleanup for the first slice
// For the first slice, expire nothing
reuseExpiredList.clear();
} else if (windowEnd == lastSliceEnd) {
// when this is the last slice,
// we need to cleanup the shared state (i.e. first slice) and the current slice
// When the window reaches its last slice, clear the first slice and the current slice
// Not every slice needs clearing, because mergeSlices below has already merged the later slices into the first one
reuseExpiredList.reset(windowEnd, firstSliceEnd);
} else {
// clean up current slice
// Otherwise, clearing the current slice is enough
reuseExpiredList.reset(windowEnd);
}
return reuseExpiredList;
}
@Override
public long getSliceEndInterval() {
// The interval is the step
return step;
}
@Override
public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
// Merges the slice identified by sliceEnd into the first slice of the window
// Get the window start
long windowStart = getWindowStart(sliceEnd);
// End of the first slice
long firstSliceEnd = windowStart + step;
if (sliceEnd == firstSliceEnd) {
// if this is the first slice, there is nothing to merge
// Equal means this is the first slice of the window, so there is nothing to merge
reuseToBeMergedList.clear();
} else {
// otherwise, merge the current slice state into the first slice state
// Otherwise, the current slice is the one to merge
reuseToBeMergedList.reset(sliceEnd);
}
// Merge the current slice into the first slice
callback.merge(firstSliceEnd, reuseToBeMergedList);
}
@Override
public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
// The next window ends one step after the current window
long nextWindowEnd = windowEnd + step;
// End of the largest window in the current cumulative cycle
long maxWindowEnd = getWindowStart(windowEnd) + maxSize;
if (nextWindowEnd > maxWindowEnd) {
return Optional.empty();
} else {
return Optional.of(nextWindowEnd);
}
}
}
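To see how these methods fit together, the sketch below replays one cumulative cycle with maxSize = 4 and step = 1 (offset 0, arbitrary units). It copies the arithmetic of getWindowStart, mergeSlices, expiredSlices and nextTriggerWindow shown above into a hypothetical class, CumulativeSliceDemo. It is an illustration, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch that copies the CumulativeSliceAssigner arithmetic shown above (offset = 0).
final class CumulativeSliceDemo {
    final long maxSize;
    final long step;

    CumulativeSliceDemo(long maxSize, long step) {
        this.maxSize = maxSize;
        this.step = step;
    }

    // Like getWindowStart: windowEnd is exclusive, so look at windowEnd - 1.
    long windowStart(long windowEnd) {
        return Math.floorDiv(windowEnd - 1, maxSize) * maxSize;
    }

    // Like mergeSlices: every slice of the cycle is merged into the first slice.
    long mergeTarget(long sliceEnd) {
        return windowStart(sliceEnd) + step;
    }

    // Like expiredSlices.
    List<Long> expiredSlices(long windowEnd) {
        long firstSliceEnd = windowStart(windowEnd) + step;
        long lastSliceEnd = windowStart(windowEnd) + maxSize;
        List<Long> expired = new ArrayList<>();
        if (windowEnd == firstSliceEnd) {
            // the first slice holds the shared state: expire nothing yet
        } else if (windowEnd == lastSliceEnd) {
            expired.add(windowEnd);
            expired.add(firstSliceEnd); // release the shared state at the end of the cycle
        } else {
            expired.add(windowEnd); // already merged into the first slice
        }
        return expired;
    }

    // Like nextTriggerWindow: stop once the cycle's largest window has fired.
    Optional<Long> nextTriggerWindow(long windowEnd) {
        long next = windowEnd + step;
        long maxWindowEnd = windowStart(windowEnd) + maxSize;
        return next > maxWindowEnd ? Optional.empty() : Optional.of(next);
    }

    public static void main(String[] args) {
        CumulativeSliceDemo demo = new CumulativeSliceDemo(4, 1);
        Optional<Long> window = Optional.of(1L);
        while (window.isPresent()) {
            long end = window.get();
            System.out.println("fire [" + demo.windowStart(end) + ", " + end + ")"
                    + " state in slice " + demo.mergeTarget(end)
                    + " expire " + demo.expiredSlices(end));
            window = demo.nextTriggerWindow(end);
        }
    }
}
```

It prints four lines, from "fire [0, 1) state in slice 1 expire []" to "fire [0, 4) state in slice 1 expire [4, 1]". The first slice (end 1) keeps the shared state for the whole cycle. Every later slice is merged into it and can expire right after its window fires. The first slice itself is released only together with the last slice (end 4).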
HoppingSliceAssigner
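Used for hopping windows, i.e. sliding windows. A hopping window has two key properties: the slide and the window size. size must be an integral multiple of slide. Hopping windows overlap whenever size > slide, so slices need to be shared. The slice size is the greatest common divisor of size and slide. This makes slices as large as possible (fewer merges) while keeping them reusable (no slice crosses a window boundary). Because size is required to be a multiple of slide, this GCD is in fact equal to slide.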
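The constructor arithmetic can be checked with the HOP(1day, 10s) example from the nextTriggerWindow Javadoc above. The sketch uses a plain Euclidean GCD in place of ArithmeticUtils.gcd. HoppingSliceSizeDemo is a hypothetical name.

```java
// Hypothetical helper (not Flink API) mirroring the HoppingSliceAssigner constructor arithmetic.
final class HoppingSliceSizeDemo {

    // Euclidean algorithm, standing in for ArithmeticUtils.gcd.
    static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60 * 1000; // HOP size: 1 day in ms
        long slide = 10_000L;             // HOP slide: 10 s in ms
        long sliceSize = gcd(size, slide);
        long numSlicesPerWindow = size / sliceSize;
        System.out.println(sliceSize);          // 10000
        System.out.println(numSlicesPerWindow); // 8640
    }
}
```

The slice size comes out as the slide (10 s), and each 1-day window consists of 8640 slices. This matches the 8640 timers mentioned in that Javadoc.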
public static final class HoppingSliceAssigner extends AbstractSliceAssigner
implements SliceSharedAssigner {
private static final long serialVersionUID = 1L;
/** Creates a new {@link HoppingSliceAssigner} with a new specified offset. */
public HoppingSliceAssigner withOffset(Duration offset) {
return new HoppingSliceAssigner(
rowtimeIndex, shiftTimeZone, size, slide, offset.toMillis());
}
// Window size
private final long size;
// Slide
private final long slide;
// Initial offset
private final long offset;
// Slice size
private final long sliceSize;
// Number of slices per window
private final int numSlicesPerWindow;
private final ReusableListIterable reuseExpiredList = new ReusableListIterable();
protected HoppingSliceAssigner(
int rowtimeIndex, ZoneId shiftTimeZone, long size, long slide, long offset) {
super(rowtimeIndex, shiftTimeZone);
if (size <= 0 || slide <= 0) {
throw new IllegalArgumentException(
String.format(
"Hopping Window must satisfy slide > 0 and size > 0, but got slide %dms and size %dms.",
slide, size));
}
if (size % slide != 0) {
throw new IllegalArgumentException(
String.format(
"Slicing Hopping Window requires size must be an integral multiple of slide, but got size %dms and slide %dms.",
size, slide));
}
this.size = size;
this.slide = slide;
this.offset = offset;
// The slice size is the GCD of the window size and the slide
this.sliceSize = ArithmeticUtils.gcd(size, slide);
// Slices per window = window size / slice size
this.numSlicesPerWindow = MathUtils.checkedDownCast(size / sliceSize);
}
@Override
public long assignSliceEnd(long timestamp) {
// Compute the slice start
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, sliceSize);
// sliceStart + sliceSize = sliceEnd
return start + sliceSize;
}
@Override
public long getLastWindowEnd(long sliceEnd) {
// The last window containing the slice ends at slice start + window size
return sliceEnd - sliceSize + size;
}
@Override
public long getWindowStart(long windowEnd) {
return windowEnd - size;
}
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
// we need to cleanup the first slice of the window
// Always clear the first slice of the current window; it is the only slice that expires each time
long windowStart = getWindowStart(windowEnd);
long firstSliceEnd = windowStart + sliceSize;
reuseExpiredList.reset(firstSliceEnd);
return reuseExpiredList;
}
@Override
public long getSliceEndInterval() {
// The slice size is also the interval between slice ends
return sliceSize;
}
@Override
public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
// the iterable to list all the slices of the triggered window
// Iterates over all slices of the triggered window
Iterable<Long> toBeMerged =
new HoppingSlicesIterable(sliceEnd, sliceSize, numSlicesPerWindow);
// null namespace means use heap data views, instead of state data views
// A null mergeResult means the merged result is not stored in state
callback.merge(null, toBeMerged);
}
@Override
public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
// If the current window is empty, do not register the next window; otherwise the next window ends one slice later
if (isWindowEmpty.get()) {
return Optional.empty();
} else {
return Optional.of(windowEnd + sliceSize);
}
}
}
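The sketch below mirrors the HoppingSliceAssigner arithmetic for HOP(size = 10, slide = 5). It shows which slices are merged when a window fires, which slice expires afterwards, and when the next window timer is registered. It is hypothetical: HoppingSliceDemo is not a Flink class, and the slice order produced by slicesToMerge is an assumption, not necessarily the order of HoppingSlicesIterable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch mirroring HoppingSliceAssigner's arithmetic; not Flink code.
final class HoppingSliceDemo {
    final long size;
    final long sliceSize;
    final int numSlicesPerWindow;

    HoppingSliceDemo(long size, long sliceSize) {
        this.size = size;
        this.sliceSize = sliceSize;
        this.numSlicesPerWindow = (int) (size / sliceSize);
    }

    // Slice ends merged on the heap when the window ending at windowEnd fires.
    List<Long> slicesToMerge(long windowEnd) {
        List<Long> slices = new ArrayList<>();
        for (int i = numSlicesPerWindow - 1; i >= 0; i--) {
            slices.add(windowEnd - i * sliceSize);
        }
        return slices;
    }

    // Like expiredSlices: only the first slice of the fired window is no longer needed.
    long expiredSlice(long windowEnd) {
        return windowEnd - size + sliceSize;
    }

    // Like nextTriggerWindow: register the next window only if the current one had data.
    Optional<Long> nextTriggerWindow(long windowEnd, boolean currentWindowEmpty) {
        return currentWindowEmpty ? Optional.empty() : Optional.of(windowEnd + sliceSize);
    }

    public static void main(String[] args) {
        HoppingSliceDemo demo = new HoppingSliceDemo(10, 5); // HOP(size=10, slide=5)
        System.out.println(demo.slicesToMerge(15));            // [10, 15]
        System.out.println(demo.expiredSlice(15));             // 10
        System.out.println(demo.nextTriggerWindow(15, false)); // Optional[20]
    }
}
```

When the window ending at 15 (covering [5, 15)) fires, the slices ending at 10 and 15 are merged on the heap. Afterwards only the slice ending at 10 is dropped, because the next window, [10, 20), still needs the slice ending at 15.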
WindowedSliceAssigner
Used when the elements already carry the window start and end. The code is simple, so it is not analyzed in detail.
public static final class WindowedSliceAssigner implements SliceUnsharedAssigner {
private static final long serialVersionUID = 1L;
// Index of the field in the element that holds the window end
private final int windowEndIndex;
// The wrapped assigner
private final SliceAssigner innerAssigner;
private final ReusableListIterable reuseExpiredList = new ReusableListIterable();
public WindowedSliceAssigner(int windowEndIndex, SliceAssigner innerAssigner) {
checkArgument(
windowEndIndex >= 0,
"Windowed slice assigner must have a positive window end index.");
this.windowEndIndex = windowEndIndex;
this.innerAssigner = innerAssigner;
}
@Override
public long assignSliceEnd(RowData element, ClockService clock) {
// Read the window end carried in the element
return element.getTimestamp(windowEndIndex, 3).getMillisecond();
}
@Override
public long getLastWindowEnd(long sliceEnd) {
// we shouldn't use innerAssigner.getLastWindowEnd here,
// because WindowedSliceAssigner is slice unshared, an attached window can't be
// shared with other windows and the last window should be itself.
// Slices are not shared, so return sliceEnd directly
return sliceEnd;
}
@Override
public long getWindowStart(long windowEnd) {
return innerAssigner.getWindowStart(windowEnd);
}
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
reuseExpiredList.reset(windowEnd);
return reuseExpiredList;
}
@Override
public long getSliceEndInterval() {
return innerAssigner.getSliceEndInterval();
}
@Override
public boolean isEventTime() {
// it always works in event-time mode if input row has been attached windows
// Only event time is supported
return true;
}
}
That completes the analysis of slice assignment. Next, let's analyze how the data of slicing windows is computed.
SlicingWindowProcessor
SlicingWindowProcessor processes the data of slicing windows. It is implemented by AbstractWindowAggProcessor, which has two implementation classes:
- SliceSharedWindowAggProcessor
- SliceUnsharedWindowAggProcessor
AbstractWindowAggProcessor
Let's focus on how AbstractWindowAggProcessor handles each incoming element. This logic lives in the processElement method. The code is as follows:
// Returns true if the element can be dropped, false otherwise
@Override
public boolean processElement(RowData key, RowData element) throws Exception {
// Get the slice the element belongs to
long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
if (!isEventTime) {
// always register processing time for every element when processing time mode
// In processing time mode, register a processing time timer at the slice end to trigger the computation
windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
}
if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
// The slice has already been triggered, so this element is late
// the assigned slice has been triggered, which means current element is late,
// but maybe not need to drop
// Get the last window the slice belongs to
long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
// the last window has been triggered, so the element can be dropped now
// If that window has been triggered too, the element can be dropped
return true;
} else {
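// sliceStateMergeTarget returns the slice whose state holds the merged result, i.e. the slice that a series of slices is merged into
// windowBuffer buffers slice data; think of it as a multi-key map from (key, merged sliceEnd) to a list of elements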
windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
// we need to register a timer for the next unfired window,
// because this may the first time we see elements under the key
long unfiredFirstWindow = sliceEnd;
while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
// Keep adding windowInterval (sliceAssigner.getSliceEndInterval()) to unfiredFirstWindow until that window has not fired yet
// The result is the end of the first window that has not been triggered
unfiredFirstWindow += windowInterval;
}
// Register an event time timer
windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
return false;
}
} else {
// the assigned slice hasn't been triggered, accumulate into the assigned slice
// The slice has not been triggered, so add the element to the slice buffer
windowBuffer.addElement(key, sliceEnd, element);
return false;
}
}
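The late-element branch can be traced with concrete numbers. The sketch assumes event time in UTC and defines isWindowFired as currentProgress >= windowEnd - 1. That definition is an assumption about the helper used above, and the real helper also handles the time-zone shift. LateElementDemo and handleLateElement are hypothetical names, and a return value of -1 stands for "drop the element".

```java
// Hypothetical, simplified sketch of the late-element branch of processElement
// (event time, UTC only, no real state or timers).
final class LateElementDemo {

    // Assumed definition: a window has fired once progress reaches its last millisecond.
    static boolean isWindowFired(long windowEnd, long currentProgress) {
        return currentProgress >= windowEnd - 1;
    }

    // For an element whose slice has already fired: returns -1 if it can be dropped,
    // otherwise the end of the first unfired window, where a timer would be registered.
    static long handleLateElement(long sliceEnd, long lastWindowEnd, long currentProgress, long interval) {
        if (isWindowFired(lastWindowEnd, currentProgress)) {
            return -1L; // every window containing the slice has fired
        }
        long unfiredFirstWindow = sliceEnd;
        while (isWindowFired(unfiredFirstWindow, currentProgress)) {
            unfiredFirstWindow += interval;
        }
        return unfiredFirstWindow;
    }

    public static void main(String[] args) {
        // HOP(size=10, slide=5): slice [0, 5) belongs to the windows ending at 5 and 10.
        System.out.println(handleLateElement(5, 10, 7, 5));  // 10
        System.out.println(handleLateElement(5, 10, 12, 5)); // -1
    }
}
```

With HOP(size = 10, slide = 5), getLastWindowEnd(5) is 10. At watermark 7 the element is still needed by the window ending at 10, so a timer is registered there. At watermark 12 every window containing the slice has fired, so the element is dropped.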
SliceUnsharedWindowAggProcessor
The aggregation processor for windows with unshared slices. The code and analysis are as follows:
public final class SliceUnsharedWindowAggProcessor extends AbstractWindowAggProcessor {
private static final long serialVersionUID = 1L;
public SliceUnsharedWindowAggProcessor(
GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
WindowBuffer.Factory windowBufferFactory,
SliceUnsharedAssigner sliceAssigner,
TypeSerializer<RowData> accSerializer,
ZoneId shiftTimeZone) {
super(genAggsHandler, windowBufferFactory, sliceAssigner, accSerializer, shiftTimeZone);
}
// Trigger the window computation
@Override
public void fireWindow(Long windowEnd) throws Exception {
// Read the accumulator of the window
RowData acc = windowState.value(windowEnd);
if (acc == null) {
// If there is none, create a new accumulator
acc = aggregator.createAccumulators();
}
// Set the accumulator of the current window
aggregator.setAccumulators(windowEnd, acc);
// Get the aggregation result
RowData aggResult = aggregator.getValue(windowEnd);
// Emit the result downstream
collect(aggResult);
}
@Override
protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
// Unshared slices are never merged, so the merge target is the slice itself
return sliceToMerge;
}
}
SliceSharedWindowAggProcessor
The aggregation processor for windows with shared slices. It implements the concrete slice-merging logic. The code and analysis are as follows:
public final class SliceSharedWindowAggProcessor extends AbstractWindowAggProcessor
implements SliceSharedAssigner.MergeCallback {
private static final long serialVersionUID = 1L;
private final SliceSharedAssigner sliceSharedAssigner;
private final WindowIsEmptySupplier emptySupplier;
// Implements SliceSharedAssigner.MergeCallback and records the merge result
private final SliceMergeTargetHelper mergeTargetHelper;
public SliceSharedWindowAggProcessor(
GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
WindowBuffer.Factory bufferFactory,
SliceSharedAssigner sliceAssigner,
TypeSerializer<RowData> accSerializer,
int indexOfCountStar,
ZoneId shiftTimeZone) {
super(genAggsHandler, bufferFactory, sliceAssigner, accSerializer, shiftTimeZone);
this.sliceSharedAssigner = sliceAssigner;
this.mergeTargetHelper = new SliceMergeTargetHelper();
this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar, sliceAssigner);
}
@Override
public void fireWindow(Long windowEnd) throws Exception {
// Merge the slices, passing this object as the MergeCallback
// (SliceSharedWindowAggProcessor implements MergeCallback's merge method)
sliceSharedAssigner.mergeSlices(windowEnd, this);
// we have set accumulator in the merge() method
// Get the window's aggregation result
RowData aggResult = aggregator.getValue(windowEnd);
// If the window is not empty, emit the aggregation result downstream
if (!isWindowEmpty()) {
// for hopping windows, the triggered window may be an empty window
// (see register next window below), for such window, we shouldn't emit it
// A hopping window may contain no data
collect(aggResult);
}
// we should register next window timer here,
// because slices are shared, maybe no elements arrived for the next slices
// Get the next window to trigger and register a timer for it
Optional<Long> nextWindowEndOptional =
sliceSharedAssigner.nextTriggerWindow(windowEnd, emptySupplier);
if (nextWindowEndOptional.isPresent()) {
long nextWindowEnd = nextWindowEndOptional.get();
if (sliceSharedAssigner.isEventTime()) {
windowTimerService.registerEventTimeWindowTimer(nextWindowEnd);
} else {
windowTimerService.registerProcessingTimeWindowTimer(nextWindowEnd);
}
}
}
// The actual slice merging happens here
// It is called from sliceSharedAssigner.mergeSlices(windowEnd, this)
@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
// get base accumulator
final RowData acc;
// A null mergeResult means the result is not stored in state
// Hopping window results are not stored in state
if (mergeResult == null) {
// null means the merged is not on state, create a new acc
acc = aggregator.createAccumulators();
} else {
// Otherwise, read the accumulator from state
RowData stateAcc = windowState.value(mergeResult);
if (stateAcc == null) {
acc = aggregator.createAccumulators();
} else {
acc = stateAcc;
}
}
// set base accumulator
aggregator.setAccumulators(mergeResult, acc);
// merge slice accumulators
// Merge the accumulators of all slices in toBeMerged
for (Long slice : toBeMerged) {
RowData sliceAcc = windowState.value(slice);
if (sliceAcc != null) {
aggregator.merge(slice, sliceAcc);
}
}
// set merged acc into state if the merged acc is on state
// If mergeResult is not null, write the merged accumulator back to windowState
if (mergeResult != null) {
windowState.update(mergeResult, aggregator.getAccumulators());
}
}
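// Returns the slice whose state a late element should be accumulated into
// Called from processElement when an element arrives late
// It runs mergeSlices with SliceMergeTargetHelper, which only records mergeResult and merges nothing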
protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
// Reset the recorded merge target before asking the assigner
mergeTargetHelper.setMergeTarget(null);
sliceSharedAssigner.mergeSlices(sliceToMerge, mergeTargetHelper);
// the mergeTarget might be null, which means the merging happens in memory instead of
// on state, so the slice state to merge into is itself.
if (mergeTargetHelper.getMergeTarget() != null) {
return mergeTargetHelper.getMergeTarget();
} else {
return sliceToMerge;
}
}
private boolean isWindowEmpty() {
if (emptySupplier.indexOfCountStar < 0) {
// For hopping windows a COUNT(*) field is added automatically to tell whether the window has elements
// Non-hopping windows, such as cumulative windows, have no COUNT(*) field and are never considered empty
// for non-hopping windows, the window is never empty
return false;
} else {
return emptySupplier.get();
}
}
// ------------------------------------------------------------------------------------------
private final class WindowIsEmptySupplier implements Supplier<Boolean>, Serializable {
private static final long serialVersionUID = 1L;
// Index of the COUNT(*) field; used only for hopping windows to tell whether the window has data
private final int indexOfCountStar;
private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner assigner) {
if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
checkArgument(
indexOfCountStar >= 0,
"Hopping window requires a COUNT(*) in the aggregate functions.");
}
this.indexOfCountStar = indexOfCountStar;
}
@Override
public Boolean get() {
if (indexOfCountStar < 0) {
return false;
}
try {
// Hopping windows: if the accumulator is null or COUNT(*) is 0, there is no data and the window is empty
RowData acc = aggregator.getAccumulators();
return acc == null || acc.getLong(indexOfCountStar) == 0;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
// Records mergeResult as the merge target
private static final class SliceMergeTargetHelper
implements SliceSharedAssigner.MergeCallback, Serializable {
private static final long serialVersionUID = 1L;
private Long mergeTarget = null;
@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
this.mergeTarget = mergeResult;
}
public Long getMergeTarget() {
return mergeTarget;
}
public void setMergeTarget(Long mergeTarget) {
this.mergeTarget = mergeTarget;
}
}
}
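The two ways merge() is used can be contrasted with a toy accumulator. In this hypothetical sketch a long SUM stands in for the RowData accumulator and a HashMap stands in for windowState. MergeCallbackDemo is not a Flink class.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the merge() callback above, with a SUM accumulator (a long)
// standing in for RowData and a HashMap standing in for windowState.
final class MergeCallbackDemo {
    final Map<Long, Long> windowState = new HashMap<>();
    long currentAcc; // stands in for aggregator.getAccumulators()

    void merge(Long mergeResult, Iterable<Long> toBeMerged) {
        // Base accumulator: fresh for on-heap merges, otherwise read from state.
        currentAcc = mergeResult == null ? 0L : windowState.getOrDefault(mergeResult, 0L);
        for (Long slice : toBeMerged) {
            Long sliceAcc = windowState.get(slice);
            if (sliceAcc != null) {
                currentAcc += sliceAcc;
            }
        }
        if (mergeResult != null) {
            windowState.put(mergeResult, currentAcc); // persist the merged result
        }
    }

    public static void main(String[] args) {
        MergeCallbackDemo demo = new MergeCallbackDemo();
        demo.windowState.put(5L, 3L);  // slice ending 5 has SUM 3
        demo.windowState.put(10L, 4L); // slice ending 10 has SUM 4

        demo.merge(null, Arrays.asList(5L, 10L)); // hopping style: on heap
        System.out.println(demo.currentAcc);           // 7
        System.out.println(demo.windowState.get(5L));  // 3 (state unchanged)

        demo.merge(5L, Arrays.asList(10L));       // cumulative style: into the first slice
        System.out.println(demo.currentAcc);           // 7
        System.out.println(demo.windowState.get(5L));  // 7
    }
}
```

With mergeResult == null (the hopping case), the merged value lives only on the heap and state is untouched. With a non-null mergeResult (the cumulative case, where it is the first slice), the merged value is written back, so the first slice now holds the accumulation of [0, 10).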
This blog post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.