Flink State Management and Fault Tolerance (Part 1)
2021-08-29
xiao_xian
1. Stateful Computation Concepts
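Storm | Flink |
---|---|
Compute and storage layers are separate; state has to live in external stores such as Redis or HBase | Compute + application state: Flink manages state internally, keeps it in memory or RocksDB, and periodically checkpoints it to a file system |
High network latency; exactly-once cannot be guaranteed; state is not preserved across a restart; operators block while waiting on the network, so throughput is low | High memory consumption |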
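Issues a stateful stream processing architecture has to address:
- Data consistency
- Loss of intermediate data when the system crashes
- Version upgrades: when the code logic changes, existing state has to be converted
- Memory is limited and cannot hold an unbounded amount of data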
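The crash-recovery concern in the list above is commonly handled by storing a state snapshot together with the input position it reflects, then replaying input from that position after a failure. The plain-Java sketch below only illustrates that idea. `CheckpointReplaySketch` and all of its members are hypothetical names, not Flink APIs, and Flink's actual checkpointing mechanism is considerably more involved.

```java
import java.util.List;

// Hypothetical illustration; none of these names are Flink APIs.
final class CheckpointReplaySketch {

    /** A snapshot couples the state with the input position it reflects. */
    static final class Snapshot {
        final long sum;
        final int nextOffset;

        Snapshot(long sum, int nextOffset) {
            this.sum = sum;
            this.nextOffset = nextOffset;
        }
    }

    private long sum;       // operator state: running sum
    private int nextOffset; // index of the next record to read

    /** Consumes records from the current offset up to (excluding) upTo. */
    void process(List<Integer> source, int upTo) {
        while (nextOffset < upTo) {
            sum += source.get(nextOffset);
            nextOffset++;
        }
    }

    Snapshot snapshot() {
        return new Snapshot(sum, nextOffset);
    }

    void restore(Snapshot snapshot) {
        sum = snapshot.sum;
        nextOffset = snapshot.nextOffset;
    }

    long sum() {
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);

        CheckpointReplaySketch operator = new CheckpointReplaySketch();
        operator.process(source, 2);
        Snapshot checkpoint = operator.snapshot(); // taken after two records
        operator.process(source, 4);               // this progress is lost in the "crash"

        CheckpointReplaySketch recovered = new CheckpointReplaySketch();
        recovered.restore(checkpoint);
        recovered.process(source, source.size());  // replay from the checkpointed offset
        System.out.println(recovered.sum());       // prints 15
    }
}
```

Because the state and the offset are restored together, every record contributes to the sum exactly once, even though records 3 and 4 were processed twice overall.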
2. State Types and Their Use
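State type | Keyed State | Operator State |
---|---|---|
Applicable operators | Only operators on a KeyedStream | All operators |
State scope | One state per key; a single operator may handle many keys | One state per operator instance |
Creation and access | Extend a RichFunction and obtain the state from the RuntimeContext | Implement the CheckpointedFunction or ListCheckpointed interface |
Rescaling | State moves automatically between operator tasks together with its keys | Several redistribution modes: even split, or union (merge all state and hand the full set to every instance) |
Supported state types | ValueState, ListState, ReducingState, AggregatingState, MapState | ListState, BroadcastState |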
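The "rescaling" row for Keyed State can be made concrete with key groups: each key is hashed into one of `maxParallelism` key groups, and each key group is owned by exactly one parallel task. The sketch below is a simplified stand-in, not Flink's own code. `KeyGroupSketch` is a hypothetical name, and the plain modulo in `keyGroupFor` is an assumption made for readability (Flink additionally scrambles the hash code before taking the modulo).

```java
// Hypothetical illustration of key-group based rescaling; not Flink's own classes.
final class KeyGroupSketch {

    /** Assumption: a plain modulo of hashCode(); Flink also applies a murmur hash first. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the parallel task that owns it. */
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            System.out.println("key group " + keyGroup
                    + ": task " + operatorIndexFor(keyGroup, maxParallelism, 2)
                    + " at parallelism 2, task " + operatorIndexFor(keyGroup, maxParallelism, 4)
                    + " at parallelism 4");
        }
    }
}
```

A key never changes its key group, so changing the parallelism only changes which task owns each group. That is why keyed state can follow its keys without any user code.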
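3. Keyed State
A simple use of ValueState
- Extend RichMapFunction (or the corresponding RichFlatMapFunction, ...)
- In open(), create a state descriptor and obtain the state through the RuntimeContext
- Read the state with value() and update it with update(xxx)
Example: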
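
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;

    // Extends RichMapFunction; must be applied to a KeyedStream
    class MapWithCounterFunction extends RichMapFunction<Tuple2<String, String>, Long> {

        private ValueState<Long> totalLengthByKey; // the state, scoped to the current key

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Define the state descriptor that is used to create the state
            ValueStateDescriptor<Long> stateDescriptor =
                    new ValueStateDescriptor<Long>("sum of length", LongSerializer.INSTANCE);
            // Create and register the state through the RuntimeContext
            totalLengthByKey = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public Long map(Tuple2<String, String> value) throws Exception {
            Long length = totalLengthByKey.value(); // read the state; null if nothing is stored for this key yet
            if (length == null) {
                length = 0L;
            }
            long newTotalLength = length + value.f1.length();
            totalLengthByKey.update(newTotalLength); // write the state
            return newTotalLength;
        }
    }
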
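To see what the per-key scoping of `value()` and `update()` means for the output of the example above, here is a plain-Java stand-in that runs without Flink. `KeyedValueStateSketch` is a hypothetical name, and the `HashMap` merely plays the role that the keyed state backend plays in a real job.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for keyed ValueState; not a Flink API.
final class KeyedValueStateSketch {

    // One entry per key, mimicking "one ValueState per key".
    private final Map<String, Long> totalLengthByKey = new HashMap<>();

    /** Mirrors the map() logic above for a record (key, text). */
    long map(String key, String text) {
        Long length = totalLengthByKey.get(key); // like value(): null for an unseen key
        if (length == null) {
            length = 0L;
        }
        long newTotalLength = length + text.length();
        totalLengthByKey.put(key, newTotalLength); // like update(...)
        return newTotalLength;
    }

    public static void main(String[] args) {
        KeyedValueStateSketch sketch = new KeyedValueStateSketch();
        System.out.println(sketch.map("a", "hello")); // prints 5
        System.out.println(sketch.map("b", "hi"));    // prints 2
        System.out.println(sketch.map("a", "yo"));    // prints 7
    }
}
```

The third call returns 7 rather than 2 because key "a" already carries a total of 5, while key "b" keeps its own independent total.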
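4. Operator State
Definition:
- Each parallel operator instance holds one state; it is not scoped by key
- The state must support redistribution
- Commonly used in Source and Sink operators, for example the Kafka consumer, which tracks offsets, topics, and similar information
Types:
- ListState: on restore, the state elements are split evenly among the new tasks
- UnionListState: on restore, every new task receives all state elements
- BroadcastState: every task holds the complete broadcast state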
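The difference between the even-split and union modes is easiest to see on a small list. The sketch below is plain Java with hypothetical names (`RedistributionSketch`, `evenSplit`, `union`). The round-robin order used for the even split is an assumption of this sketch, so the exact assignment of elements to tasks in Flink may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two list-state redistribution modes; not Flink code.
final class RedistributionSketch {

    /** Even split: each element ends up on exactly one new task (round-robin here). */
    static <T> List<List<T>> evenSplit(List<T> allElements, int parallelism) {
        List<List<T>> perTask = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            perTask.add(new ArrayList<>());
        }
        for (int i = 0; i < allElements.size(); i++) {
            perTask.get(i % parallelism).add(allElements.get(i));
        }
        return perTask;
    }

    /** Union: every new task receives a copy of the complete list. */
    static <T> List<List<T>> union(List<T> allElements, int parallelism) {
        List<List<T>> perTask = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            perTask.add(new ArrayList<>(allElements));
        }
        return perTask;
    }

    public static void main(String[] args) {
        List<Integer> checkpointed = List.of(10, 20, 30, 40, 50);
        System.out.println(evenSplit(checkpointed, 3)); // prints [[10, 40], [20, 50], [30]]
        System.out.println(union(checkpointed, 2));     // prints [[10, 20, 30, 40, 50], [10, 20, 30, 40, 50]]
    }
}
```

With union redistribution each task has to decide for itself which elements it keeps, which is why it suits cases such as sources that re-discover their partitions on restore.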
Two ways to define it:
- Implement the CheckpointedFunction interface
- Implement the ListCheckpointed interface (deprecated)
Example:
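
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ReducingState;
    import org.apache.flink.api.common.state.ReducingStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    // Both static classes are meant to be nested inside the enclosing job class.
    // 1. Implement CheckpointedFunction
    static class CustomMapFunction<T> implements MapFunction<T, T>, CheckpointedFunction {

        private ReducingState<Long> countPerKey;    // keyed state
        private ListState<Long> countPerPartition;  // operator state (ListState): one per parallel instance, not per key
        private long localCount;

        // Called when the function is initialized and when it is restored after a failure
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            countPerKey = context.getKeyedStateStore().getReducingState(
                    new ReducingStateDescriptor<>("perKeyCount", new AddFunction(), Long.class));
            // 2. Define the state descriptor; 3. register the state through the context
            countPerPartition = context.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>("perPartitionCount", Long.class));
            // Note: with union redistribution every restored instance sees all entries,
            // so localCount restarts from the global total. getListState(...) would
            // instead give each instance an even share of the entries.
            for (Long l : countPerPartition.get()) {
                localCount += l;
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Replace the checkpointed list with the current local count
            countPerPartition.clear();
            countPerPartition.add(localCount);
        }

        @Override
        public T map(T value) throws Exception {
            countPerKey.add(1L); // 4. Use the state
            localCount++;
            return value;
        }
    }

    static class AddFunction implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long aLong, Long t1) throws Exception {
            return aLong + t1;
        }
    }
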
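5. Broadcast State
- Broadcast State lets Flink users store events from a low-throughput broadcast stream in a fault-tolerant, consistent, and rescalable way. The events are broadcast to all parallel instances of an operator and then evaluated together with the elements of another stream
How Broadcast State differs from other operator state
- Broadcast State has a map format and is obtained through a MapStateDescriptor
- It requires a broadcast input stream
- An operator can have multiple broadcast states with different names
Use cases
- Dynamic rules: for example, alert rules are broadcast to all parallel instances of an operator, which emit an alert when an event matches a rule
Example: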

    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.util.Collector;

    // Action, Pattern, ActionEventSchema, PatternEventSchema and KafkaExampleUtil
    // are project classes that are not shown in this post.
    public class CartDetectPatternEvaluatorExample {

        public static void main(String[] args) throws Exception {
            final ParameterTool parameterTool = ParameterTool.fromArgs(args);
            StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);

            // Input stream of user action events (non-broadcast stream)
            DataStream<Action> actions = env
                    .addSource(
                            new FlinkKafkaConsumer010<>(
                                    parameterTool.getRequired("action-topic"),
                                    new ActionEventSchema(),
                                    parameterTool.getProperties()));

            // Input stream of rules (still a regular stream at this point)
            DataStream<Pattern> patterns = env
                    .addSource(
                            new FlinkKafkaConsumer010<>(
                                    parameterTool.getRequired("pattern-topic"),
                                    new PatternEventSchema(),
                                    parameterTool.getProperties()));

            // Key the action stream by userId
            KeyedStream<Action, Long> actionsByUser = actions
                    .keyBy((KeySelector<Action, Long>) action -> action.userId);

            // Create the map-typed state descriptor
            MapStateDescriptor<Void, Pattern> bcStateDescriptor =
                    new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));

            // Calling broadcast() turns the regular stream into a broadcast stream
            BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);

            DataStream<Tuple2<Long, Pattern>> matches = actionsByUser
                    .connect(bcedPatterns)            // connect the non-broadcast stream with the broadcast stream
                    .process(new PatternEvaluator()); // process with a KeyedBroadcastProcessFunction

            matches.print();
            env.execute("CartDetectPatternEvaluatorExample");
        }

        // Type parameters of KeyedBroadcastProcessFunction:
        // 1. key type of the keyed stream
        // 2. element type of the non-broadcast stream
        // 3. element type of the broadcast stream
        // 4. type of the output
        public static class PatternEvaluator
                extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {

            // handle for keyed state (per user)
            ValueState<String> prevActionState;
            // broadcast state descriptor; the broadcast state is looked up through it
            MapStateDescriptor<Void, Pattern> patternDesc;

            @Override
            public void open(Configuration conf) {
                // initialize keyed state
                prevActionState = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("lastAction", Types.STRING));
                // same definition as the descriptor used in main()
                patternDesc =
                        new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
            }

            /**
             * Called for each user action.
             * Evaluates the current pattern against the previous and
             * current action of the user.
             */
            @Override
            public void processElement(
                    Action action,
                    ReadOnlyContext ctx, // broadcast state is read-only here, which keeps it identical across all tasks
                    Collector<Tuple2<Long, Pattern>> out) throws Exception {
                // get current pattern from broadcast state
                Pattern pattern = ctx
                        .getBroadcastState(this.patternDesc)
                        // access MapState with null as VOID default value
                        .get(null);
                // get previous action of current user from keyed state
                String prevAction = prevActionState.value();
                if (pattern != null && prevAction != null) {
                    // user had an action before, check if pattern matches
                    if (pattern.firstAction.equals(prevAction) &&
                            pattern.secondAction.equals(action.action)) {
                        // MATCH
                        out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                    }
                }
                // update keyed state and remember action for next pattern evaluation
                prevActionState.update(action.action);
            }

            /**
             * Called for each new pattern.
             * Overwrites the current pattern with the new pattern.
             */
            @Override
            public void processBroadcastElement(
                    Pattern pattern,
                    Context ctx, // read-write access; every parallel instance receives the same element and applies the same update
                    Collector<Tuple2<Long, Pattern>> out) throws Exception {
                // store the new pattern by updating the broadcast state
                BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
                // storing in MapState with null as VOID default value
                bcState.put(null, pattern);
            }
        }
    }

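Notes
- The broadcast state obtained in processElement is read-only; only processBroadcastElement can modify it;
- Broadcast state does not support RocksDB yet; it is kept in memory only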
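The interplay of the two kinds of state in the example can be simulated without Flink. In the plain-Java sketch below every parallel instance receives each broadcast rule and keeps its own copy, while user actions are routed to a single instance by key. All names (`BroadcastSketch`, `Instance`, `onBroadcast`, `onAction`, `run`) are hypothetical, and routing by `userId % parallelism` is an assumption made for brevity rather than Flink's partitioning.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of the broadcast state pattern; not Flink code.
final class BroadcastSketch {

    /** One parallel instance of the pattern-evaluating operator. */
    static final class Instance {
        String[] pattern;                                            // "broadcast state": {first, second}
        final Map<Long, String> prevActionByUser = new HashMap<>(); // "keyed state"
        final List<Long> matches = new ArrayList<>();

        /** Broadcast side: store the new rule, overwriting the previous one. */
        void onBroadcast(String firstAction, String secondAction) {
            pattern = new String[] {firstAction, secondAction};
        }

        /** Keyed side: read the rule, compare with the user's previous action. */
        void onAction(long userId, String action) {
            String prev = prevActionByUser.get(userId);
            if (pattern != null && prev != null
                    && pattern[0].equals(prev) && pattern[1].equals(action)) {
                matches.add(userId);
            }
            prevActionByUser.put(userId, action);
        }
    }

    /** Runs a tiny scenario on two instances and returns the matching user ids. */
    static List<Long> run() {
        Instance[] instances = {new Instance(), new Instance()};

        // The rule is delivered to every instance.
        for (Instance instance : instances) {
            instance.onBroadcast("login", "logout");
        }

        // Actions are delivered to exactly one instance, chosen by key.
        long[] users = {1L, 2L, 1L, 2L};
        String[] actions = {"login", "login", "logout", "pay"};
        for (int i = 0; i < users.length; i++) {
            int target = (int) (users[i] % instances.length);
            instances[target].onAction(users[i], actions[i]);
        }

        List<Long> all = new ArrayList<>();
        for (Instance instance : instances) {
            all.addAll(instance.matches);
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1]
    }
}
```

Only user 1 performs "login" followed by "logout", so only that user matches. Because each instance holds the same rule, the result does not depend on which instance a user is routed to.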