Flink专题

Flink Streaming:Process函数(低级别操作)

2019-02-15  本文已影响90人  尼小摩

ProcessFunction是一个低级的流处理操作,可以访问所有(非循环)流应用程序的基本构建块:

ProcessFunction可以看作是一个具有键控状态(keyed state)和计时器(timers)访问权的FlatMapFunction。它通过调用输入流中接收的每个事件来处理事件。对于容错状态,ProcessFunction允许访问Flink的键状态,可以通过RuntimeContext访问,类似于其他有状态函数访问键状态的方式。

计时器(timers)允许应用程序对处理时间(processing time)和事件时间(event time)中的变更作出响应。processElement(…)函数的每次调用都获得一个上下文(Context )对象,该对象可以访问元素的事件时间戳TimerServiceTimerService可用于为将来的事件时间/处理时间注册回调。当到达计时器的特定时间时,将调用onTimer(…)方法。在调用期间,所有状态的作用域再次限定为创建计时器的键,允许计时器操作键控状态。

注意:如果你想访问键控状态和定时器,你必须在一个键控流上应用ProcessFunction:

stream.keyBy(...).process(new MyProcessFunction())

要实现对两个输入的低级操作,应用程序可以使用CoProcessFunction(协处理器功能)。这个函数绑定了两个不同的输入,并获取对processElement1(…)processElement2(…)的单独调用,以获取来自两个不同输入的记录。

实现低级连接通常遵循以下模式:

例如,将客户数据与金融交易join起来,同时保留客户数据的状态。如果在无序事件中,当客户数据流的水印超过交易时间时,可以使用计时器计算并发出交易的连接。

例子

下面的示例维护每个键的计数,并在没有更新该键的情况下,每分钟过去(在事件时间中)时发出键/计数对:

这个简单的示例可以用会话窗口实现。在这里使用ProcessFunction来说明它提供的基本模式。

import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.Context
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
import org.apache.flink.util.Collector

// the source data stream
val stream: DataStream[Tuple2[String, String]] = ...

// apply the process function onto a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
  * The data type stored in the state
  */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
  * The implementation of the ProcessFunction that maintains the count and timeouts
  */
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

  /** The state that is maintained by this process function */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


  override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
    // initialize or retrieve/update the state

    val current: CountWithTimestamp = state.value match {
      case null =>
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, lastModified) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    // write the state back
    state.update(current)

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
    state.value match {
      case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
        out.collect((key, count))
      case _ =>
    }
  }
}

在Flink 1.4.0之前,当从处理时计时器调用ProcessFunction.onTimer()方法时,它将当前处理时间设置为事件时间戳。这种行为非常微妙,用户可能不会注意到。它是有害的,因为处理时间戳是不确定的,并且与水印不一致。此外,用户实现的逻辑依赖于这个错误的时间戳,这很可能是无心的错误。所以我们决定修复它。当升级到1.4.0时,使用这个错误事件时间戳的Flink作业将失败,用户应该使其作业适应正确的逻辑。

KeyedProcessFunction

KeyedProcessFunction作为ProcessFunction的扩展,在onTimer(…)方法中提供对定时器键的访问。

override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {
  var key = ctx.getCurrentKey
  // ...
}

定时器(Timers)

这两种类型的计时器:处理时间和事件时间都由TimerService在内部维护并排队等待执行。

TimerService根据每个键和时间戳删除重复计时器,每个键和时间戳最多有一个计时器。如果为相同的时间戳注册了多个计时器,则只调用onTimer()方法一次。

注意:Flink同步调用onTimer()processElement()方法。因此,用户不必担心状态的并发修改。

故障容错(Fault Tolerance)

计时器与应用程序的状态都具有容错和检查点功能。在故障恢复或从保存点启动应用程序时,计时器将被恢复。

注意:在恢复之前应该立即触发的检查点处理时间计时器。这可能发生在应用程序从故障中恢复或从保存点启动时。

注意:计时器是异步检查点,除了RocksDB后端/增量快照/基于堆的计时器的组合之外(将使用FLINK-10026解析)。注意,大量计时器会增加时间检查点,因为计时器是检查点状态的一部分。有关如何减少计时器数量的建议,请参阅“计时器合并(Timer Coalescing)”一节。

计时器合并(Timer Coalescing)

由于Flink对每个键和时间戳只维护一个计时器,可以通过降低计时器的精度来合并计时器,从而减少计时器的数量。

对于1秒的定时器精度(事件或处理时间),你可以把目标时间四舍五入到整秒。计时器的启动时间最多提前1秒,但不会晚于要求的毫秒精度。因此,每个键和秒最多有一个计时器。

val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)

由于事件时间定时器只触发带有水印的情况,也可以使用当前的水印调度并结合这些定时器与下一个水印:

val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)

计时器也可以停止和删除如下:

停止处理时间(processing-time)计时器:
val timestampOfTimerToStop = ...
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
停止事件时间(event-time)计时器:
val timestampOfTimerToStop = ...
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)

如果没有注册具有给定时间戳的计时器,则停止计时器无效。

上一篇 下一篇

猜你喜欢

热点阅读