Flink自定义触发器

2018-10-30  本文已影响0人  和平菌

上一篇分享中介绍了Flink完成数据统计的例子,在最后提到了自定义的统计触发器,这一篇分享主要介绍一下自定义的触发器如何来实现。

一、触发器的作用
触发器的作用就是我们在窗口中,什么时候来触发我们的聚合方法。主要涉及到的就是聚合计算(AggregateFunction)中的
OUT getResult(ACC var1);
这两个方法

比如我们想要在1个小时为单位的时间窗口里,达到每分钟来刷新数据的目的,那我们就必须每分钟都要触发一次getResult方法,来把数据发送到下一个处理节点(一般来说都是Sink-->保存数据的节点)

二、触发器的的实现
实现很简单,只需要继承Trigger<Object, W>类,实现它的方法即可
例如,我们需要一个带步长的触发器:

class ProcessTimeTrigger<W extends Window> extends Trigger<Object, W>
private final long interval;
private ProcessTimeTrigger(long interval) {
        this.interval = interval;
    }

方法调用时机如下:
onElement()方法,每个元素被添加到窗口时调用
  
onEventTime()方法,当一个已注册的事件时间计时器启动时调用
  onProcessingTime()方法,当一个已注册的处理时间计时器启动时调用
  
onMerge()方法,与状态性触发器相关,当使用会话窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
  *最后一个clear()方法执行任何需要清除的相应窗口
上面的方法中有两个需要注意的地方:
1)第一、三通过返回一个TriggerResult来决定如何操作调用他们的事件,这些操作可以是下面操作中的一个;
CONTINUE:什么也不做
FIRE:触发计算
PURGE:清除窗口中的数据
FIRE_AND_PURGE:触发计算并清除窗口中的数据

三、自定义注册定时触发器
我们在需要在onElement中注册一个定时触发的任务

@Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        timestamp = ctx.getCurrentProcessingTime();

        if (fireTimestamp.get() == null) {
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;

            ctx.registerProcessingTimeTimer(nextFireTimestamp);

            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

根据步长来注册下次执行的时间

然后

@Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

        if (fireTimestamp.get().equals(time)) {
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        } else if(window.maxTimestamp() == time) {
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

在onProcessingTime的时候如果步长和当前的执行时间一致,则触发计算
并再注册下一次的触发时间,直到窗口结束。

上一篇下一篇

猜你喜欢

热点阅读