Flink之状态管理
最近看了看Flink中state方面的知识,Flink中的state是啥?state的作用是啥?为什么Flink中引入了state这个概念?既然最近的项目需要用到state,结合官网再加上网上大佬state的博客,那就简单了解一下Flink中的state吧!
1.State是什么
看到state这个词,给人的第一印象就是中文翻译:状态,那么state在Flink中有什么具体的涵义呢?state是Flink程序某个时刻某个task/operator的状态,state数据是程序运行中某一时刻数据结果,这个state数据会保存在taskmanager的内存中,也就是java的堆内存中。官方推荐使用先存储到本地的RocksDB中,然后再将RocksDB中的状态数据异步checkpoint到hdfs上。
2.State的作用
首先要将state和checkpoint概念区分开,可以理解为checkpoint是要把state数据持久化存储起来,checkpoint默认情况下会存储在JoManager的内存中。checkpoint表示一个Flink job在一个特定时刻的一份全局状态快照,方便在任务失败的情况下数据的恢复。
3.State 状态值存储
State 的store和checkpoint的位置最终取决于State Backend的配置env.setStateBackend()。一共有三种env.setStateBackend(new RocksDBStateBackend()), env.setStateBackend(new MemoryStateBackend()), env.setStateBackend(new FsStateBackend())。
→→→→→→→→→→→→→→MemoryStateBackend ←←←←←←←←←←←←
☞原理
①MemoryStateBackend 在java堆内存上维护状态。
②Checkpoint时,MemoryStateBackend 对State做一次快照,并在向JobManager发送checkpoint确认完成的消息中带上次快照数据,然后快照就会存储在JobManager的堆内存中。
③MemoryStateBackend 可以使用异步的方式进行快照,推荐使用异步的方式避免阻塞。如果不希望异步,可以在构造的时候传入false
val backend: MemoryStateBackend = new MemoryStateBackend(10 *1024 *1024,false)
env.setStateBackend(backend)
☞ 限制
单个state的大小默认值为5MB,可以在MemoryStateBackend的构造函数中增加。无论如何配置,State大小都无法大于akka.framesize(JobManager和TaskManager之间发送的最大消息的大小,默认是10MB)。JobManager必须有足够的内存。
state默认大小
☞ 适用场景
本地开发和调试
→→→→→→→→→→→→→→→→FsStateBackend←←←←←←←←←←←←←←
☞ 原理
①FsStateBackend需要配置一个文件系统的url,如hdfs上的路径:hdfs://hfflink/flink/checkpoints/... 。
②FsStateBackend在TaskManager的内存中持有正在处理的数据。Checkpoint时将state snapshot写入文件系统目录下的文件中。文件的路径等元数据会传递给JobManager。
③FsStateBackend可以使用异步的方式进行快照,推荐使用异步的方式避免阻塞。如果不希望异步,可以在构造的时候传入false
val backend = new FsStateBackend("hdfs://hfflink/flink/checkpoints/...",false)
env.setStateBackend(backend)
☞ 适用场景
大状态,长窗口,大键/值状态的job
→→→→→→→→→→→→→→RocksDBStateBackend←←←←←←←←←←←←
☞ 原理
①RocksDBStateBackend需要配置一个文件系统的url,如hdfs上的路径:hdfs://hfflink/flink/checkpoints/... 。
②RocksDBStateBackend将运行中的数据保存在RocksDB数据库中,默认情况下存储在TaskManager数据目录中。在checkpoint的时候,整个RocksDB数据库将被checkpointed到配文件的文件系统和目录中。文件的路径等元数据会 传递给JobManager,存在其内存中。
③RocksDBStateBackend总是执行异步快照。
☞ 限制
RocksDB JNI API是基于byte[],因此key和value最大支持大小为2^31个字节(2GB),RocksDB自身在支持较大value时候会有限制。
☞ 使用场景
超大状态,超长窗口,大键/值状态的job.
☞ 与前两种状态后端对比
①状态保存在数据库RocksDB中,相比其他状态后端可保存更大的状态,但开销更大(读/写需要反序列化/序列化去检索/存储状态)。
②目前只有RocksDBStateBackend支持增量checkpoint(默认全量),false默认全量,true代表增量。
val checkpoint = "hdfs://hfflink/flink/checkpoints/..."
env.setStateBackend(new RocksDBStateBackend(checkpoint, true))
4.State的应用
-
State->KeyedState
KeyedState是基于KeyedStream上的状态,这个状态是跟特定的key绑定的,对KeyedStream流上的每个key都有对应的state。Keyed State 仅仅可以被使用在基于KeyStream上的functions和operators中。
-
State->OperatorState
OperatorState就是non-keyed state,每个Operator State都绑定到一个并行的算子实例中。Kafka Connector就是一个使用Operator State的一个很好的例子,他会在每个connector实例中,保存改实例中消费topic的所有(partition,offset)映射。
5.State存在形式
Keyed State和Operator State存在两种形式:managed (托管状态)和 raw(原始状态)。托管状态是由Flink框架管理的状态;而原始状态是由用户自行管理状态的具体数据结构,框架在做checkpoint的时候,使用bytes 数组读写状态内容,对其内部数据结构一无所知。通常所有的datastream functions都可以使用托管状态,但是原始状态接口仅仅能够在实现operators的时候使用。推荐使用managed state而不是使用raw state,因为使用托管状态的时候Flink可以在parallelism发生改变的情况下能够动态重新分配状态,而且还能更好的进行内存管理。
6.Keyed State在托管状态下使用
托管的keyed state接口提供了不同类型的state,它们都是作用于当前输入流元素的key上。这就意味着这种类型的state仅仅可以被使用在keyedStream之上,keyedStream可以通过stream.keyBy(...)来创建。
接下来我们首先来看一看下面所提供的不同类型的state,然后再看看它们在代码中是如果使用的。
- ValueState<T>:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update()方法更新状态值,通过value()方法获取状态值。
case class valueStateFunction extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// access the state value
val tmpCurrentSum = sum.value
// If it hasn't been used before, it will be null
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// update the count
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
// update the state
sum.update(newSum)
// if the count reaches 2, emit the average and clear the state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("value-state-name", createTypeInformation[(Long, Long)])
)
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new valueStateFunction())
.print()
env.execute("job-name")
- ListState<T>:即key上的状态值为一个列表。可以通过add()方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值。
case class CountListState() extends RichFlatMapFunction[(Long, Long), List[(Long, Long)]] {
private var listState: ListState[(Long, Long)] = null
//业务逻辑根据自己的实际情况设计
override def flatMap(value: (Long, Long), out: Collector[List[(Long, Long)]]): Unit = {
var list: List[(Long, Long)] = List()
listState.add(value)
val iterable: lang.Iterable[(Long, Long)] = listState.get()
val itt = iterable.iterator()
while (itt.hasNext) {
list = list :+ itt.next()
}
out.collect(list)
}
override def open(parameters: Configuration): Unit = {
listState = getRuntimeContext.getListState(new ListStateDescriptor[(Long, Long)]("list-state-name", createTypeInformation[(Long, Long)]))
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new listStateFunction())
.print()
env.execute("job-name")
- ReducingState<T>:这种状态通过用户传入的reduceFunction,每次调用add()方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。
case class reducingStateFunction() extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var state: ReducingState[Long] = null
//业务逻辑根据自己的实际情况设计
override def flatMap(value: (Long, Long), out: Collector[(Long, Long)]): Unit = {
state.add(value._2)
out.collect(value)
}
override def open(parameters: Configuration): Unit = {
state = getRuntimeContext.getReducingState(new ReducingStateDescriptor[Long]("reducing-state-name", new ReduceFunction[Long] {
override def reduce(value1: Long, value2: Long): Long = {
value1 + value2
}
}, createTypeInformation[Long]))
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new reducingStateFunction())
.print()
env.execute("job-name")
- AggregatingState<IN,OUT>:这种状态值保留一个值,该值是添加的所有值的聚合。和ReducingState不同的是,聚合类型可能与添加到状态中的元素类型不同。
case class aggregatingStateFunction() extends RichMapFunction[(Long, Long), Long] {
private var state: AggregatingState[(Long, Long), Long] = null
//业务逻辑根据自己的实际情况设计
override def map(value: (Long, Long)): Long = {
state.add(value)
value._2
}
override def open(parameters: Configuration): Unit = {
state = getRuntimeContext.getAggregatingState(new AggregatingStateDescriptor[(Long, Long), (Long, Long), Long]("aggr-state-name", aggregateFunction, createTypeInformation[(Long, Long)]))
}
}
val aggregateFunction = new AggregateFunction[(Long, Long), (Long, Long), Long] {
override def createAccumulator(): (Long, Long) = {
(0L, 0L)
}
override def add(value: (Long, Long), accumulator: (Long, Long)): (Long, Long) = {
(accumulator._1 + 1, accumulator._2 + value._2)
}
override def getResult(accumulator: (Long, Long)): Long = {
val result = accumulator._1 / accumulator._2
result
}
override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = {
(a._1 + b._1, a._2 + b._2)
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new aggregatingStateFunction())
.print()
env.execute("job-name")
- FoldingState<T,ACC>:跟ReducingState有点类似,不过它的状态值类型可以与add()方法中传入的元素类型不同。FoldingState在Flink-1.4版本已经被弃用,在未来会被完全废弃掉。可以使用AggregatingState代替之。
case class foldingStateFunction() extends RichMapFunction[(Long, Long), (Long, Long)] {
private var state: FoldingState[(Long, Long), Long] = null
//业务逻辑根据自己的实际情况设计
override def map(value: (Long, Long)): (Long, Long) = {
state.add(value)
value
}
override def open(parameters: Configuration): Unit = {
state = getRuntimeContext.getFoldingState(new FoldingStateDescriptor[(Long, Long), Long]("folding-state-name", 10L, new FoldFunction[(Long, Long), Long] {
override def fold(accumulator: Long, value: (Long, Long)): Long = {
accumulator + value._1 + value._2
}
}, createTypeInformation[Long]))
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new foldingStateFunction())
.print()
env.execute("job-name")
- MapState<UK,UV>:即状态值是一个map,用户可以通过put() 或者putAll()方法添加元素。
case class mapStateFunction() extends RichMapFunction[(Long, Long), ((Long, Long), Long)] {
private var state: MapState[(Long, Long), Long] = null
//业务逻辑根据自己的实际情况设计
override def map(value: (Long, Long)): ((Long, Long), Long) = {
state.put(value, value._1 + value._2)
(value, state.get(value))
}
override def open(parameters: Configuration): Unit = {
val keyMapState = createTypeInformation[(Long, Long)]
val valueMapState = createTypeInformation[Long]
state = getRuntimeContext.getMapState(new MapStateDescriptor[(Long, Long), Long]("map-state-name", keyMapState, valueMapState))
}
}
val input: DataStream[(Long, Long)] = ...
input.keyBy(_._1).flatMap(new mapStateFunction())
.print()
env.execute("job-name")
6-1.State生存周期
time-to-live(TTL)能够被使用在任何类型的keyed state。如果TTL被配置而且状态值已经过期,则将以最佳的方式清理。为了使用状态TTL,必须首先构建一个StateTtlConfig 对象。通过配置可以在任何state descriptor 中使用。
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
val stateDescriptor = new ValueStateDescriptor[String]("value_state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
配置中有下面几个配置项可以选择:StateTtlConfig中的newBuilder这个方法是必须的,它是设置生存周期的值。
- TTL 刷新策略(默认OnCreateAndWrite)
策略类型 | 描述 |
---|---|
StateTtlConfig.UpdateType.Disabled | 禁用TTL,永不过期 |
StateTtlConfig.UpdateType.OnCreateAndWrite | 每次写操作都会更新State的最后访问时间 |
StateTtlConfig.UpdateType.OnReadAndWrite | 每次读写操作都会跟新State的最后访问时间 |
- 状态可见性(默认NeverReturnExpired)
策略类型 | 描述 |
---|---|
StateTtlConfig.StateVisibility.NeverReturnExpired | 永不返回过期状态 |
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp | 可以返回过期但尚未被清理的状态值 |
Notes:
- 状态后端存储最后一次修改的时间戳和值,这意味着启用该特性会增加状态存储的消耗。堆状态后端在内存中存储一个附加的Java对象,其中包含对用户状态对象的引用和一个原始长值。RocksDB状态后端为每个存储值、列表条目或映射条目添加8个字节;
- 目前只支持与处理时间相关的TTLs;
- 如果试图使用启用TTL的描述符或使用启用TTL的描述符恢复先前在没有TTL的情况下配置的状态,将导致兼容性失败和statmigration异常;
- TTL配置不是check- or savepoints的一部分,而是Flink在当前运行的作业中如何处理它的一种方式
6-1-1.State清除策略
Cleanup in full snapshot↓
默认情况下,过期值只有在显式读出时才会被删除,例如通过调用ValueState.value()方法。此外,您可以在获取完整状态快照时激活清理操作,这将减少其大小。在当前实现下,本地状态不会被清除,但在从前一个快照恢复时,它不会包含已删除的过期状态。可以在StateTtlConfig中配置.
1:下面的配置选项不适用于RocksDB state backend上的increamental checkpointing:;
2:对于现有作业,此清理策略可以在StateTtlConfig中随时激活或停用,例如从保存点重新启动后可以使用。
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot
.build
Incremental cleanup↓
另一个选项是增量地触发对某些状态项的清理。触发器可以是来自每个状态访问或/和每个记录处理的回调。如果这个清理策略在某个状态下活跃的,那么存储后端会在其所有条目上为该状态保留一个惰性全局迭代器。每次触发增量清理时,迭代器都会被提升。检查遍历的状态项,并清理过期的状态项。
这个特性可以在StateTtlConfig中激活:
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlCon fig
.newBuilder(Time.seconds(1))
.cleanupIncrementally
.build
上面的策略有两个参数,第一个参数:第是每次清理触发的检查状态的条件。如果启用,则每次状态访问都将触发它。第二个参数:是否为每个记录处理额外触发清理。
Notes:
- 如果对状态没有访问或者没有任何处理的记录,那么状态会一直保留;
- 增量状态的清理增加了记录处理的延迟;
- 目前,增量状态的清理策略仅仅在对堆状态后端被实现了,对于设置了RocksDB的将没有效果;
- 如果使用堆状态后端进行同步快照,全局迭代器在跌倒时会保留所有键的副本,因为它的特性不支持对并发数的修改。使用此功能将增加内存消耗。异步快照进行对状态的保存就没有这种情况发生;
- 对于现有的作业,可以通过在StateTtlConfig中设置这种清理策略能够随时被激活和停用,例如:从保存点重新启动后。
Cleanup during RocksDB compaction↓
如果使用RocksDB进行状态的管理,另一个清理策略就是激活Flink的压缩过滤这个策略。RocksDB会定期使用异步压缩来合并状态的更新和减少储存。Flink压缩过滤器使用TTL检查状态的过期时间戳,并排除过期值。
默认情况下是关闭该特性的。对于RocksDB进行状态管理首先要做的就是要激活,通过Flink配置文件配置state.backend.rocksdb.ttl.compaction.filter.enabled,或者对于一个Flink job来说如果一个自定义的RocksDB 状态管理被创建那么它可以调用 RocksDBStateBackend::enableTtlCompactionFilter。然后任何带有TTL的状态都可以配置来去使用过滤器。
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter
.build
RocksDB compaction filter将会从Flink每次处理完一定数据量的状态之后,从Flink查询用于检查过期的当前时间戳,这个数字默认是1000。你也可以选择更改它,并将自定义值传递给StateTtlConfig.newBuilder(…)。cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法。频繁的跟新时间错可以提高清理的数据但是会降低压缩性能,因为它使用了来自本地的JNI的调用。
Notes:
- 在压缩过程中调用TTL过滤器会减慢它的速度。TTL过滤器必须解析上次访问的时间戳,并检查每个正在压缩的键的每个存储状态条目的过期时间。对于集合状态类型(列表或映射),每个存储的元素也调用该检查;
- 对于现有作业,此清理策略可以在StateTtlConfig中随时激活或停用,例如从保存点重新启动后。
目前,管理operator state仅仅支持使用List类型。当前,支持List样式的托管运算符状态,彼此之间相互独立,因此可以在重新缩放时可以重新分配。换句话说,这些对象是可以重新分配non-keyed state的最佳粒度。根据状态访问方法,定义一下重新分配方案。
6-2.State在scala DataStream API中的使用
除了上面描述的接口之外,Scala API还为有状态map()或flatMap()函数提供了快捷方式,这些函数在KeyedStream上只有一个ValueState。方法中在一个Option中获取ValueState的当前值,并且必须返回一个更新后的值,该值将用于更新状态。
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
7. Operator State在托管状态下使用
为了使用的operator state,有状态的函数可以通过CheckpointedFunction 接口或者ListCheckpointed<T extends Serializable>接口实现。
CheckpointedFunction↓
CheckpointedFunction 接口通过不同的重新分发方案提供对非键状态(no-keyed state)的访问。它需要实现下面两种方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
每当必须执行检查点时,都会调用SnapshotState()。每次初始化用户定义的函数时,都会调用对应的initializeState()方法,急当函数首次初始化时,或者当函数实际从早期的检查点恢复时。因此,initializeState() 方法不仅仅时初始化不同类型状态的地方,而且也是状态恢复逻辑的地方。
当前,托管的 operator state支持List 类型。状态应该是可序列化对象的List,彼此之间独立,因此在重新调节之后可以重新分配。换句话说,这些对象是non-keyed state重新分配的最佳粒度。根据状态访问方法,定义了一下重新分配的方案:
-
Even-split redistribution:每一个operator 返回一个状态元素的List。整个状态在逻辑上是所有列表的串联。在恢复或者重新分配时,这个List会被均匀划分为和operator并行度一样的子list(sublist)。每个operator获得一个sublist,它可能是为空或者包含一个或者多个元素。举个例子:如果并行度为1的operator的checkpointed state包含element1和element2 两个,当增减并行度到2时,element1可能在operator的实例0中,element2可能在operator的实例1中。
-
Union redistribution:*每个operator 返回一个状态元素的List。整个状态在逻辑上是所有列表的串联。在恢复或者重新分配时,每一个operator都将获得包含整个state elements的List。 *
下面是一个有状态的SinkFunction的例子,它使用CheckpointedFunction 在将元素发送到下游之前进将它们缓冲起来。它演示了even-split redistribution这个策略:
class BufferingSink(threshold: Int = 0)
extends SinkFunction[(String, Int)]
with CheckpointedFunction {
@transient
private var checkpointedState: ListState[(String, Int)] = _
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int), context: Context): Unit = {
bufferedElements += value
if (bufferedElements.size == threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
if(context.isRestored) {
for(element <- checkpointedState.get()) {
bufferedElements += element
}
}
}
}
initializeState ()方法中有一个FunctionInitializationContext参数。这个参数是用来初始化non-keyed state的“containers”。这有一个ListState类型的container,non-keyed state 的对象将会在进行检查点时保存到这。
注意怎么初始化这个状态,和keyed state类似,使用一个StateDescriptor 包含了状态的名字和所持有的状态值类型的信息:
val descriptor = new ListStateDescriptor[(String, Long)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Long)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
状态访问方法的命名约定包含其重新分配模式和状态结构。例如,当在还原的时候将list state和Union redistribution方案一起使用,通过使用getUnionListState(descriptor)获取state。如果方法名不包含重分配的模式,例如,getListState(descriptor),这将意味着使用基本的Even-split redistribution 方案。
初始化容器之后,我们使用FunctionInitializationContext的isRestored()方法检查当我们的任务失败后是否正在恢复。如果isRestored()方法返回的结果是true,说明正在恢复,则使用恢复逻辑。
如修改后的BufferingSink代码所示,状态初始化期间恢复的ListState保存在类变量中,以备将来在Snapshotstate()中使用。ListState被清除了前一个checkpoint包含的所有对象,然后被我们想要填充的新对象进行填充。
另一方面,keyed state可以在initializeState()方法中初始化。使用FunctionInitializationContext就可以完成。
override def initializeState(context: FunctionInitializationContext): Unit = {
try {
val descriptor: ListStateDescriptor[List[RealTimeContent]] = new ListStateDescriptor[List[RealTimeContent]]("content-sink", createTypeInformation[List[RealTimeContent]])
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
if (context.isRestored) {
val itt = checkpointedState.get().iterator()
while (itt.hasNext) {
val element = itt.next()
bufferElements += element
}
}
} catch {
case e: Exception => println(e.getMessage)
}
}
ListCheckpointed↓
ListCheckpointed() 接口是 CheckpointedFunction()有限的一个变体。它只支持List 类型的state 和 even-split redistribution 策略结合的恢复。它要求实现下面两个方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
在snapshotState()方法中,operator应该返回一个对象的列表给到checkpoint,restoreState ()方法则是在恢复中使用这个列表的数据。如果这个state是不可再分区的,你可以在snapshotState()一直返回一个 Collections.singletonList(MY_STATE)。
7-1.有State的Source Functions
与其它的operators相比,有状态的Sources需要更多的关注。为了对状态和输出集合原子性进行更新(要求故障恢复时的 exactly-once来说),则要求开发者从source的context中获取一个lock。
class CounterSource
extends RichParallelSourceFunction[Long]
with ListCheckpointed[Long] {
@volatile
private var isRunning = true
private var offset = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
val lock = ctx.getCheckpointLock
while (isRunning) {
// output and state update are atomic
lock.synchronized({
ctx.collect(offset)
offset += 1
})
}
}
override def cancel(): Unit = isRunning = false
override def restoreState(state: util.List[Long]): Unit =
for (s <- state) {
offset = s
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
Collections.singletonList(offset)
}