Flink Source Code: Evictor
Evictor
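Why Use an Evictor
An Evictor removes elements from a window's contents after the trigger fires and before Flink applies the window function. As the interface below shows, it can also evict elements after the window function has run.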
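Consider the following scenario: every time a new element arrives on the stream (a CountTrigger with maxCount set to 1), we want to compute over the data from the last 2 hours. Flink's EventTimeSessionWindows can be used for this. By default, EventTimeSessionWindows uses EventTimeTrigger, which fires when the watermark reaches the window's maxTimestamp; the `.trigger(CountTrigger.of(1))` call in the code replaces this default trigger. The window assigner is created with a gap parameter. As elements arrive one after another, if the interval between each element and the previous one is never greater than gap, they are all merged into the same window. If two elements are more than gap apart, the earlier window ends there and the later element starts a new window.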
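The gap rule can be tried out without Flink. The sketch below is a plain-Java illustration with a hypothetical `sessionize` helper; it assumes the timestamps are already sorted and simply starts a new session whenever the distance to the previous timestamp exceeds `gap`. Flink itself reaches the same grouping differently, by giving each element a window `[timestamp, timestamp + gap)` and merging windows that overlap or touch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the gap rule: groups sorted event timestamps into sessions.
class SessionSketch {

    // A new session starts whenever a timestamp is more than gap after the previous one.
    static List<List<Long>> sessionize(List<Long> sortedTimestamps, long gap) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long previous = null;
        for (Long ts : sortedTimestamps) {
            if (previous != null && ts - previous > gap) {
                sessions.add(current); // interval > gap: close the current session
                current = new ArrayList<>();
            }
            current.add(ts);
            previous = ts;
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // 0h, 1.5h, 3h and 4.5h are each at most 2h apart, so they share one session;
        // 8h is 3.5h after 4.5h, so it opens a second session.
        List<Long> timestamps = List.of(0L, 90 * 60_000L, 3 * hour, 270 * 60_000L, 8 * hour);
        List<List<Long>> sessions = sessionize(timestamps, 2 * hour);
        System.out.println(sessions.size()); // prints 2
    }
}
```

The first session spans 4.5 hours even though gap is only 2 hours, so it still holds the element from 0h; that stale data is what the TimeEvictor in this scenario is meant to remove.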
```scala
val stream = ...
stream.keyBy(0)
  .window(EventTimeSessionWindows.withGap(Time.hours(2)))
  .trigger(CountTrigger.of(1))
  .evictor(TimeEvictor.of(Time.hours(2)))
  // a window function (e.g. reduce or process) follows here
```
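Roughly speaking, the contents of each window are stored as key-value pairs in the CopyOnWriteStateTable behind a HeapListState (when a heap-based state backend is used): the window object itself serves as the key (strictly, as the state namespace under the current record key), and the value is the list of elements in that window. Merging windows takes the union of their time ranges and concatenates their element lists. As a result, a merged window can easily contain elements that are more than 2 hours older than its newest element. To exclude them before the computation runs we need an evictor; in this example, TimeEvictor removes the data older than 2 hours.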
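A toy model makes the storage-and-merge behaviour concrete. This is a plain-Java sketch, not Flink's `CopyOnWriteStateTable` or `MergingWindowSet`: a `LinkedHashMap` stands in for the state table, and a hypothetical `Win(start, end)` class stands in for `TimeWindow`. Each element at time `t` gets the session window `[t, t + 2h)`, and merging two windows covers both ranges and concatenates their lists.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified model of window list state: window -> elements in that window.
class WindowStateSketch {

    // Stand-in for a time window [start, end); equality is by range, so it works as a map key.
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Win && ((Win) o).start == start && ((Win) o).end == end;
        }

        @Override
        public int hashCode() {
            return Objects.hash(start, end);
        }
    }

    final Map<Win, List<Long>> state = new LinkedHashMap<>();

    void add(Win window, long timestamp) {
        state.computeIfAbsent(window, w -> new ArrayList<>()).add(timestamp);
    }

    // Merge: the new window covers both ranges and the two element lists are concatenated.
    Win merge(Win a, Win b) {
        Win merged = new Win(Math.min(a.start, b.start), Math.max(a.end, b.end));
        List<Long> values = new ArrayList<>();
        List<Long> fromA = state.remove(a);
        List<Long> fromB = state.remove(b);
        if (fromA != null) values.addAll(fromA);
        if (fromB != null) values.addAll(fromB);
        state.put(merged, values);
        return merged;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        WindowStateSketch s = new WindowStateSketch();
        Win w1 = new Win(0, 2 * hour);                          // element at 0h
        Win w2 = new Win(hour + hour / 2, 3 * hour + hour / 2); // element at 1.5h
        Win w3 = new Win(3 * hour, 5 * hour);                   // element at 3h
        s.add(w1, 0L);
        s.add(w2, hour + hour / 2);
        s.add(w3, 3 * hour);
        Win merged = s.merge(s.merge(w1, w2), w3);
        // The merged window still holds the 0h element, 3 hours older than the newest one.
        System.out.println(s.state.get(merged)); // prints [0, 5400000, 10800000]
    }
}
```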
```java
package org.apache.flink.streaming.api.windowing.evictors;

import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

import java.io.Serializable;

public interface Evictor<T, W extends Window> extends Serializable {

    /**
     * Optionally evicts elements. Called before windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    // Evicts elements before the window function runs
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * Optionally evicts elements. Called after windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    // Evicts elements after the window function has run
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

    /**
     * A context object that is given to {@link Evictor} methods.
     */
    interface EvictorContext {

        /**
         * Returns the current processing time.
         */
        long getCurrentProcessingTime();

        /**
         * Returns the metric group for this {@link Evictor}. This is the same metric
         * group that would be returned from {@link RuntimeContext#getMetricGroup()} in a user
         * function.
         *
         * <p>You must not call methods that create metric objects
         * (such as {@link MetricGroup#counter(int)}) multiple times but instead call once
         * and store the metric object in a field.
         */
        MetricGroup getMetricGroup();

        /**
         * Returns the current watermark time.
         */
        long getCurrentWatermark();
    }
}
```
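Both hooks receive the window contents as an `Iterable` and evict by calling `Iterator.remove()`, while the context supplies the current time. The sketch below mirrors that contract in plain Java so it runs without Flink: `Ts`, `MiniContext` and `MiniEvictor` are hypothetical stand-ins for `TimestampedValue`, `EvictorContext` and `Evictor`, and the `WatermarkLagEvictor` policy (drop elements more than `maxLag` behind the watermark) is invented for illustration. A real implementation would implement `Evictor<T, W>` directly.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins that mirror the shape of Flink's Evictor contract.
class EvictorSketch {

    // Like TimestampedValue: a value plus its event-time timestamp.
    static final class Ts<T> {
        final T value;
        final long timestamp;

        Ts(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Like Evictor.EvictorContext, reduced to the watermark.
    interface MiniContext {
        long getCurrentWatermark();
    }

    // Like Evictor: both hooks mutate the elements through Iterator.remove().
    interface MiniEvictor<T> {
        void evictBefore(Iterable<Ts<T>> elements, int size, MiniContext ctx);

        void evictAfter(Iterable<Ts<T>> elements, int size, MiniContext ctx);
    }

    // Hypothetical policy: before the window function runs, drop elements that are
    // more than maxLag behind the current watermark; do nothing afterwards.
    static final class WatermarkLagEvictor<T> implements MiniEvictor<T> {
        private final long maxLag;

        WatermarkLagEvictor(long maxLag) {
            this.maxLag = maxLag;
        }

        @Override
        public void evictBefore(Iterable<Ts<T>> elements, int size, MiniContext ctx) {
            long cutoff = ctx.getCurrentWatermark() - maxLag;
            for (Iterator<Ts<T>> it = elements.iterator(); it.hasNext(); ) {
                if (it.next().timestamp < cutoff) {
                    it.remove();
                }
            }
        }

        @Override
        public void evictAfter(Iterable<Ts<T>> elements, int size, MiniContext ctx) {
            // no-op: this policy only evicts before the computation
        }
    }

    public static void main(String[] args) {
        List<Ts<String>> window = new ArrayList<>();
        window.add(new Ts<>("a", 100));
        window.add(new Ts<>("b", 500));
        window.add(new Ts<>("c", 900));
        MiniEvictor<String> evictor = new WatermarkLagEvictor<>(300);
        evictor.evictBefore(window, window.size(), () -> 1000L); // cutoff = 700
        System.out.println(window.size()); // prints 1 (only "c" remains)
    }
}
```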
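Windows are normally handled by WindowOperator, which does not support evictors. Once an Evictor is specified for a window, the window is handled by EvictingWindowOperator (a subclass of WindowOperator) instead.
evictBefore and evictAfter are invoked at the following points: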
```java
// EvictingWindowOperator.java
private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
        .from(contents)
        .transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
            @Override
            public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                return TimestampedValue.from(input);
            }
        });

    // Call evictBefore before the window function runs
    evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    FluentIterable<IN> projectedContents = recordsWithTimestamp
        .transform(new Function<TimestampedValue<IN>, IN>() {
            @Override
            public IN apply(TimestampedValue<IN> input) {
                return input.getValue();
            }
        });

    processContext.window = triggerContext.window;
    // Apply the user's window function
    userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);

    // Call evictAfter after the window function has run
    evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

    // work around to fix FLINK-4369, remove the evicted elements from the windowState.
    // this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
    windowState.clear();
    for (TimestampedValue<IN> record : recordsWithTimestamp) {
        windowState.add(record.getStreamRecord());
    }
}
```
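The ordering above can be reduced to a few lines of plain Java. In this hypothetical sketch, a `List<Long>` plays the role of the window's `ListState`, two `Consumer`s play `evictBefore` and `evictAfter`, and a `Function` plays the user window function. The evictions are applied to a copy of the contents, standing in for the case where the `Iterable` read from state is not backed by the state itself, so the final clear-and-re-add step is what actually persists them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of the emitWindowContents ordering; not Flink code.
class EmitSketch {

    static <R> R emit(List<Long> windowState,
                      Consumer<List<Long>> evictBefore,
                      Function<List<Long>, R> windowFunction,
                      Consumer<List<Long>> evictAfter) {
        // Work on a copy, standing in for contents that are not backed by the state.
        List<Long> contents = new ArrayList<>(windowState);
        evictBefore.accept(contents);              // 1. evict before the computation
        R result = windowFunction.apply(contents); // 2. run the window function
        evictAfter.accept(contents);               // 3. evict after the computation
        // 4. persist the evictions: clear the state and re-add what is left
        windowState.clear();
        windowState.addAll(contents);
        return result;
    }

    public static void main(String[] args) {
        List<Long> state = new ArrayList<>(List.of(1L, 2L, 3L, 4L));
        long sum = EmitSketch.<Long>emit(state,
                c -> c.removeIf(v -> v < 2),                       // evictBefore drops 1
                c -> c.stream().mapToLong(Long::longValue).sum(),  // window function: sum
                c -> c.removeIf(v -> v > 3));                      // evictAfter drops 4
        System.out.println(sum + " " + state); // prints 9 [2, 3]
    }
}
```

The window function sees only what survived `evictBefore`, while the persisted state reflects both evictions.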
Flink provides the following three Evictor implementations:
- TimeEvictor
- CountEvictor
- DeltaEvictor
TimeEvictor
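TimeEvictor decides whether to evict an element based on its timestamp: it keeps only the elements whose timestamp is greater than the largest timestamp in the window minus windowSize.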
```java
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    // If the elements carry no timestamps, return without evicting anything
    if (!hasTimestamp(elements)) {
        return;
    }
    // The largest timestamp among the elements (the latest event time, not necessarily the last element to arrive)
    long currentTime = getMaxTimestamp(elements);
    // Cutoff = latest event time - windowSize, i.e. keep only the elements from the most recent windowSize
    long evictCutoff = currentTime - windowSize;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        // Remove every element whose timestamp is less than or equal to the cutoff
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}
```
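The cutoff rule can be checked in plain Java. A minimal sketch, using a hypothetical `TsValue` class in place of Flink's `TimestampedValue` and skipping the `hasTimestamp` check: the reference point is the largest timestamp rather than the last element, and an element exactly at the cutoff is evicted because the comparison is `<=`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java re-creation of the TimeEvictor cutoff rule (TsValue is a stand-in type).
class TimeEvictSketch {

    static final class TsValue {
        final String value;
        final long timestamp;

        TsValue(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static void evict(List<TsValue> elements, long windowSize) {
        if (elements.isEmpty()) {
            return;
        }
        // The reference point is the largest timestamp, not the last element's.
        long maxTimestamp = Long.MIN_VALUE;
        for (TsValue e : elements) {
            maxTimestamp = Math.max(maxTimestamp, e.timestamp);
        }
        long evictCutoff = maxTimestamp - windowSize;
        for (Iterator<TsValue> it = elements.iterator(); it.hasNext(); ) {
            if (it.next().timestamp <= evictCutoff) { // "<=": the cutoff itself is evicted
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<TsValue> window = new ArrayList<>();
        window.add(new TsValue("a", 1_000));
        window.add(new TsValue("b", 5_000));
        window.add(new TsValue("c", 3_000)); // out of order: the max is still 5_000
        evict(window, 4_000);                // cutoff = 1_000, so "a" is removed
        System.out.println(window.size());   // prints 2
    }
}
```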
CountEvictor
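CountEvictor decides based on the number of elements: it keeps at most maxCount elements in the window, evicting from the beginning.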
```java
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (size <= maxCount) {
        // No more than maxCount elements: nothing to evict
        return;
    } else {
        int evictedCount = 0;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break;
            } else {
                // Remove the first size - maxCount elements, leaving only the last maxCount
                iterator.remove();
            }
        }
    }
}
```
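Without the Flink types, the loop just removes the first `size - maxCount` elements in iteration order, which is the order they were added to the window. A minimal plain-Java sketch over an arbitrary element type (`CountEvictSketch` is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java re-creation of the CountEvictor loop over an arbitrary element type.
class CountEvictSketch {

    static <T> void evict(List<T> elements, long maxCount) {
        int size = elements.size();
        if (size <= maxCount) {
            return; // already within the limit
        }
        long toEvict = size - maxCount;
        int evicted = 0;
        for (Iterator<T> it = elements.iterator(); it.hasNext() && evicted < toEvict; ) {
            it.next();
            it.remove(); // drop from the head: the earliest-added elements
            evicted++;
        }
    }

    public static void main(String[] args) {
        List<String> window = new ArrayList<>(List.of("e1", "e2", "e3", "e4", "e5"));
        evict(window, 2);
        System.out.println(window); // prints [e4, e5]
    }
}
```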
DeltaEvictor
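DeltaEvictor evaluates a DeltaFunction on each element paired with the last element in the window and compares the result with threshold; if the delta is greater than or equal to threshold, the element is evicted.
Its evict method is as follows: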
```java
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
    // Take the last element as the reference point
    TimestampedValue<T> lastElement = Iterables.getLast(elements);
    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<T> element = iterator.next();
        // Compute the delta between each element and the last element and compare it with threshold;
        // if the delta is greater than or equal to threshold, evict the element
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
            iterator.remove();
        }
    }
}
```
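A plain-Java sketch shows how the threshold behaves. Here a `ToDoubleBiFunction` stands in for Flink's `DeltaFunction`, the absolute difference between two readings is an assumed example delta, and the empty-window guard is added for the sketch. Because the last element is compared with itself, it survives only when the delta of equal values (0 here) is below the threshold.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

// Plain-Java re-creation of the DeltaEvictor loop; the delta function is an example.
class DeltaEvictSketch {

    static <T> void evict(List<T> elements, double threshold, ToDoubleBiFunction<T, T> delta) {
        if (elements.isEmpty()) {
            return; // guard added for the sketch: there is no last element to compare with
        }
        T last = elements.get(elements.size() - 1);
        for (Iterator<T> it = elements.iterator(); it.hasNext(); ) {
            T element = it.next();
            // Evict when the distance to the last element reaches the threshold.
            if (delta.applyAsDouble(element, last) >= threshold) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical sensor readings; the last one (20.0) is the reference point.
        List<Double> readings = new ArrayList<>(List.of(10.0, 18.0, 25.0, 19.5, 20.0));
        evict(readings, 5.0, (a, b) -> Math.abs(a - b));
        System.out.println(readings); // prints [18.0, 19.5, 20.0]
    }
}
```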