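Flink Window Assigners Explained
1 Introduction
WindowAssigner assigns each element of a stream to zero or more windows.
The relevant abstract method is assignWindows, which takes an element together with its timestamp and returns the collection of windows the element belongs to.
The assigners differ mainly in how they implement this method.
2 Source Code Analysis
2.1 Tumbling window assigners
There are two: TumblingProcessingTimeWindows and TumblingEventTimeWindows.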
TumblingProcessingTimeWindows:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    final long now = context.getCurrentProcessingTime();
    long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
    return Collections.singletonList(new TimeWindow(start, start + size));
}
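getWindowStartWithOffset computes the start timestamp of the window that contains a given timestamp, taking the offset into account. For example, of(Time.hours(1), Time.minutes(15)) specifies one-hour windows with a 15-minute offset, which yields windows starting at 0:15:00, 1:15:00, 2:15:00, and so on.
The method is not easy to grasp at first; trying a few parameter values and checking the results helps.
Note: once size and offset are fixed, the window boundaries are fixed as well. They do not shift with the timestamps of the elements being processed.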
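To try out parameter values as suggested, the window-start arithmetic can be sketched in plain Java. This is not Flink's source: it assumes the start is the largest value of the form offset + k * size that does not exceed the timestamp, which matches the 0:15:00, 1:15:00, 2:15:00 example. The class name WindowStartSketch and the helper hm are hypothetical.

```java
class WindowStartSketch {
    /** Start of the window of length size, shifted by offset, that contains timestamp. */
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the remainder in [0, size), also for timestamps before the offset.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    /** Hypothetical helper: hours and minutes since 0:00, in milliseconds. */
    static long hm(int hours, int minutes) {
        return (hours * 60L + minutes) * 60_000L;
    }

    public static void main(String[] args) {
        long size = hm(1, 0);    // one-hour windows
        long offset = hm(0, 15); // shifted by 15 minutes
        // 1:37 and 2:14 both fall into the window starting at 1:15.
        System.out.println(windowStart(hm(1, 37), offset, size) == hm(1, 15));
        System.out.println(windowStart(hm(2, 14), offset, size) == hm(1, 15));
        // 2:15 itself opens the next window.
        System.out.println(windowStart(hm(2, 15), offset, size) == hm(2, 15));
    }
}
```

Each line prints true: the result depends only on which fixed interval the timestamp falls into, not on the timestamp's exact value.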
TumblingEventTimeWindows:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}
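The method first checks whether the element carries a timestamp, so when working in event time, remember to assign timestamps and watermarks (assignTimestampsAndWatermarks).
It then calls getWindowStartWithOffset in the same way.
Summary:
- In a tumbling window, each element is assigned to exactly one window, so the method returns Collections.singletonList, a list with a single element.
- The two tumbling assigners differ only in where the timestamp comes from: one calls getCurrentProcessingTime on context to get the current processing time, the other uses the timestamp carried by the element.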
2.2 Sliding window assigners
There are two: SlidingProcessingTimeWindows and SlidingEventTimeWindows.
SlidingProcessingTimeWindows:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    timestamp = context.getCurrentProcessingTime();
    List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
    long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
    for (long start = lastStart;
        start > timestamp - size;
        start -= slide) {
        windows.add(new TimeWindow(start, start + size));
    }
    return windows;
}
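The method first estimates how many windows the element can fall into: size / slide (window size divided by slide interval), rounded down, which is used as the initial capacity of the list. For example, when computing over the past hour every 10 minutes, size / slide is 6. If the current processing time is 20:37 and offset is 0, the element is assigned to the windows [19:40-20:40), [19:50-20:50), [20:00-21:00), [20:10-21:10), [20:20-21:20), [20:30-21:30).
lastStart is the start time of the most recent window containing the element (the one with the largest start time); the loop then walks backwards in steps of slide, adding a window each time.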
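The 20:37 example can be reproduced with a small plain-Java sketch of the same loop. It does not use Flink classes: windows are represented as long[]{start, end}, the window-start computation is a floorMod-based stand-in for getWindowStartWithOffset, and the names SlidingAssignSketch and hm are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingAssignSketch {
    /** Returns every [start, end) window of length size that contains timestamp. */
    static List<long[]> assign(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        // Stand-in for getWindowStartWithOffset(timestamp, offset, slide).
        long lastStart = timestamp - Math.floorMod(timestamp - offset, slide);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    /** Hypothetical helper: hours and minutes since 0:00, in milliseconds. */
    static long hm(int hours, int minutes) {
        return (hours * 60L + minutes) * 60_000L;
    }

    public static void main(String[] args) {
        List<long[]> windows = assign(hm(20, 37), hm(1, 0), hm(0, 10), 0L);
        for (long[] w : windows) {
            // Print start and end as minutes since 0:00.
            System.out.println(w[0] / 60_000 + " - " + w[1] / 60_000);
        }
    }
}
```

The windows come out newest first, from the one starting at 20:30 back to the one starting at 19:40. When size is not a multiple of slide, the number of windows can exceed size / slide for some timestamps, which is harmless because that value only sets the list's initial capacity.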
SlidingEventTimeWindows:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart;
            start > timestamp - size;
            start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}
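This logic is essentially the same as the processing-time version above. The only difference is the time source: one uses the current processing time, the other uses the timestamp carried by the element.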
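2.3 Session window assigners
Session window assigners involve window merging. So what is window merging?
Example:
Three records belong to K1 and one record belongs to K2.
As the example shows, a session window starts at the element's event time or processing time and ends at that start time plus the session timeout.
[K1,V4] falls inside the window of [K1,V1], i.e. [13:02,13:32], while its own window is [13:20,13:50]. The two windows overlap, so they are merged and the session is extended to [13:02,13:50].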
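The [K1,V1] and [K1,V4] case can be checked with a few lines of plain Java. This is only a sketch: windows are long[]{start, end} in minutes since 0:00, the 30-minute timeout is read off the example, and the bodies of intersects and cover are assumptions named after the TimeWindow methods used in the merge code, not copies of them.

```java
class SessionOverlapSketch {
    /** Per-element session window: starts at the element's time, ends one timeout later. */
    static long[] sessionWindow(long timestamp, long timeout) {
        return new long[] {timestamp, timestamp + timeout};
    }

    /** Assumed overlap test: the two ranges share at least one point. */
    static boolean intersects(long[] a, long[] b) {
        return a[0] <= b[1] && a[1] >= b[0];
    }

    /** Smallest window that spans both inputs. */
    static long[] cover(long[] a, long[] b) {
        return new long[] {Math.min(a[0], b[0]), Math.max(a[1], b[1])};
    }

    public static void main(String[] args) {
        long timeout = 30;                                 // minutes
        long[] v1 = sessionWindow(13 * 60 + 2, timeout);   // 13:02 to 13:32
        long[] v4 = sessionWindow(13 * 60 + 20, timeout);  // 13:20 to 13:50
        if (intersects(v1, v4)) {
            long[] merged = cover(v1, v4);
            // Print the merged session as minutes since 0:00.
            System.out.println(merged[0] + " - " + merged[1]);
        }
    }
}
```

The merged range is 782 to 830 minutes, i.e. 13:02 to 13:50, as in the example.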
There are four implementations: ProcessingTimeSessionWindows and EventTimeSessionWindows; DynamicProcessingTimeSessionWindows and DynamicEventTimeSessionWindows. All of them extend MergingWindowAssigner:
@PublicEvolving
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    private static final long serialVersionUID = 1L;

    /**
     * Determines which windows (if any) should be merged.
     *
     * @param windows The window candidates.
     * @param callback A callback that can be invoked to signal which windows should be merged.
     */
    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

    /**
     * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
     * windows should be merged.
     */
    public interface MergeCallback<W> {
        /**
         * Specifies that the given windows should be merged into the result window.
         *
         * @param toBeMerged The list of windows that should be merged into one window.
         * @param mergeResult The resulting merged window.
         */
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}
ProcessingTimeSessionWindows:
The assignWindows method:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    long currentProcessingTime = context.getCurrentProcessingTime();
    return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}
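It builds a singleton collection containing a TimeWindow that starts at the current processing time and has length sessionTimeout.
The mergeWindows method delegates to TimeWindow.mergeWindows: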
/**
 * Merge overlapping {@link TimeWindow}s.
 */
@Override
public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
    TimeWindow.mergeWindows(windows, c);
}
The TimeWindow.mergeWindows method:
public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
    // sort the windows by the start time and then merge overlapping windows
    List<TimeWindow> sortedWindows = new ArrayList<>(windows);
    Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
        @Override
        public int compare(TimeWindow o1, TimeWindow o2) {
            return Long.compare(o1.getStart(), o2.getStart());
        }
    });
    List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
    Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;
    for (TimeWindow candidate: sortedWindows) {
        if (currentMerge == null) {
            currentMerge = new Tuple2<>();
            currentMerge.f0 = candidate;
            currentMerge.f1 = new HashSet<>();
            currentMerge.f1.add(candidate);
        } else if (currentMerge.f0.intersects(candidate)) {
            currentMerge.f0 = currentMerge.f0.cover(candidate);
            currentMerge.f1.add(candidate);
        } else {
            merged.add(currentMerge);
            currentMerge = new Tuple2<>();
            currentMerge.f0 = candidate;
            currentMerge.f1 = new HashSet<>();
            currentMerge.f1.add(candidate);
        }
    }
    if (currentMerge != null) {
        merged.add(currentMerge);
    }
    for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
        if (m.f1.size() > 1) {
            c.merge(m.f1, m.f0);
        }
    }
}
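- Sort the windows by start time.
- Merge windows that overlap in time.
- Invoke the callback only for groups that contain more than one window.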
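The sort-then-fold idea can be sketched without Flink types. The version below is a simplified stand-in, not the library code: windows are long[]{start, end} in minutes since 0:00, the overlap test is an assumption, and instead of invoking a callback it returns the groups that would be reported. The names MergeWindowsSketch and Merge are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class MergeWindowsSketch {
    /** One merge: the covering range plus the original windows folded into it. */
    static final class Merge {
        long start;
        long end;
        final List<long[]> parts = new ArrayList<>();

        Merge(long[] first) {
            start = first[0];
            end = first[1];
            parts.add(first);
        }
    }

    /** Returns only the groups in which at least two windows were merged. */
    static List<Merge> mergeWindows(List<long[]> windows) {
        List<long[]> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(w -> w[0])); // sort by start time
        List<Merge> all = new ArrayList<>();
        Merge current = null;
        for (long[] candidate : sorted) {
            if (current != null && current.start <= candidate[1] && current.end >= candidate[0]) {
                // Overlap: grow the covering range and remember the candidate.
                current.start = Math.min(current.start, candidate[0]);
                current.end = Math.max(current.end, candidate[1]);
                current.parts.add(candidate);
            } else {
                // No overlap: the candidate opens a new group.
                current = new Merge(candidate);
                all.add(current);
            }
        }
        all.removeIf(m -> m.parts.size() < 2); // singletons need no merge
        return all;
    }

    public static void main(String[] args) {
        List<long[]> windows = new ArrayList<>();
        windows.add(new long[] {900, 930}); // 15:00 to 15:30, stays on its own
        windows.add(new long[] {800, 830}); // 13:20 to 13:50
        windows.add(new long[] {782, 812}); // 13:02 to 13:32
        for (Merge m : mergeWindows(windows)) {
            System.out.println(m.start + " - " + m.end + " from " + m.parts.size() + " windows");
        }
    }
}
```

With this input only one group is reported, covering 782 to 830 and built from two windows; the isolated window is left untouched.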
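However, mergeWindows is actually invoked from the addWindow method of the MergingWindowSet class.
Let's look at MergingWindowSet:
<1> Class comment:
<2> The addWindow method:
Method comment:
Adds a new window to the set of in-flight windows. This may trigger merging of previously in-flight windows, in which case the MergeFunction is called.
After adding, it returns the window that represents the added window. This is either the new window itself, if no merge occurred, or the newly merged window. Adding elements or calling trigger functions must happen only on the returned representative; this way, we never have to deal with a new window that is immediately swallowed by another window.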
public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {
    List<W> windows = new ArrayList<>();
    windows.addAll(this.mapping.keySet());
    windows.add(newWindow);
    final Map<W, Collection<W>> mergeResults = new HashMap<>();
    windowAssigner.mergeWindows(windows,
            new MergingWindowAssigner.MergeCallback<W>() {
                @Override
                public void merge(Collection<W> toBeMerged, W mergeResult) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
                    }
                    mergeResults.put(mergeResult, toBeMerged);
                }
            });
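(1) Collect all existing windows into a list and add the new window to it;
(2) Call the merging window assigner on this list (the existing windows plus the new one) and register a callback that records each merge result, together with the windows merged into it, in mergeResults;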
    W resultWindow = newWindow;
    boolean mergedNewWindow = false;
    // perform the merge
    for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
        W mergeResult = c.getKey();
        Collection<W> mergedWindows = c.getValue();
        // if our new window is in the merged windows make the merge result the
        // result window
        if (mergedWindows.remove(newWindow)) {
            mergedNewWindow = true;
            resultWindow = mergeResult;
        }
        // pick any of the merged windows and choose that window's state window
        // as the state window for the merge result
        W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());
        // figure out the state windows that we are merging
        List<W> mergedStateWindows = new ArrayList<>();
        for (W mergedWindow: mergedWindows) {
            W res = this.mapping.remove(mergedWindow);
            if (res != null) {
                mergedStateWindows.add(res);
            }
        }
        this.mapping.put(mergeResult, mergedStateWindow);
        // don't put the target state window into the merged windows
        mergedStateWindows.remove(mergedStateWindow);
        // don't merge the new window itself, it never had any state associated with it
        // i.e. if we are only merging one pre-existing window into itself
        // without extending the pre-existing window
        if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
            mergeFunction.merge(mergeResult,
                    mergedWindows,
                    this.mapping.get(mergeResult),
                    mergedStateWindows);
        }
    }
    // the new window created a new, self-contained window without merging
    if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
        this.mapping.put(resultWindow, resultWindow);
    }
    return resultWindow;
}
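(3) Preset the newly added window as the result window to return, and initialize mergedNewWindow to false;
(4) In the for loop, take each merge result together with the set of original windows merged into it. If the new window is in that set (i.e. the new window was merged), it is removed from the set and the merge result becomes the window to return;
(5) Pick any window from the remaining merged set and use its state window as the state window of the merge result; then create a list to collect the state windows being merged;
(6) For each original window in the merged set, remove it from the mapping and, if it had a state window, add that state window to the list;
(7) Put the merge result into the mapping with the chosen state window, and remove the chosen state window from the list of state windows to merge;
(8) Skip the case where a single pre-existing window is merged into itself without being extended (the merged set contains only the merge result), since there is no state to merge; otherwise call the merge function;
(9) After the loop, if nothing was merged, or the new window was not part of any merge, register the new window in the mapping as its own state window, then return the result window.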
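The mapping bookkeeping described in the steps above can be illustrated with a heavily simplified sketch. It is not the MergingWindowSet class: it assumes the in-flight windows never overlap each other, so only windows overlapping the new one can merge, and it leaves out the MergeFunction call that would combine the state of the merged windows. MergingSetSketch and Win are hypothetical names; times are minutes since 0:00.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class MergingSetSketch {
    /** Minimal stand-in for TimeWindow. */
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) { this.start = start; this.end = end; }

        boolean intersects(Win o) { return start <= o.end && end >= o.start; }

        Win cover(Win o) { return new Win(Math.min(start, o.start), Math.max(end, o.end)); }

        @Override public boolean equals(Object o) {
            return o instanceof Win && ((Win) o).start == start && ((Win) o).end == end;
        }

        @Override public int hashCode() { return Objects.hash(start, end); }

        @Override public String toString() { return "[" + start + "," + end + ")"; }
    }

    /** In-flight window -> window under which its state is kept. */
    final Map<Win, Win> mapping = new HashMap<>();

    /** Adds a window and returns its representative (itself or the merged window). */
    Win addWindow(Win newWindow) {
        List<Win> merged = new ArrayList<>();
        Win result = newWindow;
        for (Win w : mapping.keySet()) {
            if (w.intersects(newWindow)) {
                merged.add(w);
                result = result.cover(w);
            }
        }
        if (merged.isEmpty()) {
            mapping.put(newWindow, newWindow); // self-contained: its own state window
            return newWindow;
        }
        // Reuse the state window of one merged window for the merge result.
        Win stateWindow = mapping.get(merged.get(0));
        for (Win w : merged) {
            mapping.remove(w);
        }
        mapping.put(result, stateWindow);
        return result;
    }

    public static void main(String[] args) {
        MergingSetSketch set = new MergingSetSketch();
        set.addWindow(new Win(782, 812));                // 13:02 to 13:32
        Win merged = set.addWindow(new Win(800, 830));   // 13:20 to 13:50
        System.out.println(merged + " -> " + set.mapping.get(merged));
        // prints "[782,830) -> [782,812)"
    }
}
```

The point to notice is that the merged window [782,830) keeps pointing at the state window [782,812): the window grows, but its state stays where it was first stored, so existing state does not have to be moved on every merge.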
This part is fairly hard to follow, so a flowchart is attached: