flinkApache Flink

Flink之状态管理

2019-05-09  本文已影响0人  MrSocean

最近看了看Flink中state方面的知识,Flink中的state是啥?state的作用是啥?为什么Flink中引入了state这个概念?既然最近的项目需要用到state,结合官网再加上网上大佬state的博客,那就简单了解一下Flink中的state吧!

1.State是什么

看到state这个词,给人的第一印象就是中文翻译:状态,那么state在Flink中有什么具体的涵义呢?state是Flink程序某个时刻某个task/operator的状态,state数据是程序运行中某一时刻数据结果,这个state数据会保存在taskmanager的内存中,也就是java的堆内存中。官方推荐使用先存储到本地的RocksDB中,然后再将RocksDB中的状态数据异步checkpoint到hdfs上。

2.State的作用

首先要将state和checkpoint概念区分开,可以理解为checkpoint是要把state数据持久化存储起来,checkpoint默认情况下会存储在JoManager的内存中。checkpoint表示一个Flink job在一个特定时刻的一份全局状态快照,方便在任务失败的情况下数据的恢复。

3.State 状态值存储

State 的store和checkpoint的位置最终取决于State Backend的配置env.setStateBackend()。一共有三种env.setStateBackend(new RocksDBStateBackend()), env.setStateBackend(new MemoryStateBackend()), env.setStateBackend(new FsStateBackend())。

→→→→→→→→→→→→→→MemoryStateBackend ←←←←←←←←←←←←
☞原理

①MemoryStateBackend 在java堆内存上维护状态。
②Checkpoint时,MemoryStateBackend 对State做一次快照,并在向JobManager发送checkpoint确认完成的消息中带上次快照数据,然后快照就会存储在JobManager的堆内存中。
③MemoryStateBackend 可以使用异步的方式进行快照,推荐使用异步的方式避免阻塞。如果不希望异步,可以在构造的时候传入false

val backend: MemoryStateBackend = new MemoryStateBackend(10 *1024 *1024,false)
env.setStateBackend(backend)
☞ 限制

单个state的大小默认值为5MB,可以在MemoryStateBackend的构造函数中增加。无论如何配置,State大小都无法大于akka.framesize(JobManager和TaskManager之间发送的最大消息的大小,默认是10MB)。JobManager必须有足够的内存。

state默认大小
☞ 适用场景

本地开发和调试

→→→→→→→→→→→→→→→→FsStateBackend←←←←←←←←←←←←←←
☞ 原理

①FsStateBackend需要配置一个文件系统的url,如hdfs上的路径:hdfs://hfflink/flink/checkpoints/... 。
②FsStateBackend在TaskManager的内存中持有正在处理的数据。Checkpoint时将state snapshot写入文件系统目录下的文件中。文件的路径等元数据会传递给JobManager。
③FsStateBackend可以使用异步的方式进行快照,推荐使用异步的方式避免阻塞。如果不希望异步,可以在构造的时候传入false

val backend = new FsStateBackend("hdfs://hfflink/flink/checkpoints/...",false)
env.setStateBackend(backend)
☞ 适用场景

大状态,长窗口,大键/值状态的job

→→→→→→→→→→→→→→RocksDBStateBackend←←←←←←←←←←←←
☞ 原理

①RocksDBStateBackend需要配置一个文件系统的url,如hdfs上的路径:hdfs://hfflink/flink/checkpoints/... 。
②RocksDBStateBackend将运行中的数据保存在RocksDB数据库中,默认情况下存储在TaskManager数据目录中。在checkpoint的时候,整个RocksDB数据库将被checkpointed到配文件的文件系统和目录中。文件的路径等元数据会 传递给JobManager,存在其内存中。
③RocksDBStateBackend总是执行异步快照。

☞ 限制

RocksDB JNI API是基于byte[],因此key和value最大支持大小为2^31个字节(2GB),RocksDB自身在支持较大value时候会有限制。

☞ 使用场景

超大状态,超长窗口,大键/值状态的job.

☞ 与前两种状态后端对比

①状态保存在数据库RocksDB中,相比其他状态后端可保存更大的状态,但开销更大(读/写需要反序列化/序列化去检索/存储状态)。
②目前只有RocksDBStateBackend支持增量checkpoint(默认全量),false默认全量,true代表增量。

val checkpoint = "hdfs://hfflink/flink/checkpoints/..."
env.setStateBackend(new RocksDBStateBackend(checkpoint, true))

4.State的应用

KeyedState是基于KeyedStream上的状态,这个状态是跟特定的key绑定的,对KeyedStream流上的每个key都有对应的state。Keyed State 仅仅可以被使用在基于KeyStream上的functions和operators中。

OperatorState就是non-keyed state,每个Operator State都绑定到一个并行的算子实例中。Kafka Connector就是一个使用Operator State的一个很好的例子,他会在每个connector实例中,保存改实例中消费topic的所有(partition,offset)映射。

5.State存在形式

Keyed State和Operator State存在两种形式:managed (托管状态)和 raw(原始状态)。托管状态是由Flink框架管理的状态;而原始状态是由用户自行管理状态的具体数据结构,框架在做checkpoint的时候,使用bytes 数组读写状态内容,对其内部数据结构一无所知。通常所有的datastream functions都可以使用托管状态,但是原始状态接口仅仅能够在实现operators的时候使用。推荐使用managed state而不是使用raw state,因为使用托管状态的时候Flink可以在parallelism发生改变的情况下能够动态重新分配状态,而且还能更好的进行内存管理。

6.Keyed State在托管状态下使用

托管的keyed state接口提供了不同类型的state,它们都是作用于当前输入流元素的key上。这就意味着这种类型的state仅仅可以被使用在keyedStream之上,keyedStream可以通过stream.keyBy(...)来创建。
接下来我们首先来看一看下面所提供的不同类型的state,然后再看看它们在代码中是如果使用的。

case class valueStateFunction extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var sum: ValueState[(Long, Long)] = _

override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }
    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("value-state-name", createTypeInformation[(Long, Long)])
    )
  }
}

val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new valueStateFunction())
.print()

env.execute("job-name")
case class CountListState() extends RichFlatMapFunction[(Long, Long), List[(Long, Long)]] {
    private var listState: ListState[(Long, Long)] = null
    //业务逻辑根据自己的实际情况设计
    override def flatMap(value: (Long, Long), out: Collector[List[(Long, Long)]]): Unit = {
      var list: List[(Long, Long)] = List()

      listState.add(value)
      val iterable: lang.Iterable[(Long, Long)] = listState.get()
      val itt = iterable.iterator()

      while (itt.hasNext) {
        list = list :+ itt.next()
      }
      out.collect(list)
    }

    override def open(parameters: Configuration): Unit = {
      listState = getRuntimeContext.getListState(new ListStateDescriptor[(Long, Long)]("list-state-name", createTypeInformation[(Long, Long)]))
    }
  }

val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new listStateFunction())
.print()

env.execute("job-name")
case class reducingStateFunction() extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
    private var state: ReducingState[Long] = null
    //业务逻辑根据自己的实际情况设计
    override def flatMap(value: (Long, Long), out: Collector[(Long, Long)]): Unit = {   
      state.add(value._2)

      out.collect(value)
    }

    override def open(parameters: Configuration): Unit = {
      state = getRuntimeContext.getReducingState(new ReducingStateDescriptor[Long]("reducing-state-name", new ReduceFunction[Long] {
        override def reduce(value1: Long, value2: Long): Long = {
          value1 + value2
        }
      }, createTypeInformation[Long]))
    }
  }

val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new reducingStateFunction())
.print()

env.execute("job-name")
case class aggregatingStateFunction() extends RichMapFunction[(Long, Long), Long] {
    private var state: AggregatingState[(Long, Long), Long] = null
     //业务逻辑根据自己的实际情况设计
    override def map(value: (Long, Long)): Long = {
      state.add(value)
      value._2
    }

    override def open(parameters: Configuration): Unit = {
      state = getRuntimeContext.getAggregatingState(new AggregatingStateDescriptor[(Long, Long), (Long, Long), Long]("aggr-state-name", aggregateFunction, createTypeInformation[(Long, Long)]))
    }
  }

val aggregateFunction = new AggregateFunction[(Long, Long), (Long, Long), Long] {
    override def createAccumulator(): (Long, Long) = {
      (0L, 0L)
    }

    override def add(value: (Long, Long), accumulator: (Long, Long)): (Long, Long) = {
      (accumulator._1 + 1, accumulator._2 + value._2)
    }

    override def getResult(accumulator: (Long, Long)): Long = {
      val result = accumulator._1 / accumulator._2
      result
    }

    override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = {
      (a._1 + b._1, a._2 + b._2)
    }
  }

val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new aggregatingStateFunction())
.print()

env.execute("job-name")
case class foldingStateFunction() extends RichMapFunction[(Long, Long), (Long, Long)] {
    private var state: FoldingState[(Long, Long), Long] = null
    //业务逻辑根据自己的实际情况设计
    override def map(value: (Long, Long)): (Long, Long) = {
      state.add(value)
      value
    }

    override def open(parameters: Configuration): Unit = {
      state = getRuntimeContext.getFoldingState(new FoldingStateDescriptor[(Long, Long), Long]("folding-state-name", 10L, new FoldFunction[(Long, Long), Long] {
        override def fold(accumulator: Long, value: (Long, Long)): Long = {
        
          accumulator + value._1 + value._2
        }
      }, createTypeInformation[Long]))
    }
  }

val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new foldingStateFunction())
.print()

env.execute("job-name")
case class mapStateFunction() extends RichMapFunction[(Long, Long), ((Long, Long), Long)] {

    private var state: MapState[(Long, Long), Long] = null
    //业务逻辑根据自己的实际情况设计
    override def map(value: (Long, Long)): ((Long, Long), Long) = {

      state.put(value, value._1 + value._2)

      (value, state.get(value))
    }

    override def open(parameters: Configuration): Unit = {
      val keyMapState = createTypeInformation[(Long, Long)]
      val valueMapState = createTypeInformation[Long]
      state = getRuntimeContext.getMapState(new MapStateDescriptor[(Long, Long), Long]("map-state-name", keyMapState, valueMapState))
    }
  }

val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new mapStateFunction())
.print()

env.execute("job-name")

6-1.State生存周期

time-to-live(TTL)能够被使用在任何类型的keyed state。如果TTL被配置而且状态值已经过期,则将以最佳的方式清理。为了使用状态TTL,必须首先构建一个StateTtlConfig 对象。通过配置可以在任何state descriptor 中使用。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
    
val stateDescriptor = new ValueStateDescriptor[String]("value_state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

配置中有下面几个配置项可以选择:StateTtlConfig中的newBuilder这个方法是必须的,它是设置生存周期的值。

策略类型 描述
StateTtlConfig.UpdateType.Disabled 禁用TTL,永不过期
StateTtlConfig.UpdateType.OnCreateAndWrite 每次写操作都会更新State的最后访问时间
StateTtlConfig.UpdateType.OnReadAndWrite 每次读写操作都会跟新State的最后访问时间
策略类型 描述
StateTtlConfig.StateVisibility.NeverReturnExpired 永不返回过期状态
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp 可以返回过期但尚未被清理的状态值

Notes:

6-1-1.State清除策略

Cleanup in full snapshot↓

默认情况下,过期值只有在显式读出时才会被删除,例如通过调用ValueState.value()方法。此外,您可以在获取完整状态快照时激活清理操作,这将减少其大小。在当前实现下,本地状态不会被清除,但在从前一个快照恢复时,它不会包含已删除的过期状态。可以在StateTtlConfig中配置.
1:下面的配置选项不适用于RocksDB state backend上的increamental checkpointing:;
2:对于现有作业,此清理策略可以在StateTtlConfig中随时激活或停用,例如从保存点重新启动后可以使用。

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build
Incremental cleanup↓

另一个选项是增量地触发对某些状态项的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果这个清理策略在某个状态下活跃的,那么存储后端会在其所有条目上为该状态保留一个惰性全局迭代器。每次触发增量清理时,迭代器都会被提升。检查遍历的状态项,并清理过期的状态项。
这个特性可以在StateTtlConfig中激活:

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlCon fig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally
    .build

上面的策略有两个参数,第一个参数:第是每次清理触发的检查状态的条件。如果启用,则每次状态访问都将触发它。第二个参数:是否为每个记录处理额外触发清理。
Notes:

Cleanup during RocksDB compaction↓

如果使用RocksDB进行状态的管理,另一个清理策略就是激活Flink的压缩过滤这个策略。RocksDB会定期使用异步压缩来合并状态的更新和减少储存。Flink压缩过滤器使用TTL检查状态的过期时间戳,并排除过期值。
默认情况下是关闭该特性的。对于RocksDB进行状态管理首先要做的就是要激活,通过Flink配置文件配置state.backend.rocksdb.ttl.compaction.filter.enabled,或者对于一个Flink job来说如果一个自定义的RocksDB 状态管理被创建那么它可以调用 RocksDBStateBackend::enableTtlCompactionFilter。然后任何带有TTL的状态都可以配置来去使用过滤器。

import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter
    .build

RocksDB compaction filter将会从Flink每次处理完一定数据量的状态之后,从Flink查询用于检查过期的当前时间戳,这个数字默认是1000。你也可以选择更改它,并将自定义值传递给StateTtlConfig.newBuilder(…)。cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法。频繁的跟新时间错可以提高清理的数据但是会降低压缩性能,因为它使用了来自本地的JNI的调用。

Notes:

目前,管理operator state仅仅支持使用List类型。当前,支持List样式的托管运算符状态,彼此之间相互独立,因此可以在重新缩放时可以重新分配。换句话说,这些对象是可以重新分配non-keyed state的最佳粒度。根据状态访问方法,定义一下重新分配方案。

6-2.State在scala DataStream API中的使用

除了上面描述的接口之外,Scala API还为有状态map()或flatMap()函数提供了快捷方式,这些函数在KeyedStream上只有一个ValueState。方法中在一个Option中获取ValueState的当前值,并且必须返回一个更新后的值,该值将用于更新状态。

val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
   })

7. Operator State在托管状态下使用

为了使用的operator state,有状态的函数可以通过CheckpointedFunction 接口或者ListCheckpointed<T extends Serializable>接口实现。

CheckpointedFunction↓

CheckpointedFunction 接口通过不同的重新分发方案提供对非键状态(no-keyed state)的访问。它需要实现下面两种方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

每当必须执行检查点时,都会调用SnapshotState()。每次初始化用户定义的函数时,都会调用对应的initializeState()方法,急当函数首次初始化时,或者当函数实际从早期的检查点恢复时。因此,initializeState() 方法不仅仅时初始化不同类型状态的地方,而且也是状态恢复逻辑的地方。

当前,托管的 operator state支持List 类型。状态应该是可序列化对象的List,彼此之间独立,因此在重新调节之后可以重新分配。换句话说,这些对象是non-keyed state重新分配的最佳粒度。根据状态访问方法,定义了一下重新分配的方案:

下面是一个有状态的SinkFunction的例子,它使用CheckpointedFunction 在将元素发送到下游之前进将它们缓冲起来。它演示了even-split redistribution这个策略:

class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int), context: Context): Unit = {
    bufferedElements += value
    if (bufferedElements.size == threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )

    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if(context.isRestored) {
      for(element <- checkpointedState.get()) {
        bufferedElements += element
      }
    }
  }

}

initializeState ()方法中有一个FunctionInitializationContext参数。这个参数是用来初始化non-keyed state的“containers”。这有一个ListState类型的container,non-keyed state 的对象将会在进行检查点时保存到这。

注意怎么初始化这个状态,和keyed state类似,使用一个StateDescriptor 包含了状态的名字和所持有的状态值类型的信息:

val descriptor = new ListStateDescriptor[(String, Long)](
    "buffered-elements",
    TypeInformation.of(new TypeHint[(String, Long)]() {})
)

checkpointedState = context.getOperatorStateStore.getListState(descriptor)

状态访问方法的命名约定包含其重新分配模式和状态结构。例如,当在还原的时候将list state和Union redistribution方案一起使用,通过使用getUnionListState(descriptor)获取state。如果方法名不包含重分配的模式,例如,getListState(descriptor),这将意味着使用基本的Even-split redistribution 方案。

初始化容器之后,我们使用FunctionInitializationContext的isRestored()方法检查当我们的任务失败后是否正在恢复。如果isRestored()方法返回的结果是true,说明正在恢复,则使用恢复逻辑。

如修改后的BufferingSink代码所示,状态初始化期间恢复的ListState保存在类变量中,以备将来在Snapshotstate()中使用。ListState被清除了前一个checkpoint包含的所有对象,然后被我们想要填充的新对象进行填充。

另一方面,keyed state可以在initializeState()方法中初始化。使用FunctionInitializationContext就可以完成。

override def initializeState(context: FunctionInitializationContext): Unit = {

    try {
      val descriptor: ListStateDescriptor[List[RealTimeContent]] = new ListStateDescriptor[List[RealTimeContent]]("content-sink", createTypeInformation[List[RealTimeContent]])

      checkpointedState = context.getOperatorStateStore.getListState(descriptor)

      if (context.isRestored) {

        val itt = checkpointedState.get().iterator()
        while (itt.hasNext) {

          val element = itt.next()
          bufferElements += element
        }

      }
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
ListCheckpointed↓

ListCheckpointed() 接口是 CheckpointedFunction()有限的一个变体。它只支持List 类型的state 和 even-split redistribution 策略结合的恢复。它要求实现下面两个方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

在snapshotState()方法中,operator应该返回一个对象的列表给到checkpoint,restoreState ()方法则是在恢复中使用这个列表的数据。如果这个state是不可再分区的,你可以在snapshotState()一直返回一个 Collections.singletonList(MY_STATE)。

7-1.有State的Source Functions

与其它的operators相比,有状态的Sources需要更多的关注。为了对状态和输出集合原子性进行更新(要求故障恢复时的 exactly-once来说),则要求开发者从source的context中获取一个lock。

class CounterSource
       extends RichParallelSourceFunction[Long]
       with ListCheckpointed[Long] {

  @volatile
  private var isRunning = true

  private var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock

    while (isRunning) {
      // output and state update are atomic
      lock.synchronized({
        ctx.collect(offset)

        offset += 1
      })
    }
  }

  override def cancel(): Unit = isRunning = false

  override def restoreState(state: util.List[Long]): Unit =
    for (s <- state) {
      offset = s
    }

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
    Collections.singletonList(offset)

}
上一篇下一篇

猜你喜欢

热点阅读