Flink--State & Fault Tolerance-官
- 基于flink-1.8.1
- 基于官网
概念解析
- Snapshot 快照 - 一个通用术语,指的是Flink作业状态的全局一致图像。快照包括指向每个数据源的指针(例如,到文件或Kafka分区的偏移量),以及每个作业的有状态运算符的状态副本,这些状态是由处理完所有事件产生的。来源中的那些职位。
- Checkpoint 检查点 - Flink自动拍摄的快照,目的是能够从故障中恢复。检查点可以是增量的,并且经过优化可以快速恢复。
- Externalized Checkpoint 外部化检查点 - 通常检查点不应由用户操纵。 Flink在作业运行时仅保留n个最新的检查点(n可配置),并在作业取消时删除它们。但您可以将它们配置为保留,在这种情况下,您可以手动从它们恢复。
- Savepoint 保存点 -- 由用户(或API调用)手动触发的快照,用于某些操作目的,例如有状态重新部署/升级/重新缩放操作。保存点始终完整,并针对操作灵活性进行了优化。
- From:flink-snapshots
Keyed State and Operator State
- 在flink中有两种基本的状态:
Keyed State and Operator State
Keyed State
- keyed state始终与key相关,只能在KeyedStream上的函数和算子中使用。
- 可以将keyed state视为已经分区或者分片的Operator State,每个key有且只有一个状态分区(state-partition)。每一个keyed-state 在逻辑上绑定到一个唯一的组合型数据结构
<parallel-operator-instance, key>
,并且每一个key只属于一个keyed operator的并行化实例。可以将其简单地视为<operator, key>
; - keyed state 进一步组成了 key groups,key groups是flink可以重新划分keyed state的最小的原子单位;key groups的最大并行度和定义的最大并行度完全一致。在一个keyed operator并行化实例执行期间,实例中的keys有一个或者多个key groups。
Operator State
- 每个Operator State (or non-keyed state)绑定到一个并行算子实例中。Kafka Connector是在Flink中使用Operator State的一个很好的案例。 Kafka 消费者的每个并行实例都将topics分区和偏移的映射维护为Operator State 。
- 在 Operator State中,当算子的并行度发生改变时, Operator State接口支持在各算子实例中重分配,重分配的方案有很多中。
Raw and Managed State 原始和受管理的state
- Keyed State and Operator State exist in two forms: managed and raw.
- managed state由Flink runtime控制的数据结构表示,例如内部hash table或RocksDB。 例如“ValueState”,“ListState”等。Flink runtime对状态进行编码并将它们写入检查点。
- raw state是算子保留在自己的数据结构中的状态。 当checkpoint触发后,算子只会将state序列化后写入checkpoint。 Flink对state的数据结构一无所知,只看到原始字节。
- 所有的dataframe 函数都可以使用managed state,raw state只能被实现它们的算子使用。推荐使用managed state,因为在managed state下,Flink能够在并行性更改时自动重新分配状态,并且还可以进行更好的内存管理。
- Attention:if your managed state needs custom serialization logic, please see the corresponding guide in order to ensure future compatibility. Flink’s default serializers don’t need special treatment.
使用Managed Keyed State
- managed keyed state提供对不同类型的state的状态访问,这些state限定为当前元素输入的key。这疑问着state的可以只能在keyed stream上使用。
- 存在的state类型为:
- ValueState<T>:保留了一个可以更新和查询的值(如上所述,作用于输入元素的键的范围,因此操作看到的每个键可能有一个值)。 可以使用update(T)设置该值,并使用T value()查询该值。
- ListState<T>:保留了元素列表(list)。 可以追加元素并在所有当前存储的元素上查询Iterable(元素列表中的所有list)。 使用add(T)或addAll(List <T>)添加元素,可以使用Iterable <T> get()查询Iterable。 您还可以使用update覆盖现有列表(List <T>);
- ReducingState<T>:仅保留了一个值,表示添加到状态的所有值的聚合。 该接口类似于ListState,但使用add(T)添加的元素使用指定的ReduceFunction进行聚合。
- AggregatingState<IN, OUT>:保留了一个值,表示添加到状态的所有值的聚合。 与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。 接口与ListState相同,但使用add(IN)添加的元素使用指定的AggregateFunction进行聚合。
- **FoldingState<T, ACC>: **保留了一个值,表示添加到状态的所有值的聚合。 与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。 该接口类似于ListState,但使用add(T)添加的元素使用指定的FoldFunction折叠为聚合。
- **MapState<UK, UV>: **保留映射列表。 您可以将键值对放入状态,并在所有当前存储的映射上检索Iterable。 使用put(UK,UV)或putAll(Map <UK,UV>)添加映射。 可以使用get(UK)检索与用户密钥关联的值。 可以分别使用entries(),keys()和values()来查询映射,键和值的iterable view。
- 重要的是要记住,这些状态对象仅用于与状态接口。 状态不一定存储在内部,但可能驻留在磁盘或其他位置。 要记住的第二件事是,从状态获得的值取决于input元素的键。 因此,如果涉及的键不同,则在一次调用用户函数时获得的值可能与另一次调用中的值不同。
- 要获取状态句柄,必须创建StateDescriptor。 这保存了状态的名称(正如我们稍后将看到的,可以创建多个状态,并且它们必须具有唯一的名称以便您可以引用它们),状态所持有的值的类型,并且可能是用户 - 指定的函数,例如ReduceFunction。 根据要检索的状态类型,可以创建ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor或MapStateDescriptor。
- State is accessed using the RuntimeContext, so it is only possible in rich functions。
- ValueState<T> getState(ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- AggregatingState<IN, OUT>
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>) - FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
- demo
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
- scala
(input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(List(
(1L, 3L),
(1L, 5L),
(1L, 7L),
(1L, 4L),
(1L, 2L)
)).keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// the printed output will be (1,4) and (1,5)
env.execute("ExampleManagedState")
}
State Time-To-Live (TTL)
- 可以将生存时间(TTL)分配给任何类型的keyed state。 如果配置了TTL并且状态值已过期,则将尽力清除存储的值,这将在下面更详细地讨论。
- 所有状态集合类型都支持每个条目的TTL。 这意味着列表元素和映射条目将独立到期。
- 为了使用状态TTL,必须首先构建StateTtlConfig配置对象。 然后,可以通过传递配置在任何状态描述符中启用TTL功能:
- demo
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
-
主要参数如下:
-
The first parameter of the newBuilder method is mandatory, it is the time-to-live value.
-
The update type configures when the state TTL is refreshed (by default OnCreateAndWrite):
- StateTtlConfig.UpdateType.OnCreateAndWrite - only on creation and write access
- StateTtlConfig.UpdateType.OnReadAndWrite - also on read access
-
The state visibility configures whether the expired value is returned on read access if it is not cleaned up yet (by default NeverReturnExpired):
- StateTtlConfig.StateVisibility.NeverReturnExpired - expired value is never returned
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - returned if still available
-
在NeverReturnExpired的情况下,过期状态表现得好像它不再存在,即使它仍然必须被删除。 该选项对于在TTL之后必须严格用于读取访问的数据的用例是有用的,例如, 应用程序使用隐私敏感数据。
-
另一个选项ReturnExpiredIfNotCleanedUp允许在清理之前返回过期状态。
-
Note:
-
The state backends store the timestamp of the last modification along with the user value, which means that enabling this feature increases consumption of state storage. Heap state backend stores an additional Java object with a reference to the user state object and a primitive long value in memory. The RocksDB state backend adds 8 bytes per stored value, list entry or map entry.
-
Only TTLs in reference to processing time are currently supported.仅仅在process time下才支持TTL
-
Trying to restore state, which was previously configured without TTL, using TTL enabled descriptor or vice versa will lead to compatibility failure and StateMigrationException.
-
The TTL configuration is not part of check- or savepoints but rather a way of how Flink treats it in the currently running job.
-
The map state with TTL currently supports null user values only if the user value serializer can handle null values. If the serializer does not support null values, it can be wrapped with NullableSerializer at the cost of an extra byte in the serialized form.
Cleanup of Expired State
- 还是阅读一下官网吧,感觉用的不多;