Flink源码解析

Flink 自定义触发器实现带超时时间的 countAndTim

2020-07-03  本文已影响0人  shengjk1

1.背景

项目中需要自定义 trigger,需要基于两个条件:1. count 即 msg 的个数,当个数大于某个数时触发窗口 2. time 即每个固定的时间触发窗口

2.代码样例

/**
 * @author shengjk1
 * @date 2019/9/4
 */
public class CountAndTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
    
    private final long maxCount;
    
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
    
    public CountAndTimeTrigger(long maxCount) {
        super();
        this.maxCount = maxCount;
    }
    
    
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }
    
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }
    
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }
    
    @Override
    public boolean canMerge() {
        return false;
    }
    
    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }
    
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
    
    
    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;
        
        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读