flink

window源码

2018-06-12  本文已影响79人  edd72e125c98

WindowOperator

Context

定义了各种 registerEventTimeTimer,registerEventTimeTimer还有deleteXXX, 其本质是调用internalTimerService的同名方法来增删timer

提供onElement,onProcessingTime,onEventTime给上层(WindowOperator call), 内部就是包装了trigger的同名方法, trigger的方法由业务实现

HeapInternalTimerService

HeapInternalTimerService :> InternalTimerService, 主要就是维护两个queue,eventTimeTimersQueue和eventTimeTimersQueue
这两个queue的内部元素是timer

windowOperator implement了OneInputStreamOperator

  1. processElement() , 主要做了把element存到partitioned state里(给onEventTime和onProcessTime或自己去消费),再根据其中call context.onElement的结果来决定fire来消费
    首先用windowAssigner找到element对应的windows
    Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

mergeWindow逻辑跳过。。。直接看如何处理

for (W window: elementWindows) {

                // drop if the window is already late, 处理迟到数据
                if (isLate(window)) {
                    continue;
                }
                        // 找到每个窗口对应的partitionedState并存入
            AppendingState<IN, ACC> windowState =
                        getPartitionedState(window, windowSerializer, windowStateDescriptor);
                windowState.add(element.getValue());
                      // 更新context
                context.key = key;
                context.window = window;
                      // call onElement
                TriggerResult triggerResult = context.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                  //执行window function  
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                            // 注册cleanup timer, 会在onEventTime和onProcessTime中读到这个timer并clean
                registerCleanupTimer(window);
            }
  1. processWatermark() 这个方法在一个父类的父类 AbstractUdfStreamOperator中实现了,自己没实现。
    这个方法call了timerService的advanceWatermark, 其中timerService call了 WindowOperatoronEventTime()
public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;

        InternalTimer<K, N> timer;

//不大于,水印用于表示小于该时间戳的元素都已到达,所以所有不大于水印的触发时间戳都该被触发
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

            Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
            timerSet.remove(timer);
            eventTimeTimersQueue.remove();

            keyContext.setCurrentKey(timer.getKey());
//这边调用了onEventTime
            triggerTarget.onEventTime(timer);
        }

processWatermark在OperatorChain.ChainningOutput.emitWatermark中被call。

windowOperator implement了triggerable,2方法onEventTime()onProcessingTime()。都是处理timer的
1.onEventTime(InternalTimer timer)在HeapInternalTimerService。advanceWatermark中调用(触发watermark以下的timer时)。
onEventTime内部 callcontext.onEventTime()(触发trigger.onEventTime) 来决定触发window计算, 和执行cleanup window的逻辑cleanAllState()

2.onProcessingTime(InternalTimer timer) call context.onProcessingTime
和onEventTime类似, 触发trigger.onProcessTime和cleanAllState
上层HeapInternalTimerService.onProcessTime call windowOperator.onProcessTime()

//不大于表示小于该时间戳的元素都已到达,所以所有不大于此times的触发时间戳timer都该被触发
        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

            Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);

            timerSet.remove(timer);
            processingTimeTimersQueue.remove();

            keyContext.setCurrentKey(timer.getKey());
// triggerTarget  就是 windowOperator
            triggerTarget.onProcessingTime(timer);
        }
EvictingWindowOperator

EvictingWindowOperator :> WindowOperator

在计算前写call evictor.evictorBefore

AbstractKeyedTimePanes

优化window到pane里, 但目前deprecate, 未来会重做
WindowedStream reduce()apply()(fold不支持)中 createFastTimeOperatorIfValid()
根据
if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class(or TumblingAlignedProcessingTimeWindows) && trigger == null && evictor == null)来判断是否可以使用带pane优化的operator (AccumulatingProcessingTimeWindowOperatorAggregatingProcessingTimeWindowOperator

否则, 使用简单的 windowOperator和EvitorWindowOperator

上一篇 下一篇

猜你喜欢

热点阅读