[Flink Principles] An Introduction to State Management in Flink
Overview:
State is a core property of stream computing, and Flink provides extensive support for it: intermediate computation results can be saved and made available to subsequent computations.
Categories:
- Keyed State:
  - ValueState
  - ListState
  - ReducingState
  - AggregatingState
  - FoldingState
  - MapState
- Operator State:
  - ListState
  - UnionListState
  - BroadcastState
Raw state and managed state:
Both Keyed State and Operator State can exist in two forms: raw state and managed state.
- raw state: managed by the user
- managed state: managed internally by Flink
Managed state is state managed by the Flink framework, such as ValueState, ListState, and MapState.
Raw state is state whose concrete data structure is managed by the user; when the framework takes a checkpoint, it reads and writes the state content as byte[] and knows nothing about its internal structure.
Managed state is recommended for state on a DataStream; raw state is typically only needed when implementing a user-defined operator.
State backends:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
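As a minimal sketch, the backend can be selected in code. This assumes a Flink 1.x job using the pre-1.13 backend API (matching the imports used elsewhere in this article); the checkpoint path and class name are illustrative, and RocksDBStateBackend additionally requires the flink-statebackend-rocksdb dependency:

```java
package state;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // MemoryStateBackend: state and checkpoints live on the JobManager heap;
        // suitable only for local testing and very small state
        env.setStateBackend(new MemoryStateBackend());

        // FsStateBackend: working state on the TaskManager heap,
        // checkpoints written to a file system (path is illustrative)
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        // RocksDBStateBackend: working state in an embedded RocksDB instance on disk,
        // supports incremental checkpoints; requires the flink-statebackend-rocksdb dependency
        // env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints", true));
    }
}
```

Only one backend is effective per job; the last setStateBackend call wins, so in practice you would keep a single line.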
Code examples:
KeyedStateCustom:
package state;

import com.google.common.collect.Lists;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * Custom keyed state.
 *
 * @author lixiyan
 * @date 2020/4/5 8:34 PM
 */
public class KeyedStateCustom extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {

    private ListState<Long> abnormalData;
    // Threshold to monitor
    private Long threshold;
    // Number of occurrences that triggers an alert
    private Integer numberOfTimes;

    KeyedStateCustom(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        StateTtlConfig ttl = StateTtlConfig
                // Set the time to live to 1 second
                .newBuilder(Time.seconds(1))
                // Reset the TTL whenever the state is created or written
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                /* Never return a value once it has expired; the alternative,
                   ReturnExpiredIfNotCleanedUp, returns expired values as long
                   as they have not yet been physically removed */
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        ListStateDescriptor<Long> abnormalDataDesc = new ListStateDescriptor<>("abnormalData", Long.class);
        abnormalDataDesc.enableTimeToLive(ttl);
        this.abnormalData = getRuntimeContext().getListState(abnormalDataDesc);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {
        Long inputValue = value.f1;
        // Record the value if it exceeds the threshold
        if (inputValue >= threshold) {
            abnormalData.add(inputValue);
        }
        ArrayList<Long> lists = Lists.newArrayList(abnormalData.get().iterator());
        // Emit an alert once abnormal values have occurred often enough
        if (lists.size() >= numberOfTimes) {
            out.collect(Tuple2.of(value.f0 + " exceeded the threshold", lists));
            // Clear the state after emitting
            abnormalData.clear();
        }
    }
}
KeyedStateCustom mainly extends RichFlatMapFunction, which makes it possible to obtain and update state through the RuntimeContext. A TTL is also configured in open(): in production, state that is never cleaned up, or that keeps accumulating, can easily bring the system down, so a TTL with a maximum lifetime should be configured; once state reaches that lifetime without being processed, it is cleared.
KeyedState test class:
package state;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Test class.
 *
 * @author lixiyan
 * @date 2020/4/5 9:01 PM
 */
public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 40L));
        tuple2DataStreamSource.keyBy(0).flatMap(new KeyedStateCustom(100L, 3)).uid("keyed").print();
        env.execute("keyed State");
    }
}
Result:
The output shows the three recorded state values being emitted, which matches the expected behavior.
OperatorStateCustom:
package state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
 * Custom operator state.
 *
 * @author lixiyan
 * @date 2020/4/5 9:11 PM
 */
public class OperatorStateCustom extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {

    // Abnormal records buffered in memory
    private List<Tuple2<String, Long>> bufferedData;
    // Checkpointed state
    private transient ListState<Tuple2<String, Long>> checkPointedState;
    // Threshold to monitor
    private Long threshold;
    // Number of occurrences that triggers an alert
    private Integer numberOfTimes;

    OperatorStateCustom(Long threshold, Integer numberOfTimes) {
        this.threshold = threshold;
        this.numberOfTimes = numberOfTimes;
        this.bufferedData = new ArrayList<>();
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) throws Exception {
        Long inputVal = value.f1;
        // Record the value if it exceeds the threshold
        if (inputVal > threshold) {
            bufferedData.add(value);
        }
        // Emit an alert once the configured number of occurrences is reached
        if (bufferedData.size() >= numberOfTimes) {
            // Prefix the output with the state instance's hash code;
            // emit a copy so clearing the buffer does not mutate the emitted record
            out.collect(Tuple2.of(checkPointedState.hashCode() + " exceeded the threshold", new ArrayList<>(bufferedData)));
            bufferedData.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // When a snapshot is taken, copy the buffered data into checkPointedState
        checkPointedState.clear();
        System.out.println("snapshotState");
        for (Tuple2<String, Long> element : bufferedData) {
            System.out.println(element);
            checkPointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Note: this uses the OperatorStateStore
        checkPointedState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("abnormalData",
                        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
                        })));
        System.out.println("initializeState:" + checkPointedState.get());
        // On restart, restore the buffered data from the snapshot
        if (context.isRestored()) {
            System.out.println("restored from checkpoint");
            for (Tuple2<String, Long> element : checkPointedState.get()) {
                bufferedData.add(element);
            }
        }
    }
}
The operator state implementation differs in that it must also implement the CheckpointedFunction interface, which has two main methods: initializeState, which initializes the state and is where we obtain it, and snapshotState, which is called just before the state is written to a checkpoint and copies the data into the state so that it ends up in the checkpoint. If checkpointing is not enabled, snapshotState is never called.
Operator state test class:
package state;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Test class for operator state.
 *
 * @author lixiyan
 * @date 2020/4/5 10:35 PM
 */
public class Main2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1);
        // Configure the state backend
        env.setStateBackend(new FsStateBackend("file:///Users/lionli/lixiyan/flink/flink_practice/flink/src/main/resources/"));
        CheckpointConfig config = env.getCheckpointConfig();
        // Retain checkpoint data when the job is cancelled or fails,
        // so the job can be restored from a chosen checkpoint as needed
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Trigger a checkpoint every 100 ms
        config.setCheckpointInterval(100);
        // Use exactly-once mode
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // A checkpoint must complete within one minute or it is discarded (checkpoint timeout)
        config.setCheckpointTimeout(60000);
        // Allow only one checkpoint at a time
        config.setMaxConcurrentCheckpoints(1);
        // Job logic
        DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
        tuple2DataStreamSource.flatMap(new OperatorStateCustom(100L, 3)).printToErr();
        env.execute("Operator State");
    }
}
Test result:
We can see that the corresponding data is output.
Differences:
From the results above we can see the difference between keyed state and operator state: operator state looks more like a collection of multiple keyed states, because a single operator instance may process multiple keys.
Summary:
This article introduced state management in Flink, which is divided into keyed state and operator state, and demonstrated both with examples. In real development, choose whichever fits your actual situation.