Flink专题

Flink EventTime和watermarks触发机制以及

2019-04-04  本文已影响0人  尼小摩

针对数据乱序的需求,需要使用eventtime和watermark来解决。

watermarks的生成方式有两种:

第一种方式比较常用,本文主要针对Periodic Watermarks进行分析。

参照官网文档中With Periodic Watermarks的使用方法:

官网With Periodic Watermarks

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

代码中的extractTimestamp方法是从数据本身中提取EventTime
getCurrentWatermar方法是获取当前水位线,利用currentMaxTimestamp - maxOutOfOrderness
maxOutOfOrderness表示是允许数据的最大乱序时间

所以在这里我们使用的话也实现接口AssignerWithPeriodicWatermarks。

watermark代码实现

从socket模拟接收数据,然后使用map进行处理,后面再调用assignTimestampsAndWatermarks方法抽取timestamp并生成watermark。最后再调用window打印信息来验证window被触发的时机。

Flink主程序代码:

package com.ly.jtbi

import java.text.SimpleDateFormat

import com.ly.jtbi.wk.StreamingPeriodicWatermark
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.api.java.utils.ParameterTool

/**
  * @Auther: fc.w
  * @Date: 2019/4/4
  */
object StreamingWindowWatermark {

  def main(args: Array[String]): Unit = {
    var port: Int = 0
    var hostname: String = ""

    try {
      val parameterTool = ParameterTool.fromArgs(args)
      hostname = parameterTool.get("hostname")
      port = parameterTool.getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("USAGE: \n StreamingWindowWatermark <hostname> <host>")
        System.exit(1)
      }
    }

    // 获取Flink执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 设置使用eventtime,默认是使用processtime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 启用checkpoint
    env.enableCheckpointing(1000)
    // 设置Exactly_once
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 设置并行度 1
    env.setParallelism(1)

    val input = env.socketTextStream(hostname, port)

    // 解析输入的数据
    val dataDStream = input.map(record => {
      val arr = record.split(",")
      (arr(0), arr(1).toLong)
    })

    // 抽取timestamp 和 watermark
   val waterMarkStream = dataDStream.assignTimestampsAndWatermarks(StreamingPeriodicWatermark)

    // 保存被丢弃的乱序数据
    val outputTag = new OutputTag[(String, Long)]("late-data")
    val window = waterMarkStream
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(3))) // 按照消息的EventTime分配窗口,和调用TimeWindow效果一样
      .allowedLateness(Time.seconds(2)) // 允许延迟2s
      .sideOutputLateData(outputTag)
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow]() {
      override def apply(tuple: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
        val key = tuple.toString
        val arrarList = input.toList.sortBy(_._2)
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
        val result = key + "," + arrarList.size + "," + sdf.format(arrarList(0)) + "," +
          "" + sdf.format(arrarList(arrarList.size - 1)) + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd())

        out.collect(result)
      }
    })

    // 把延迟的数据暂时打印到控制台,实际可以保存到存储介质中。
    val sideOutput = window.getSideOutput(outputTag)
    sideOutput.print()

    window.print()

    // 因为flink是懒加载的,所以必须调用execute方法才会执行上面的代码
    env.execute("eventtime-watermark")
  }

}

StreamingPeriodicWatermark代码实现

package com.ly.jtbi.wk

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

/**
  * @Auther: fc.w
  * @Date: 2019/4/4
  */
object StreamingPeriodicWatermark extends AssignerWithPeriodicWatermarks[(String, Long)]{

  var currentMaxTimestamp = 0L
  val maxOutOfOrderness = 10000L // 最大允许的乱序时间是10s

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

  /**
    * 定义生成watermark的逻辑
    * @return
    */
  override def getCurrentWatermark: Watermark = {
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }

  /**
    * 定义如何提取timestamp
    * @param element
    * @param previousElementTimestamp
    * @return
    */
  override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
    val timestamp = element._2
    currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp)
    println("key:" + element._1 + ",eventtime:[" + element._1 + "|" + sdf.format(element._2) + "], currentMaxTimestamp:["+currentMaxTimestamp + "|" +
      sdf.format(currentMaxTimestamp) + "], watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]")

    timestamp
  }


}

执行流程:

  1. 接收socket数据。
  2. 通过map算子对每行数据按照逗号分隔并转换成(String,Long) Tuple类型。其中Tuple中的第一个元素代表具体的数据,第二行代表数据的eventTime。
  3. 提取timestamp,生成watermarks,允许的最大乱序时间是10s,并打印(key, eventtime, currentMaxTimestamp, watermark)等信息。
  4. 根据第一个元素分组聚合,window窗口大小为3秒,输出(key,窗口内元素个数,窗口内最早元素的时间,窗口内最晚元素的时间,窗口自身开始时间,窗口自身结束时间)。

watermark的触发时机:

  1. 通过watermark和timestamp的时间,分析输出来的数据的定window的触发时机。

通过socket出入第一条数据:

$ 0001,1554363502000

程序输出:

$ key:0001,eventtime:[1554363502000|2019-04-04 15:38:22.000], currentMaxTimestamp:[1554363502000|2019-04-04 15:38:22.000], watermark:[1554363492000|2019-04-04 15:38:12.000]

为了方便查看,把输入内容汇总到表格中

Key Event Time CurrentMaxTimeStamp WaterMark
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000

此时,wartermark的时间,已经落后于currentMaxTimestamp10秒了。我们继续输入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000

继续输入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000

到这里,window仍然没有被触发,此时watermark的时间已经等于了第一条数据的Event Time了。那么window到底什么时候被触发呢?

继续输入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000

window仍然没有触发,此时,我们的数据已经发到2018-10-01 10:11:33.000了,根据eventtime来算,最早的数据已经过去了11秒了,window还没有开始计算,那到底什么时候会触发window呢?

再增加一秒:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[10:11:21.000 10:11:24.000)

到这里,我们做一个说明:
window的触发机制,是先按照自然时间将window划分,如果window大小是3秒,那么1分钟内会把window划分为如下的形式【左闭右开】:


[00:00:00,00:00:03)
[00:00:03,00:00:06)
[00:00:06,00:00:09)
[00:00:09,00:00:12)
···

[00:00:54,00:00:57)
[00:00:57,00:01:00)
···

window的设定无关数据本身,而是系统定义好了的。

输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当watermark时间 >= Event Time时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window_end_time决定。

上面的测试中,最后一条数据到达后,其水位线已经升至10:11:24秒,正好是最早的一条记录所在window的window_end_time,所以window就被触发了。

为了验证window的触发机制,我们继续输入数据:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[10:11:21.000 10:11:24.000)
0001 1554363516000

2019-04-04 15:38:36.000
1554363516000

2019-04-04 15:38:36.000
1554363506000

2019-04-04 15:38:26.000

此时,watermark时间虽然已经达到了第二条数据的时间,但是由于其没有达到第二条数据所在window的结束时间,所以window并没有被触发。那么,第二条数据所在的window时间是:

[00:00:24,00:00:27)

也就是说,我们必须输入一个15:38:27秒的数据,第二条数据所在的window才会被触发。我们继续输入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[15:38:21.000 15:38:24.000)
0001 1554363516000

2019-04-04 15:38:36.000
1554363516000

2019-04-04 15:38:36.000
1554363506000

2019-04-04 15:38:26.000
0001 1554363517000

2019-04-04 15:38:37.000
1554363517000

2019-04-04 15:38:37.000
1554363507000

2019-04-04 15:38:27.000
[15:38:24.000 15:38:27.000)

此时,window的触发要符合以下几个条件:

  1. watermark时间 >= window_end_time
    2. 在[window_start_time,window_end_time)区间中有数据存在,注意是左闭右开的区间

同时满足了以上2个条件,window才会触发。

watermark+window处理乱序数据

上面的测试,数据都是按照时间顺序递增的,现在,我们输入一些乱序的(late)数据,看看watermark结合window机制,是如何处理乱序的。

输入两行数据:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000
0001,1554363517000
0001,1554363519000
0001,1554363511000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[15:38:21.000 15:38:24.000)
0001 1554363516000

2019-04-04 15:38:36.000
1554363516000

2019-04-04 15:38:36.000
1554363506000

2019-04-04 15:38:26.000
0001 1554363517000

2019-04-04 15:38:37.000
1554363517000

2019-04-04 15:38:37.000
1554363507000

2019-04-04 15:38:27.000
[15:38:24.000 15:38:27.000)
0001 1554363519000

2019-04-04 15:38:39.000
1554363517000

2019-04-04 15:38:39.000
1554363509000

2019-04-04 15:38:29.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363517000

2019-04-04 15:38:39.000
1554363509000

2019-04-04 15:38:29.000

可以看到,虽然我们输入了一个15:38:31的数据,但是currentMaxTimestamp和watermark都没变。此时,按照我们上面提到的公式:

  1. watermark时间 >= window_end_time
    2. 在[window_start_time,window_end_time)区间中有数据存在,注意是左闭右开的区间

watermark时间(15:38:29) < window_end_time(15:38:33),因此不能触发window。

那如果我们再次输入一条15:38:43的数据,此时watermark时间会升高到15:38:33,这时的window一定就会触发了,我们试一试:
输入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000
0001,1554363517000
0001,1554363519000
0001,1554363511000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[15:38:21.000 15:38:24.000)
0001 1554363516000

2019-04-04 15:38:36.000
1554363516000

2019-04-04 15:38:36.000
1554363506000

2019-04-04 15:38:26.000
0001 1554363517000

2019-04-04 15:38:37.000
1554363517000

2019-04-04 15:38:37.000
1554363507000

2019-04-04 15:38:27.000
[15:38:24.000 15:38:27.000)
0001 1554363519000

2019-04-04 15:38:39.000
1554363517000

2019-04-04 15:38:39.000
1554363509000

2019-04-04 15:38:29.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363517000

2019-04-04 15:38:39.000
1554363509000

2019-04-04 15:38:29.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:30.000 15:38:33.000)

这里可以看到,窗口中有2个数据,15:38:31和15:38:32,但是没有15:38:33的数据,原因是窗口是一个前闭后开的区间,15:38:33的数据是属于[15:38:33,15:38:36)的窗口的。

Flink应该如何设置最大乱序时间 ?

这个要结合自己的业务以及数据情况去设置。如果maxOutOfOrderness设置的太小,而自身数据发送时由于网络等原因导致乱序或者late太多,那么最终的结果就是会有很多单条的数据在window中被触发,数据的正确性影响太大
对于严重乱序的数据,需要严格统计数据最大延迟时间,才能保证计算的数据准确,延时设置太小会影响数据准确性,延时设置太大不仅影响数据的实时性,更加会加重Flink作业的负担,不是对eventTime要求特别严格的数据,尽量不要采用eventTime方式来处理,会有丢数据的风险。

上边的结果,已经表明,对于out-of-order的数据,Flink可以通过watermark机制结合window的操作,来处理一定范围内的乱序数据。那么对于“迟到(late element)”太多的数据,Flink是怎么处理的呢?

late element(延迟数据)的处理

延迟数据的三种处理方案:

1. 丢弃(默认)

我们输入一个乱序很多的(其实只要Event Time < watermark时间)数据来测试下:
输入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

注意:此时watermark是2019-04-04 15:38:33.000

下面我们再输入几个eventtime小于watermark的时间
输入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000

注意:此时并没有触发window。因为输入的数据所在的窗口已经执行过了,flink默认对这些迟到的数据的处理方案就是丢弃。

2. allowedLateness 指定允许数据延迟的时间

在某些情况下,我们希望对迟到的数据再提供一个宽容的时间。

Flink提供了allowedLateness方法可以实现对迟到的数据设置一个延迟时间,在指定延迟时间内到达的数据还是可以触发window执行的。
输入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

正常触发window,没什么问题。
此时watermark是2019-04-04 15:38:33.000

那么现在我们输入几条eventtime<watermark的数据验证一下效果
输入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

我们再输入一条数据,把water调整到10:11:34
输入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000
0001,1554363524000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363524000

2019-04-04 15:38:44.000
1554363524000

2019-04-04 15:38:44.000
1554363514000

2019-04-04 15:38:34.000

此时,把water上升到了15:38:34,我们再输入几条eventtime<watermark的数据验证一下效果

输入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000
0001,1554363524000
0001,1554363510000
0001,1554363511000
0001,1554363513000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363524000

2019-04-04 15:38:44.000
1554363524000

2019-04-04 15:38:44.000
1554363514000

2019-04-04 15:38:34.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

发现输入的三行数据都触发了window的执行。

我们再输入一条数据,把water调整到15:38:35
输入:

$nc -l -p 9000
0001,1554363525000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363525000

2019-04-04 15:38:45.000
1554363525000

2019-04-04 15:38:45.000
1554363515000

2019-04-04 15:38:35.000

此时,watermark上升到了15:38:35

我们再输入几条eventtime<watermark的数据验证一下效果
输入:

$nc -l -p 9000
0001,1554363525000
0001,1554363510000
0001,1554363511000
0001,1554363513000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363525000

2019-04-04 15:38:45.000
1554363525000

2019-04-04 15:38:45.000
1554363515000

2019-04-04 15:38:35.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000

发现这几条数据都没有触发window。

分析:

当watemark等于15:38:33的时候,正好是window_end_time,所以会触发[15:38:30~15:38:33) 的window执行。

当窗口执行过后,我们输入[15:38:30~15:38:33) window内的数据会发现window是可以被触发的。

当watemark提升到15:38:34的时候,我们输入[15:38:30~15:38:33)window内的数据会发现window也是可以被触发的。

当watemark提升到15:38:35的时候,我们输入[15:38:30~15:38:33)window内的数据会发现window不会被触发了。

由于我们在前面设置了allowedLateness(Time.seconds(2)),可以允许延迟在2s内的数据继续触发window执行。

所以当watermark是15:38:34的时候可以触发window,但是15:38:35的时候就不行了。

总结:

对于此窗口而言,允许2秒的迟到数据,即第一次触发是在watermark >= window_end_time时
第二次(或多次)触发的条件是watermark < window_end_time + allowedLateness时间内,这个窗口有late数据到达时。

解释:

当watermark等于15:38:34的时候,我们输入eventtime为15:38:30、15:38:31、15:38:32的数据的时候,是可以触发的,因为这些数据的window_end_time都是15:38:33,也就是15:38:34<15:38:33+2 为true。

但是当watermark等于15:38:35的时候,我们再输入eventtime为15:38:30、15:38:31、15:38:32的数据的时候,这些数据的window_end_time都是15:38:33,此时,15:38:35<15:38:33+2 为false了。所以最终这些数据迟到的时间太久了,就不会再触发window执行了。

3. sideOutputLateData 收集迟到的数据

通过sideOutputLateData 可以把迟到的数据统一收集,统计存储,方便后期排查问题。



输入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

此时,window被触发执行了,此时watermark是15:38:33

下面我们再输入几个eventtime小于watermark的数据测试一下
输入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000

程序输出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000

此时,针对这几条迟到的数据,都通过sideOutputLateData保存到了outputTag中,然后输出到控制台。

    // 把延迟的数据暂时打印到控制台,实际可以保存到存储介质中。
    val sideOutput = window.getSideOutput(outputTag)
    sideOutput.print()
上一篇下一篇

猜你喜欢

热点阅读