Stream Processing with Apache Flink, Chapter 6: Time-Based and Window Operators

2021-09-21  kaiker

1. Configuring the Time Characteristic

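By default, the time characteristic is processing time. To use a different one, such as event time, call the following (this applies to Flink versions before 1.12; since 1.12 event time is the default and setStreamTimeCharacteristic() is deprecated):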

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

1.1 Assigning Timestamps and Generating Watermarks

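Periodic watermark assigner. Flink polls the assigner's getCurrentWatermark() at a fixed interval, configured in milliseconds:

env.getConfig().setAutoWatermarkInterval(100);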

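// Note: Flink 1.11+ deprecates AssignerWithPeriodicWatermarks in favor of WatermarkStrategy
DataStream<SensorReading> dataStream = inputStream
        .map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        })
        // Assign timestamps and watermarks for out-of-order data
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {

            // Maximum expected out-of-orderness: 3 seconds
            private final long maxOutBoundary = 3 * 1000L;
            // Start low enough that currentMaxTimestamp - maxOutBoundary cannot underflow
            private long currentMaxTimestamp = Long.MIN_VALUE + maxOutBoundary;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                // Watermark = largest timestamp seen so far minus the tolerated out-of-orderness
                return new Watermark(currentMaxTimestamp - maxOutBoundary);
            }

            @Override
            public long extractTimestamp(SensorReading sensorReading, long previousElementTimestamp) {
                // Timestamp of the current record (stored in seconds, converted to milliseconds)
                long currentTs = sensorReading.getTimestamp() * 1000L;
                // Track the largest timestamp seen so far
                currentMaxTimestamp = Math.max(currentMaxTimestamp, currentTs);
                // Return the record's timestamp
                return currentTs;
            }
        });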
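
The periodic assigner's logic can be exercised without a Flink cluster. Below is a minimal plain-Java sketch, not Flink code: the class name PeriodicWatermarkSketch and the sample timestamps are made up, and the periodic poll is simulated by asking for the watermark after every record instead of every autoWatermarkInterval milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of a bounded-out-of-orderness periodic assigner.
// It mirrors the anonymous AssignerWithPeriodicWatermarks above; it is NOT Flink's implementation.
class PeriodicWatermarkSketch {
    private final long maxOutOfOrderness;
    // Start low enough that currentMaxTimestamp - maxOutOfOrderness cannot underflow
    private long currentMaxTimestamp;

    PeriodicWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Called once per record: remember the largest timestamp seen so far
    long extractTimestamp(long eventTimeMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimeMillis);
        return eventTimeMillis;
    }

    // In Flink this is polled periodically by the framework
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch assigner = new PeriodicWatermarkSketch(3_000L);
        long[] timestamps = {1_000L, 5_000L, 2_000L, 9_000L, 7_000L};
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            assigner.extractTimestamp(ts);
            // Simulated poll after every record
            watermarks.add(assigner.currentWatermark());
        }
        System.out.println(watermarks); // [-2000, 2000, 2000, 6000, 6000]
    }
}
```

A larger bound tolerates more disorder but holds back every watermark, and with it every event-time result, by the same amount.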

Punctuated watermark assigner

https://www.jianshu.com/p/e6c7957d76d9

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    // Use the event's creation time as its event-time timestamp
    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    // Called after every element: emit a watermark only for events that carry a marker;
    // returning null means "no new watermark"
    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}
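
To contrast this with the periodic case, here is a plain-Java sketch (no Flink dependency) of the same punctuated rule: watermarks appear only at marker events, not on a timer. The Event class stands in for MyEvent and, like the sample data, is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of punctuated watermarking (not Flink's implementation):
// a watermark is emitted only when an event carries a marker.
class PunctuatedWatermarkSketch {
    // Hypothetical event type standing in for MyEvent
    static final class Event {
        final long creationTime;
        final boolean watermarkMarker;

        Event(long creationTime, boolean watermarkMarker) {
            this.creationTime = creationTime;
            this.watermarkMarker = watermarkMarker;
        }
    }

    // Same rule as checkAndGetNextWatermark above: null means "no watermark"
    static Long checkAndGetNextWatermark(Event lastElement, long extractedTimestamp) {
        return lastElement.watermarkMarker ? Long.valueOf(extractedTimestamp) : null;
    }

    // Feed events in order and collect the watermarks that would be emitted
    static List<Long> watermarksFor(List<Event> events) {
        List<Long> emitted = new ArrayList<>();
        for (Event e : events) {
            Long wm = checkAndGetNextWatermark(e, e.creationTime);
            if (wm != null) {
                emitted.add(wm);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1_000L, false),
                new Event(2_000L, true),
                new Event(3_000L, false),
                new Event(4_000L, true));
        System.out.println(watermarksFor(events)); // [2000, 4000]
    }
}
```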

1.2 Watermarks, Latency, and Completeness

2. Process Functions

2.1 The Process Function Interface

dataStream.keyBy("id")
        .process(new MyProcess())
        .print();

public static class MyProcess extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
    // Per-key state holding the timestamp of the registered timer
    ValueState<Long> tsTimerState;

    @Override
    public void open(Configuration parameters) throws Exception {
        tsTimerState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-timer", Long.class));
    }

    @Override
    public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
        out.collect(value.getId().length());

        // Context
        // Timestamp of the element currently being processed or of a firing timer
        ctx.timestamp();
        // Key of the element being processed
        ctx.getCurrentKey();
        // ctx.output(...) would emit to a side output
        ctx.timerService().currentProcessingTime();
        ctx.timerService().currentWatermark();
        // Register a timer that fires 5 seconds of processing time from now,
        // and store the same timestamp so the timer can be deleted later
        long timerTs = ctx.timerService().currentProcessingTime() + 5000L;
        ctx.timerService().registerProcessingTimeTimer(timerTs);
        tsTimerState.update(timerTs);
        // ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
        // Delete the timer registered for the stored time
        // ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
        System.out.println(timestamp + " timer fired");
        ctx.getCurrentKey();
        // ctx.output(...) would emit to a side output
        // Whether an event-time or a processing-time timer fired
        ctx.timeDomain();
        // Keyed state needs a current key, so clear it here rather than in close()
        tsTimerState.clear();
    }
}
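
The timer calls in MyProcess can be modelled in a few lines of plain Java. This is a hedged sketch, not Flink's TimerService: the class TimerServiceSketch and the keys "sensor_1"/"sensor_2" are hypothetical. It shows two properties of Flink timers: there is at most one timer per key and timestamp (registering the same one twice has no extra effect), and timers fire in timestamp order once the clock (processing time, or the watermark for event-time timers) reaches them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of per-key timers (not Flink's TimerService).
class TimerServiceSketch {
    static final class Timer implements Comparable<Timer> {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        // Order by timestamp, then key; equal (key, timestamp) pairs compare as 0
        @Override
        public int compareTo(Timer o) {
            int byTime = Long.compare(timestamp, o.timestamp);
            return byTime != 0 ? byTime : key.compareTo(o.key);
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    // TreeSet keeps timers sorted and deduplicates identical (key, timestamp) pairs
    private final TreeSet<Timer> timers = new TreeSet<>();

    void registerTimer(String key, long timestamp) {
        timers.add(new Timer(key, timestamp));
    }

    void deleteTimer(String key, long timestamp) {
        timers.remove(new Timer(key, timestamp));
    }

    // Advance the clock and return every timer whose timestamp has been reached, in order
    List<String> advanceTo(long time) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first().timestamp <= time) {
            fired.add(timers.pollFirst().toString());
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerServiceSketch service = new TimerServiceSketch();
        service.registerTimer("sensor_1", 5_000L);
        service.registerTimer("sensor_1", 5_000L); // duplicate: no second timer
        service.registerTimer("sensor_2", 3_000L);
        service.registerTimer("sensor_1", 8_000L);
        service.deleteTimer("sensor_1", 8_000L);
        System.out.println(service.advanceTo(6_000L));  // [sensor_2@3000, sensor_1@5000]
        System.out.println(service.advanceTo(10_000L)); // []
    }
}
```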

2.2 TimerService and Timers

2.3 Side Outputs

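// Side-output tag; the anonymous subclass ({}) preserves the generic type information
OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("low-temp") {};

SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
        // Readings above 30 degrees go to the main (high-temperature) stream;
        // all others go to the low-temperature side output
        if (value.getTemperature() > 30) {
            out.collect(value);
        } else {
            ctx.output(lowTempTag, value);
        }
    }
});

highTempStream.getSideOutput(lowTempTag).print("low-temp");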

3. Window Operators

3.1 Tumbling Windows

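Tumbling windows do not overlap, sliding windows can overlap, and session windows are delimited by gaps of inactivity: a session ends once no data has arrived for a given gap. Tumbling windows are the most commonly used. Reference: https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_622-timewindow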

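Calling .window() with a built-in window assigner is enough to create a window; the time-based assigners produce windows of type TimeWindow.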

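// A window function (aggregate, reduce, process, ...) must follow to turn this back into a DataStream
WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = dataStream.keyBy("id")
        .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));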
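
The three window types described above differ only in how an element's timestamp maps to windows. The following plain-Java sketch (no Flink dependency, offset assumed to be 0, all names hypothetical) computes those windows as half-open intervals [start, end). The session part assumes timestamps arrive sorted, which Flink itself does not require, because it merges session windows incrementally.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of tumbling, sliding and session window assignment (offset 0).
class WindowAssignmentSketch {
    // Start of the slot of length `size` containing ts (floorMod keeps negative timestamps correct)
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    static String window(long start, long end) {
        return "[" + start + "," + end + ")";
    }

    // Tumbling: every element belongs to exactly one window
    static List<String> tumbling(long ts, long size) {
        long start = windowStart(ts, size);
        return List.of(window(start, start + size));
    }

    // Sliding: an element belongs to every window of length `size` started at a multiple of `slide`
    static List<String> sliding(long ts, long size, long slide) {
        List<String> windows = new ArrayList<>();
        for (long start = windowStart(ts, slide); start > ts - size; start -= slide) {
            windows.add(window(start, start + size));
        }
        return windows;
    }

    // Session: each element opens [ts, ts + gap); an element arriving before the current
    // session ends extends it, otherwise the session is closed and a new one starts
    static List<String> sessions(long[] sortedTimestamps, long gap) {
        List<String> result = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return result;
        }
        long start = sortedTimestamps[0];
        long end = start + gap;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts < end) {
                end = Math.max(end, ts + gap);
            } else {
                result.add(window(start, end));
                start = ts;
                end = ts + gap;
            }
        }
        result.add(window(start, end));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tumbling(12_000L, 5_000L));         // [[10000,15000)]
        System.out.println(sliding(12_000L, 10_000L, 5_000L)); // [[10000,20000), [5000,15000)]
        System.out.println(sessions(new long[] {1_000L, 2_000L, 9_000L}, 3_000L)); // [[1000,5000), [9000,12000)]
    }
}
```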

3.2 Applying Functions on Windows

AggregateFunction (incremental aggregation: each element is folded into an accumulator as it arrives)

DataStream<Integer> resultStream = dataStream.keyBy("id")
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        // Counts the elements per key and window
        .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {

            // Create a new accumulator
            @Override
            public Integer createAccumulator() {
                return 0;
            }

            // Add each element to the accumulator (here: increment the count)
            @Override
            public Integer add(SensorReading value, Integer accumulator) {
                return accumulator + 1;
            }

            // Return the result
            @Override
            public Integer getResult(Integer accumulator) {
                return accumulator;
            }

            // Merge two accumulators (generally unused for tumbling time windows,
            // but needed when session windows are merged)
            @Override
            public Integer merge(Integer a, Integer b) {
                return a + b;
            }
        });
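
The four AggregateFunction methods form a small life cycle. The plain-Java sketch below uses a hypothetical interface Agg that only mirrors the shape of Flink's AggregateFunction (it is not the Flink type) and implements an average temperature. Unlike the count above, the accumulator type (sum, count) differs from the result type, which is the main reason the interface separates add() from getResult(). The sample temperatures are made up.

```java
import java.util.List;

// Hypothetical mirror of the AggregateFunction life cycle (not Flink's interface)
interface Agg<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Average temperature: accumulator is {sum, count}, result is a Double
class AvgTemp implements Agg<Double, double[], Double> {
    public double[] createAccumulator() {
        return new double[] {0.0, 0.0};
    }

    public double[] add(Double temperature, double[] acc) {
        return new double[] {acc[0] + temperature, acc[1] + 1};
    }

    public Double getResult(double[] acc) {
        return acc[1] == 0 ? Double.NaN : acc[0] / acc[1];
    }

    public double[] merge(double[] a, double[] b) {
        return new double[] {a[0] + b[0], a[1] + b[1]};
    }
}

class AggregateSketch {
    // What a window operator does per window: fold each arriving element into the accumulator
    static <IN, ACC, OUT> ACC fold(Agg<IN, ACC, OUT> f, List<IN> elements) {
        ACC acc = f.createAccumulator();
        for (IN e : elements) {
            acc = f.add(e, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        AvgTemp avg = new AvgTemp();
        double[] left = fold(avg, List.of(20.0, 22.0));
        double[] right = fold(avg, List.of(30.0));
        // e.g. two session windows merged into one
        System.out.println(avg.getResult(avg.merge(left, right))); // 24.0
    }
}
```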

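ProcessWindowFunction (full-window evaluation: all elements of a window are collected and passed to process() together with window metadata)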

SingleOutputStreamOperator<SensorReading> minTempStream = keyStream
        .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
            private final long maxOutBoundary = 4 * 1000L;
            private long currentMaxTimestamp = Long.MIN_VALUE + maxOutBoundary;

            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(currentMaxTimestamp - maxOutBoundary);
            }

            @Override
            public long extractTimestamp(SensorReading sensorReading, long previousElementTimestamp) {
                // Timestamp of the current record (seconds converted to milliseconds)
                long currentTs = sensorReading.getTimestamp() * 1000L;
                // Track the largest timestamp seen so far
                currentMaxTimestamp = Math.max(currentMaxTimestamp, currentTs);
                // Return the record's timestamp
                return currentTs;
            }
        })
        .keyBy(SensorReading::getId)
        .window(TumblingEventTimeWindows.of(Time.seconds(2)))
        .process(new ProcessWindowFunction<SensorReading, SensorReading, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<SensorReading> elements,
                                Collector<SensorReading> out) throws Exception {
                // Window metadata and the current watermark are available through the context
                System.out.println(context.window().getStart() + " - " + context.window().getEnd()
                        + ", watermark: " + context.currentWatermark());
                // All elements of the window are available at once: emit the coldest reading
                SensorReading min = null;
                for (SensorReading r : elements) {
                    if (min == null || r.getTemperature() < min.getTemperature()) {
                        min = r;
                    }
                }
                if (min != null) {
                    out.collect(min);
                }
            }
        });
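
What distinguishes ProcessWindowFunction from AggregateFunction is that elements are buffered until the window fires, and the function then sees all of them plus the window bounds. The plain-Java sketch below (no Flink dependency, single key, hypothetical names and data) models this for tumbling event-time windows, firing a window once the watermark reaches its last timestamp (end - 1), the condition Flink's event-time trigger uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of full-window evaluation for one key (not Flink's WindowOperator).
class ProcessWindowSketch {
    private final long size;
    // window start -> buffered temperatures
    private final TreeMap<Long, List<Double>> buffers = new TreeMap<>();

    ProcessWindowSketch(long size) {
        this.size = size;
    }

    void onElement(long ts, double temperature) {
        long start = ts - Math.floorMod(ts, size);
        buffers.computeIfAbsent(start, k -> new ArrayList<>()).add(temperature);
    }

    // Fire every window whose last timestamp (end - 1) is covered by the watermark
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!buffers.isEmpty() && buffers.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, List<Double>> e = buffers.pollFirstEntry();
            out.add(process(e.getKey(), e.getKey() + size, e.getValue()));
        }
        return out;
    }

    // Counterpart of ProcessWindowFunction.process: full contents plus window metadata
    private static String process(long start, long end, Iterable<Double> elements) {
        double min = Double.POSITIVE_INFINITY;
        for (double t : elements) {
            min = Math.min(min, t);
        }
        return "[" + start + "," + end + ") min=" + min;
    }

    public static void main(String[] args) {
        ProcessWindowSketch op = new ProcessWindowSketch(2_000L);
        op.onElement(500L, 31.5);
        op.onElement(1_500L, 29.0);
        op.onElement(2_100L, 35.0);
        System.out.println(op.onWatermark(1_999L)); // [[0,2000) min=29.0]
        System.out.println(op.onWatermark(4_000L)); // [[2000,4000) min=35.0]
    }
}
```

The price of this flexibility is state: every element of an open window is kept until it fires, whereas an AggregateFunction keeps only one accumulator per window.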

3.3 Custom Window Operators

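Components of a window operator and how elements flow through them: the window assigner puts each element into one or more windows, the trigger decides when a window is evaluated, the optional evictor removes elements before or after evaluation, and the window function computes the result.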

Window assigner (WindowAssigner)

// Custom assigner for tumbling event-time windows (assumes non-negative timestamps)
new WindowAssigner<Object, TimeWindow>() {
    // Window size chosen for illustration: 30 seconds
    private final long windowSize = 30 * 1000L;

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // Round the timestamp down to the start of its window
        long startTime = timestamp - (timestamp % windowSize);
        long endTime = startTime + windowSize;
        return Collections.singletonList(new TimeWindow(startTime, endTime));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    // Event-time windows: must be consistent with the EventTimeTrigger above
    @Override
    public boolean isEventTime() {
        return true;
    }
}

Trigger

SingleOutputStreamOperator<PageViewCount> uvStream = dataStream
        .filter(data -> "pv".equals(data.getBehavior()))
        .timeWindowAll(Time.hours(1))
        .trigger(new MyTrigger())
        .process(new UvCountResultWithBloomFliter());

public static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
    @Override
    public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // Fire the window computation on every incoming element and immediately purge the window contents
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // No timers or custom state to clean up
    }
}
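
The difference between firing and purging can be simulated in plain Java. In this sketch the enum mirrors the names of Flink's TriggerResult but is not the Flink type; Policy, run and the integer elements are hypothetical. FIRE emits the window's current contents and keeps them, PURGE discards them, and FIRE_AND_PURGE (as in MyTrigger) does both, so each evaluation sees only the element that caused it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of trigger results for a single window (not Flink code).
class TriggerSketch {
    enum Result {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // Hypothetical stand-in for Trigger.onElement
    interface Policy {
        Result onElement(int element);
    }

    // Feed elements into one window; each firing emits a copy of the current window contents
    static List<List<Integer>> run(Policy policy, int... elements) {
        List<List<Integer>> emitted = new ArrayList<>();
        List<Integer> window = new ArrayList<>();
        for (int e : elements) {
            window.add(e);
            Result r = policy.onElement(e);
            if (r.fire) {
                emitted.add(new ArrayList<>(window));
            }
            if (r.purge) {
                window.clear();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Like MyTrigger: every firing sees exactly one element
        System.out.println(run(e -> Result.FIRE_AND_PURGE, 1, 2, 3)); // [[1], [2], [3]]
        // FIRE without purging: every firing sees everything accumulated so far
        System.out.println(run(e -> Result.FIRE, 1, 2, 3)); // [[1], [1, 2], [1, 2, 3]]
    }
}
```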

Evictor

new Evictor<SensorReading, TimeWindow>() {
    // Called before the window function; elements are removed through the Iterable's iterator
    @Override
    public void evictBefore(Iterable<TimestampedValue<SensorReading>> elements, int size, TimeWindow window, EvictorContext evictorContext) {

    }

    // Called after the window function
    @Override
    public void evictAfter(Iterable<TimestampedValue<SensorReading>> elements, int size, TimeWindow window, EvictorContext evictorContext) {

    }
}
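
An evictor body typically walks the window contents with an iterator and calls remove(). The plain-Java sketch below shows that pattern for a count-based rule that keeps only the last maxCount elements, in the spirit of Flink's built-in CountEvictor; it operates on a plain List instead of Iterable<TimestampedValue<...>>, and the class name and data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch of an evictBefore step: keep only the last maxCount elements.
class EvictorSketch {
    static <T> void evictBefore(List<T> windowContents, int maxCount) {
        int toRemove = windowContents.size() - maxCount;
        Iterator<T> it = windowContents.iterator();
        // Drop the oldest elements through the iterator, as an Evictor does
        while (toRemove > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toRemove--;
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(List.of(1, 2, 3, 4, 5));
        evictBefore(window, 3);
        System.out.println(window); // [3, 4, 5]
    }
}
```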

4. Time-Based Joins of Two Streams

Interval join (event time only):

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    // Join green elements whose timestamps lie in [orange.ts - 2 ms, orange.ts + 1 ms]
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
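
Window join:

input1.join(input2)
    .where(<KeySelector>)    // key selector for the left stream
    .equalTo(<KeySelector>)  // key selector for the right stream
    .window(<WindowAssigner>)
    .apply(<JoinFunction>);  // JoinFunction that builds each joined pair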
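
The two joins match elements by different rules. For a single key and timestamps only, this plain-Java sketch (no Flink dependency; names and data hypothetical) spells them out: the interval join pairs a left element with every right element whose timestamp falls within [left + lower, left + upper] (both bounds inclusive), while the window join pairs elements that land in the same window, here a tumbling one. Flink does not run nested loops like these; it buffers elements in keyed state and cleans them up as watermarks advance.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of interval-join vs. window-join matching for ONE key.
class JoinSketch {
    // Interval join: l matches r when l + lower <= r <= l + upper
    static List<String> intervalJoin(long[] left, long[] right, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (r >= l + lower && r <= l + upper) {
                    out.add("(" + l + "," + r + ")");
                }
            }
        }
        return out;
    }

    // Window join: l matches r when both fall into the same tumbling window of length size
    static List<String> windowJoin(long[] left, long[] right, long size) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                if (Math.floorDiv(l, size) == Math.floorDiv(r, size)) {
                    out.add("(" + l + "," + r + ")");
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {5L};
        long[] green = {2L, 3L, 5L, 6L, 7L};
        // Same bounds as between(Time.milliseconds(-2), Time.milliseconds(1))
        System.out.println(intervalJoin(orange, green, -2L, 1L)); // [(5,3), (5,5), (5,6)]
        System.out.println(windowJoin(new long[] {1L, 6L}, new long[] {4L, 9L}, 5L)); // [(1,4), (6,9)]
    }
}
```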

5. Handling Late Data

5.1 Collecting Late Data with a Side Output

OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {};

// Event-time windowed aggregation: minimum temperature per 15-second window
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
        .timeWindow(Time.seconds(15))
        // Records whose window the watermark has already closed go to the "late" side output instead of being dropped
        .sideOutputLateData(outputTag)
        .minBy("temperature");

minTempStream.print("minTemp");
minTempStream.getSideOutput(outputTag).print("late");

env.execute();
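
Whether a record is late depends only on its window and the current watermark. The plain-Java sketch below (no Flink dependency; names and data hypothetical) applies the rule for tumbling event-time windows: a record is late when its window's last timestamp (end - 1) plus the allowed lateness is already covered by the watermark. With sideOutputLateData such records go to the side output; without it, Flink drops them.

```java
// Plain-Java sketch of the lateness check for tumbling event-time windows.
class LateDataSketch {
    enum Route { WINDOW, SIDE_OUTPUT }

    static Route route(long ts, long size, long allowedLateness, long currentWatermark) {
        long windowEnd = ts - Math.floorMod(ts, size) + size;
        long maxTimestamp = windowEnd - 1; // last timestamp the window covers
        // Late: the watermark has passed the window's cleanup time
        boolean late = maxTimestamp + allowedLateness <= currentWatermark;
        return late ? Route.SIDE_OUTPUT : Route.WINDOW;
    }

    public static void main(String[] args) {
        long size = 15_000L;
        // Watermark at 20 s: the [0, 15 s) window has already fired
        System.out.println(route(14_000L, size, 0L, 20_000L));      // SIDE_OUTPUT
        System.out.println(route(16_000L, size, 0L, 20_000L));      // WINDOW
        // With an allowed lateness of 1 minute the same record is still accepted
        System.out.println(route(14_000L, size, 60_000L, 20_000L)); // WINDOW
    }
}
```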

5.2 Updating Results with Late Data

https://blog.csdn.net/lmalds/article/details/55259718

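SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
        .timeWindow(Time.seconds(15))
        // Keep window state for 1 more minute after the watermark passes the window end;
        // late records in that period update the result and make the window fire again
        .allowedLateness(Time.minutes(1))
        // Records arriving even later go to the side output
        .sideOutputLateData(outputTag)
        .minBy("temperature");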
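
How allowedLateness turns one result into a sequence of updates can be traced with a plain-Java sketch of a single tumbling window computing a minimum. It is a hedged model, not Flink's WindowOperator: one key, no real timers, and the class name and temperatures are hypothetical. The window fires once when the watermark reaches its last timestamp, fires again for every late record that still arrives before the cleanup time (last timestamp + allowed lateness), and after that its state is gone and further records count as late.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of allowedLateness for ONE tumbling event-time window computing a minimum.
class AllowedLatenessSketch {
    private final long maxTimestamp;     // last timestamp covered by the window (end - 1)
    private final long allowedLateness;
    private Double min = null;           // window state; null = empty or cleaned up
    private boolean timerFired = false;  // the regular end-of-window firing has happened
    private final List<String> emitted = new ArrayList<>();

    AllowedLatenessSketch(long windowEnd, long allowedLateness) {
        this.maxTimestamp = windowEnd - 1;
        this.allowedLateness = allowedLateness;
    }

    List<String> emitted() {
        return emitted;
    }

    void onElement(double temperature, long currentWatermark) {
        if (maxTimestamp + allowedLateness <= currentWatermark) {
            emitted.add("late " + temperature); // state already cleaned up: side output
            return;
        }
        min = (min == null) ? temperature : Math.min(min, temperature);
        if (maxTimestamp <= currentWatermark) {
            emitted.add("min=" + min);          // late but allowed: fire again with the updated result
        }
    }

    void onWatermark(long watermark) {
        if (!timerFired && maxTimestamp <= watermark) {
            timerFired = true;
            if (min != null) {
                emitted.add("min=" + min);      // regular on-time firing
            }
        }
        if (maxTimestamp + allowedLateness <= watermark) {
            min = null;                         // cleanup time reached: drop the window state
        }
    }

    public static void main(String[] args) {
        // Window [0, 15 s) with an allowed lateness of 1 minute
        AllowedLatenessSketch w = new AllowedLatenessSketch(15_000L, 60_000L);
        w.onElement(30.0, 0L);
        w.onWatermark(16_000L);     // on-time result: min=30.0
        w.onElement(25.0, 16_000L); // late but within 1 minute: updated result min=25.0
        w.onWatermark(80_000L);     // 14 999 + 60 000 <= 80 000: state is cleared
        w.onElement(10.0, 80_000L); // too late: goes to the side output
        System.out.println(w.emitted()); // [min=30.0, min=25.0, late 10.0]
    }
}
```

Downstream operators therefore have to treat each later firing as an update that replaces the earlier result for the same window.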