
Flink Window


1. Window Overview

Windows are at the heart of processing infinite streams: they split an unbounded stream into finite "buckets" over which computations such as counts and aggregations can be applied.

2. Window Types

2.1 Keyed vs. Non-Keyed Windows (divided by whether the stream has been keyed with keyBy)

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

-----------------------------------------------------------------------------------------------------------------------

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
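
With keyBy, each key gets its own logical windows and the window operator runs in parallel across keys; with windowAll, all elements go into a single set of windows and the operator runs with parallelism 1. A minimal sketch contrasting the two (host/port and word format are assumptions carried over from the examples below):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object KeyedVsNonKeyedWindows {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words = env.socketTextStream("node01", 8888)
      .flatMap(_.split(" "))
      .map((_, 1))

    // keyed: one logical window per key, evaluated in parallel across subtasks
    words.keyBy(_._1).timeWindow(Time.seconds(5)).sum(1).print()

    // non-keyed: a single window over the entire stream, parallelism forced to 1
    words.timeWindowAll(Time.seconds(5)).sum(1).print()

    env.execute()
  }
}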

2.2 Window Types by Window Assigner

Flink comes with four pre-defined window assigners: tumbling windows, sliding windows, session windows, and global windows.

2.2.1 Tumbling Windows
val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

A tumbling window assigner is instantiated via the static method of (for example TumblingEventTimeWindows.of). The source shows that the constructor takes two parameters:

 private TumblingProcessingTimeWindows(long size, long offset) {
        ...
}

The window size can be specified with Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.
The offset parameter can be used to change the alignment of windows. Without an offset, hourly tumbling windows are aligned with epoch, i.e. you get windows such as 1:00:00.000 - 1:59:59.999, 2:00:00.000 - 2:59:59.999, and so on. With a 15-minute offset you would instead get 1:15:00.000 - 2:14:59.999, 2:15:00.000 - 3:14:59.999, and so on.
An important use case for offsets is adjusting windows to a timezone other than UTC-0. For example, in China (UTC+8) you have to specify an offset of Time.hours(-8) so that a daily window covers 00:00 - 24:00 local time.

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TumblingTimeWindowKeyedStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val initStream: DataStream[String] = env.socketTextStream("node01", 8888)
    val wordStream = initStream.flatMap(_.split(" "))
    val pairStream = wordStream.map((_, 1))
    // an unbounded stream that has already been partitioned by key
    val keyByStream = pairStream.keyBy(_._1)
    keyByStream
      .timeWindow(Time.seconds(5))
      .reduce(new ReduceFunction[(String, Int)] {
        // incrementally sum the counts as elements arrive in the window
        override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
          (value1._1, value2._2 + value1._2)
        }
      }, new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
        // called once per window with the single pre-aggregated element;
        // emit it so the result actually reaches the sink
        override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
          out.collect(elements.iterator.next())
        }
      }).print()
    env.execute()
  }
}

2.2.2 Sliding Windows
val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

The SlidingProcessingTimeWindows class is instantiated via the static method SlidingProcessingTimeWindows.of. The source shows that the constructor takes three parameters:

private SlidingProcessingTimeWindows(long size, long slide, long offset) {
        ...
}

The offset parameter plays the same role as the offset parameter of TumblingEventTimeWindows.

import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object SlidingTimeWindowKeyedStream {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka connection configuration
    val props = new Properties()
    props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id","flink-kafka-001")
    props.setProperty("key.deserializer",classOf[StringSerializer].getName)
    props.setProperty("value.deserializer",classOf[StringSerializer].getName)

    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka",new SimpleStringSchema(),props))

    // records are tab-separated; field 1 is the car id, field 3 the speed
    stream.map(data => {
      val splits = data.split("\t")
      (splits(1), splits(3).toLong)
    }).keyBy(_._1)
      // window of 30 minutes, sliding every 10 seconds
      .timeWindow(Time.minutes(30), Time.seconds(10))
      // accumulator: (car id, speed sum, count) -> output: (car id, average speed)
      .aggregate(new AggregateFunction[(String, Long), (String, Long, Long), (String, Double)] {

        override def createAccumulator(): (String, Long, Long) = ("",0,0)

        override def add(value: (String, Long), accumulator: (String, Long, Long)): (String, Long, Long) = {
          (value._1,accumulator._2+value._2,accumulator._3 + 1)
        }

        override def getResult(accumulator: (String, Long, Long)): (String, Double) = {
          (accumulator._1,accumulator._2.toDouble/accumulator._3)
        }

        override def merge(a: (String, Long, Long), b: (String, Long, Long)): (String, Long, Long) = {
          (a._1,a._2+b._2,a._3+b._3)
        }
      }).print()
    env.execute()
  }
}

2.2.3 Session Windows
val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
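
The extract body is left empty in the documentation snippets above; a hedged sketch of what it might look like (the comma-separated message format carrying its own gap is a made-up assumption):

// hypothetical format: "user1,click,5000", where the third field is the desired gap in ms
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        val gap = element.split(",")(2).toLong
        if (gap > 0) gap else 1000L // fall back to a 1-second gap
      }
    }))
    .<windowed transformation>(<window function>)
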
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SessionWindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01", 8888)
    // Session gap of 10s: if no new element arrives within 10 seconds,
    // the window closes and fires (it does not "slide").
    // EventTime is the time an event occurred; ProcessingTime is the system time
    // at which an element is processed. Processing time is used here because the
    // plain socket source carries no event timestamps or watermarks.
    stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
      .sum(1)
      .print()
    env.execute()
  }
}

2.2.4 Global Windows
val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)
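
A global window assigner puts all elements with the same key into one single global window. It is only useful together with a custom trigger, because the default trigger of global windows never fires; without one, no computation happens and window state grows indefinitely. Below is a minimal sketch (host/port and word format are assumptions) that fires every 5 elements per key, which is essentially how countWindow is implemented:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

object GlobalWindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("node01", 8888)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .window(GlobalWindows.create())
      // fire and purge the window every 5 elements; without a trigger nothing is emitted
      .trigger(PurgingTrigger.of(CountTrigger.of[GlobalWindow](5)))
      .sum(1)
      .print()
    env.execute()
  }
}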

To summarize, the general usage pattern for any assigner is:

stream
       .keyBy(<key selector>)                        // only for keyed windows
       .window/windowAll(<window assigner>)          // window after keyBy, windowAll otherwise
       .<windowed transformation>(<window function>)

2.3 Window Types by Measure

Windows can also be divided by the measure that bounds them: time windows (timeWindow), delimited by a span of time, and count windows (countWindow), delimited by a number of elements.
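
A short sketch of a count window, mirroring the socket examples above (host/port and word format are assumptions):

import org.apache.flink.streaming.api.scala._

object CountWindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.socketTextStream("node01", 8888)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      // tumbling count window: fires once 3 elements have arrived for a key
      .countWindow(3)
      .sum(1)
      .print()
    env.execute()
  }
}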

3. Window Functions

After elements have been taken from the source (and possibly transformed) and assigned to a window by the window assigner, the window function defines the computation to apply to the data collected in each window.

(Figure: Flink window processing flow)

Window functions fall into two categories:

3.1 Incremental Aggregation Functions

How they aggregate: each window keeps a single intermediate aggregate, and arriving elements are folded into it incrementally as they enter the window.
Typical functions: ReduceFunction, AggregateFunction.
Advantage: these functions are usually very space-efficient, since only one accumulated value is stored per window rather than all of its elements.


3.1.1 ReduceFunction

A ReduceFunction specifies how two elements of the input are combined to produce an output element of the same type. Flink uses a ReduceFunction to incrementally aggregate the elements of a window.

The following is the Flink source for the reduce transformation.
Input: a ReduceFunction (an anonymous function with the reduce logic can also be passed).
Output: a DataStream.

 /**
   * @param function The reduce function.
   * @return The data stream that is the result of applying the reduce function to the window.
   */
  def reduce(function: ReduceFunction[T]): DataStream[T] = {
    asScalaStream(javaStream.reduce(clean(function)))
  }

Below is the code provided in the Flink documentation; the example sums up the second field of the tuples for all elements in a window.

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

3.1.2 AggregateFunction

An AggregateFunction is a generalized version of a ReduceFunction with three type parameters: an input type (IN), an accumulator type (ACC), and an output type (OUT). The input type is the type of elements in the input stream, and the AggregateFunction has a method (add) for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator (createAccumulator), for merging two accumulators into one (merge), and for extracting an output of type OUT from an accumulator (getResult). The example below shows how this works.
As with a ReduceFunction, Flink incrementally aggregates input elements as they arrive in the window.
An AggregateFunction can be defined and used like this:

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2 // toDouble avoids integer division

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

3.1.3 Incremental Function Example

Example 1: compute the average speed of each car every 10 seconds.

import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

object Demo03SpeedAVG {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)

    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props))

    // records are tab-separated; field 1 is the car id, field 3 the speed
    stream.map(data => {
      val splits = data.split("\t")
      (splits(1), splits(3).toInt)
    }).keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .aggregate(new AggregateFunction[(String,Int),(String,Int,Int),(String,Double)] {
        override def createAccumulator(): (String, Int, Int) = ("",0,0)

        override def add(value: (String, Int), accumulator: (String, Int, Int)): (String, Int, Int) = {
          (value._1,value._2+accumulator._2,accumulator._3+1)
        }

        override def getResult(accumulator: (String, Int, Int)): (String, Double) = {
          (accumulator._1,accumulator._2.toDouble/accumulator._3)
        }

        override def merge(a: (String, Int, Int), b: (String, Int, Int)): (String, Int, Int) = {
          (a._1,a._2+b._2,a._3+b._3)
        }
      }).print()

    env.execute()
  }
}

3.1.4 FoldFunction

A FoldFunction specifies how an input element is combined with an element of the output type. The FoldFunction is called incrementally for every element that is added to the window, together with the current output value; the first element is combined with a predefined initial value of the output type. (Note that fold is deprecated in newer Flink versions; AggregateFunction is the recommended replacement.)

Below is the code provided in the Flink documentation; the example starts with an empty String and appends every input Long value (v._2) to it.

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("") { (acc, v) => acc + v._2 }

3.2 Full Window Functions

How they aggregate: all elements of the window are collected first; the function receives an Iterable containing every element of the window, along with meta-information about the window the elements belong to, and iterates over them when the window fires.
Characteristics: the function can access Flink's execution context and richer information about the data, such as window state, the window's start and end times, the current watermark, and timestamps.


3.2.1 ProcessWindowFunction

A ProcessWindowFunction receives an Iterable containing all elements of the window, together with a Context object that gives access to time and state information, which makes it more flexible than the other window functions. This comes at the cost of performance and resource consumption, because elements cannot be aggregated incrementally; instead they are buffered internally until the window is ready to be processed.

The source of ProcessWindowFunction is as follows:

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * Returns the window that is being evaluated.
      */
    def window: W

    /**
      * Returns the current processing time.
      */
    def currentProcessingTime: Long

    /**
      * Returns the current event-time watermark.
      */
    def currentWatermark: Long

    /**
      * State accessor for per-key and per-window state.
      */
    def windowState: KeyedStateStore

    /**
      * State accessor for per-key global state.
      */
    def globalState: KeyedStateStore
  }

}

A ProcessWindowFunction can be defined and used like this:

val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

The example shows a ProcessWindowFunction that counts the elements in a window; in addition, the function emits information about the window itself.


3.3 ProcessWindowFunction with Incremental Aggregation

A ProcessWindowFunction can be combined with a ReduceFunction, an AggregateFunction, or a FoldFunction to aggregate elements incrementally as they arrive in the window. When the window is closed, the ProcessWindowFunction is handed the aggregated result. This allows it to compute the window incrementally while still having access to the window's meta-information.

In older versions, a WindowFunction can be used instead of a ProcessWindowFunction, as sketched below.
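
A hedged sketch of the legacy WindowFunction interface (the counting logic mirrors the ProcessWindowFunction example above; note that only the window itself is available, not a full Context):

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
  // apply takes the window directly instead of a Context object
  override def apply(key: String, window: TimeWindow,
                     input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    out.collect(s"Window $window for key $key contains ${input.size} elements")
  }
}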


3.3.1 Combining with a ReduceFunction

The following example shows how a ReduceFunction can be combined with a ProcessWindowFunction to incrementally compute the smallest event in a window and return it together with the window's start time.

val input: DataStream[SensorReading] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(
    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
    ( key: String,
      context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
      minReadings: Iterable[SensorReading],
      out: Collector[(Long, SensorReading)] ) =>
      {
        val min = minReadings.iterator.next()
        out.collect((context.window.getStart, min))
      }
    )

3.3.2 Combining with an AggregateFunction

The following example shows how an AggregateFunction can be combined with a ProcessWindowFunction to incrementally compute an average and emit it together with the key.

val input: DataStream[(String, Long)] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction())

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2 // toDouble avoids integer division

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {

  def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = {
    val average = averages.iterator.next()
    out.collect((key, average))
  }
}

3.3.3 Combining with a FoldFunction

The following example shows how a FoldFunction can be combined with a ProcessWindowFunction to incrementally count the events in a window while also returning the window's key and end time.

val input: DataStream[SensorReading] = ...

input
 .keyBy(<key selector>)
 .timeWindow(<duration>)
 .fold (
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
    ( key: String,
      window: TimeWindow,
      counts: Iterable[(String, Long, Int)],
      out: Collector[(String, Long, Int)] ) =>
      {
        val count = counts.iterator.next()
        out.collect((key, window.getEnd, count._3))
      }
  )

3.4 Other Optional APIs

For triggers, evictors, allowed lateness, and late-data side outputs, see the windows page of the Flink documentation; a brief sketch of the lateness-related options follows.
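
A hedged sketch wiring up the optional knobs from the pipeline skeleton in section 2.1 (the socket source and "word,timestampMillis" line format are assumptions; with strictly ascending timestamps nothing is actually late, so this only shows the API wiring):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object LatenessSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // assumed input lines: "word,timestampMillis"
    val pairStream = env.socketTextStream("node01", 8888)
      .map { line =>
        val parts = line.split(",")
        (parts(0), 1, parts(1).toLong)
      }
      .assignAscendingTimestamps(_._3)
      .map(t => (t._1, t._2))

    val lateTag = OutputTag[(String, Int)]("late-data")

    val windowed = pairStream
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .allowedLateness(Time.seconds(2))  // keep window state 2s past the end of the window
      .sideOutputLateData(lateTag)       // elements arriving later than that go to a side output
      .sum(1)

    windowed.print()                         // on-time results
    windowed.getSideOutput(lateTag).print()  // late elements

    env.execute()
  }
}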
