第六章 Flink中的时间和窗口

2022-11-26  本文已影响0人  井底蛙蛙呱呱呱

时间语义


上图是数据流式处理过程,涉及到两个重要的时间点:事件时间(Event Time)和处理时间(Processing Time)。

我们在处理数据时,以哪种时间作为衡量标准,就是所谓的时间语义问题(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有滞后。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。

水位线(Watermark)

我们把时钟也数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接到广播下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似流水中用来当做标志的几号,在Flink中,这种用来衡量事件时间(Event Time)进展的标记,就被称作水位线(Watermark)。

水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而他插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。


理想的水位线是有序的,但是现实中由于不可控因素常常会有少量乱序的数据。




周期性生成时间戳,保存区间最大值

水位线代表当前事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢失数据,这一点对于乱序流的正确处理非常重要。水位线的特性:

水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

如何生成水位线

在生成水位线的时候,如果希望计算结果更准确,可以将水位线延迟设置得更高一些,等待时间越长,越不容易漏掉数据,但是这样时效性降低了。而如果将等待时间设置过短则会遗漏掉部分数据,虽然Flink提供了处理迟到数据的方法,但是需要分开处理。因此如何设置延迟是一个需要根据实际情况权衡的问题。

在Flink的DataStream API中,有一个单独用于生成水位线的方法:assignTimestampAndWatermarks(),他主要用来为流中的数据分配时间戳,并生成水位线来显示时间。该方法需要传入一个WatermarkStrategy作为参数,WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator:

代码

Flink提供了内置的水位线生成器:

代码
自定义水位线

在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了。不同策略的关键在于WatermarkGenerator的实现。整体来说,Flink有两种不同的生产水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated):

此外,也可以在自定义数据源中发送水位线,但是这样就不能使用assignTimestampsAndWatermarks 方法来生成水位线了,两者只能二选一。

水位线的传递

在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,所以同一时刻发给下游任务的水位线可能并不相同。这说明上游各个分区处理得有快有慢,进度各不相同,这时我们应该以最慢的那个时钟,也就是最小的那个水位线为准。

窗口(Window)

Flink是一种流式计算引擎,主要是用来处理无界数据流的。想要更加方便的处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(window)。在Flink中,窗口就是用来处理无界流的核心。

由于存在迟到数据的问题,将窗口视为一个框可能并不是最合适的。我们可以把它理解成一个“桶”(bucket):每个数据都会分发到对应的桶中,当到达窗口的结束时间时,就对每个桶中收集的数据进行计算处理。


窗口的分类

窗口API概览

在定义窗口操作之前,需要先确定到底是基于按键分区的数据流KeyedStream还是在没有按键分区的DataStream上面开窗。也即调用窗口算子之前是否有keyBy操作。

而在API上面的区别也是非常小:

// 按键分区
stream.keyBy(...).window(...)

// 非按键分区
stream.windowAll(...)

窗口分配器(Window Assigner)

定义窗口分配器(Window Assigner)是构建窗口算子的第一步,他的作用就是定义数据应该被分配到哪个窗口。通过向上一节中的window/windowAll函数中传入WindowAssigner参数,返回WindowStream。

不同窗口类型有不同的窗口分配器。

1、时间窗口
// 滚动处理时间窗口
stream.keyBy(...)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5))
  .aggregate(...)

// 滑动处理时间窗口
stream.keyBy(...)
  .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))
  .aggregate(...)

// 处理时间会话窗口
stream.keyBy(...)
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  .aggregate(...)

// 滚动事件时间窗口
stream.keyBy(...)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .aggregate(...)

// 滑动事件时间窗口
stream.keyBy(...)
  .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
  .aggregate(...)

// 事件时间会话窗口
stream.keyBy(...)
  .window(EventTimeSessionWindows.WithGap(Time.seconds(10)))
  .aggregate(...)

// 2、计数窗口
// 滚动计数窗口, 定义一个长度为10的滚动计数窗口
stream.keyBy(...)
  .countWindow(10)

// 滑动计数窗口,长度为10,步长为3
stream.keyBy(...)
  .countWindow(10, 3)

// 3、全局窗口, 全局窗口必须自行定义触发器才能实现窗口计算,否则起不到任何作用
stream.keyBy(...)
  .window(GlobalWindows.create())

窗口函数(Window Functions)

在上面定义了窗口分配器,我们只是知道了数据属于哪个窗口,而本节介绍的窗口函数则是如何将这些窗口中的数据收集起来,即如何处理。

窗口函数是作用在windowStream上面的,返回的是DataStream。各种stream间的转换如下:


1、增量聚合函数
为了提高实时性,我们可以像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。

典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

ReduceFunction:

package com.whu.chapter06

import com.whu.chapter05.{ClickSource, Event}

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows

object WindowFunctionDemo {
  def main(args:Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.addSource(new ClickSource())
      // 数据源中的时间戳是单调递增的,所以使用下面的方法,只需要抽取时间戳就好了
      // 等同于最大延迟时间是0毫秒
      .assignAscendingTimestamps(_.timeStamp)
      .map(r => (r.user, 1L))
      // 使用用户名对数据流进行分组
      .keyBy(_._1)
      // 设置5秒钟的滚动事件时间窗口
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // 保留第一个字段,针对第二个字段进行聚合
      .reduce((r1, r2) => (r1._1, r1._2+r2._2))
      .print()
    
    env.execute()
  }
}

AggregateFunction
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。

AggregateFunction 在源码中的定义如下:

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{
  // 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次
  ACC createAccumulator();

  // 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程
  ACC add(IN value, ACC accumulator);

  // getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出
  OUT getResult(ACC accumulator);

  // 合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用
  ACC merge(ACC a, ACC b);
}

AggregateFunction接受3个数据类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      // 通过为每条数据分配相同的key,来将数据发送到同一个分区
      .keyBy(_ => "key")
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
      .aggregate(new AvgPv)

    env.execute()
  
  class AvgPv extends AggregateFunction[Event,(Set[String], Double), Double] {
    // 创建空累加器,类型是元组,元组的第一个元素类型为Set数据结构,用来对用户名去重
    // 第二个元素用来累加pv操作,也就是没来一条数据就加一
    override def createAccumulator(): (Set[String], Double) = (Set[String](), 0L)

    // 累加规则
    override def add(in: Event, acc: (Set[String], Double)): (Set[String], Double) = {
      (acc._1+in.user, acc._2+1)
    }

    // 获取窗口关闭时向下游发送的结果
    override def getResult(acc: (Set[String], Double)): Double = {
      acc._2/(acc._1.size.toDouble)
    }

    // merge方法只有在事件时间的会话窗口时,才需要实现,这里无需实现
    override def merge(acc: (Set[String], Double), acc1: (Set[String], Double)): (Set[String], Double) = ???
  }

全窗口函数(Full Window Functions)
窗口操作中的另一大类就是全窗口函数,与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

Flink中的全窗口函数有两种:WindowFunction和ProcessWindowFunction。

// 窗口函数
stream.keyBy(<key selector>)
  .window(<window assigner>)
  .apply(new MyWindowFunction())

处理窗口函数ProcessWindowFunction是Window API中最底层的通用窗口函数接口。除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象(context)”。这个上下文对象不仅有窗口信息,还可以访问当前的时间和状态信息。这里的时间包括了处理时间(process time)和事件时间水位线(event time watermark)。这使得ProcessWindowFunction更加灵活、功能更加丰富,可以认为是一个增强版的WindowFunction。

// Full WindowFunction
    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      // 为所有数据都指定同一个key,可以将所有数据发送到同一个分区
      .keyBy(_ => "key")
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .process(new UvCountByWindow)
      .print()
    
    env.execute()
  
  // 自定义窗口处理函数
  class UvCountByWindow extends ProcessWindowFunction[Event, String, String, TimeWindow]{
    // 
    override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
      // 初始化一个Set数据结构,用来对用户名进行去重
      var userSet = Set[String]()
      // 将所有用户进行去重
      elements.foreach(userSet += _.user)
      // 结合窗口信息,包装输出内容
      val windowStart = context.window.getStart
      val windowEnd = context.window.getEnd
      out.collect(" 窗口:"+ new Timestamp(windowStart) + " ~ "+ new Timestamp(windowEnd) + " 独立访客数为:" + userSet.size)
    }
增量和聚合函数结合使用

增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作,窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数
据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。

    // 全窗口函数和聚合函数结合使用
    env.addSource(new ClickSource())
      .assignAscendingTimestamps(_.timeStamp)
      // 使用url作为key对数据进行分区
      .keyBy(_.url)
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
      // 注意这里调用的是aggregate方法
      // 增量聚合函数和全窗口聚合函数结合使用
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)
      .print()

  class UrlViewCountAgg extends AggregateFunction[Event, Long, Long] {
    override def createAccumulator(): Long = 0L

    // 每来一个事件就加1
    override def add(in: Event, acc: Long): Long = acc + 1L

    // 窗口闭合时发送的计算结果
    override def getResult(acc: Long): Long = acc

    override def merge(acc: Long, acc1: Long): Long = ???
  }

  case class UrlViewCount(url: String, count: Long, windowStart: Long, windowEnd: Long)

  class UrlViewCountResult extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
    // 迭代器中只有一个元素,是增量聚合函数在窗口闭合时发送过来的计算结果
    override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
      out.collect(UrlViewCount(key, elements.iterator.next(), context.window.getStart, context.window.getEnd))
    }
  }

其它API

窗口的生命周期

熟悉了窗口 API 的使用,这里梳理一下窗口本身的生命周期,这也是对窗口所有操作的一个总结:

迟到数据的处理

所谓的“迟到数据”(late data),是指某个水位线之后到来的数据,它的时间戳其实是在水位线之前的。所以只有在事件时间语义下,讨论迟到数据的处理才是有意义的。

package com.whu.chapter06

import com.whu.chapter05.{ClickSource, Event}
import com.whu.chapter06.WindowFunctionDemo.{UrlViewCountAgg, UrlViewCountResult}

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.sql.Timestamp

object ProcessLateDataDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    
    // 为方便测试,读取socket文本流进行处理
    val stream = env.socketTextStream("localhost", 7777)
      .map(data => {
        val fields = data.split(",")
        Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
      })
    
    // 方式1:设置Watermark延迟时间 2秒钟
    val res1 = stream.assignTimestampsAndWatermarks(WatermarkStrategy
      // 最大延迟时间设置为5秒钟
      .forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
      .withTimestampAssigner( new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(t: Event, l: Long): Long = t.timeStamp
      })
    )
    
    // 定义侧输出流标签
    val outputTag = OutputTag[Event]("late")
    val res2 = stream
      .keyBy(_.url)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      // 方式二:允许窗口处理迟到数据,设置1分钟的等待时间
      .allowedLateness(Time.minutes(1))
      // 方式三:将最后的迟到数据输出到侧输出流
      .sideOutputLateData(outputTag)
      .aggregate(new UrlViewCountAgg, new UrlViewCountResult)
    
    res2.print()
    
    res2.getSideOutput(outputTag).print("late")
    
    // 为方便观察,可以将原始数据也输出
    stream.print("input")
  }
}
上一篇下一篇

猜你喜欢

热点阅读