Flink学习指南玩转大数据Java

Flink 使用之状态和checkpoint

2021-11-30  本文已影响0人  AlienPaul

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

背景

通常Flink算子是无状态的,每个算子根据设定好的逻辑,依次对到来的数据进行加工。无论是第一次加工还是第一万次,逻辑全是一样的。

但是这种方式并不能满足全部需求。比如我们要在某个算子统计交易金额的平均值。每次计算都要依赖上一次计算的结果。这种算子称为有状态算子。

面对有状态算子,我们需要清楚:

接下来为大家介绍有状态算子的使用方式。

State Backend 状态后端

状态后端决定了Flink状态的存储方式和位置。按照存储方式划分为以下两种:

注:Rocksdb是一个持久性key-value存储,和Redis类似。不同的是,Redis使用场景更多的考虑内存存储,而Rocksdb则采用了LSM树,更多的考虑持久化存储的场景。

状态后端的配置

状态后端有如下两种配置方式,但是它们的作用范围不同:

启用checkpoint需要设置checkpoint的间隔时间。全局配置方式:

execution.checkpointing.interval: 10s

作业配置方式:

env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10))

使用hashmap作为state backend

全局配置:

# The backend that will be used to store operator state checkpoints
state.backend: hashmap

# Directory for storing checkpoints
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

作业配置:

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStateBackend(new HashMapStateBackend())

使用Rocksdb作为state backend

全局配置:

state.backend: rocksdb
state.checkpoints.dir: hdfs://manager.bigdata:8020/flink_checkpoints

state.backend.incremental: True

配置项解释如下:

作业配置:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new EmbeddedRocksDBStateBackend)
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir")


// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
// setCheckpointStorage可接受FileSystemCheckpointStorage作为参数,从而支持更多的配置参数,例如写入缓存大小
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"))

注意:如果想在IDE本地调试Rocksdb backend,需要在Maven引入:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>

代码中使用State

上一节介绍了Flink State的后台保存方法,那么这里开始介绍下如何在代码中写入和读取state。

官网介绍参见Working with State | Apache Flink

KeyedState

使用KeyedState要求必须将DataStream转换为KeyedStream,方法为使用数据分区keyBy算子。

val source = env.fromElements(...)
source.keyBy(_ % 2)

KeyedState的范围为Key级别,意思是不同的key值对应的是不同的KeyedState

通常KeyedState在KeyedProcessFunction中使用。在open方法中获取到state变量,在processElement方法中读取或更新。

KeyedState按照底层存储的数据结构区分为如下5种:

在创建KeyedState之前,我们必须先构建对应类型的StateDescriptorStateDescriptor中定义了state的参数和行为,和state变量一一对应。StateDescriptor包含state变量的名称,保存的数据结构,以及其他信息(例如ReducingState需要在StateDescriptor额外指定ReduceFunction)。

下面我们逐个介绍这些state。

ValueState

var valueState: ValueState[Int] = _

override def open(parameters: Configuration): Unit = {
    // 获取符合条件的ValueState对象
    valueState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("count", TypeInformation.of(classOf[Int])))
}
override def processElement(i: Int, context: KeyedProcessFunction[Int, Int, Int]#Context, collector: Collector[Int]): Unit = {
    // 更新valueState的值
    valueState.update(3)
}

ListState

var listState: ListState[Int] = _

override def open(parameters: Configuration): Unit = {
    // 获取符合条件的ListState对象
    listState = getRuntimeContext.getListState(new ListStateDescriptor[Int]("list", TypeInformation.of(classOf[Int])))
}
override def processElement(i: Int, context: KeyedProcessFunction[Int, Int, Int]#Context, collector: Collector[Int]): Unit = {
    // 为list中添加元素
    listState.add(3)
    // 获取整个list,例子中类型为Iterator[Int]
    listState.get()
}

ReducingState

ReducingStateDescriptor的创建需要额外指定ReduceFunction,用于指定旧元素和新元素的运算逻辑。例如:

override def open(parameters: Configuration): Unit = {
    value = getRuntimeContext.getReducingState(new ReducingStateDescriptor[Int]("reduce", new ReduceFunction[Int] {
        override def reduce(t: Int, t1: Int): Int = {
            // 这个例子实际上是累加
            t + t1
        }
    }, TypeInformation.of(classOf[Int])))
}

AggregatingState

为了区别ReducingState我们这里使用一个较为复杂的例子。

我们期望ReducingState每次摄入的数据类型为Int,输出类型为String,这个String包含了每次摄入的数据列表。例如依次插入1,3,5,输出为[1, 3, 5]。

创建AggregatingStateDescriptor需要指定3个参数:输入数据类型,累加器类型和输出数据类型。AggregatingState实际后台存储的数据类型为累加器类型。

数据的聚合逻辑由AggregateFunction定义,需要重写它的4个方法:

下面是代码和分析:

override def open(parameters: Configuration): Unit = {
    value = getRuntimeContext.getAggregatingState(new AggregatingStateDescriptor[Int, util.List[Int], String]("agg", new AggregateFunction[Int, util.List[Int], String] {
        // 累加器为java.util.ArrayList,这里创建出一个新的
        override def createAccumulator(): util.List[Int] = new util.ArrayList[Int]()

        override def add(in: Int, acc: util.List[Int]): util.List[Int] = {
            acc.add(in)
            acc
        }

        // 将保存有数据的List转换为String,作为输出
        override def getResult(acc: util.List[Int]): String = acc.toString

        override def merge(acc: util.List[Int], acc1: util.List[Int]): util.List[Int] = {
            acc.addAll(acc1)
            acc
        }
        // 第三个参数传入的是累加器类型
    } , TypeInformation.of(classOf[util.List[Int]])))
}

MapState

var mapState: MapState[String, Int] = _

override def open(parameters: Configuration): Unit = {
    // 获取符合条件的MapState对象
    mapState = getRuntimeContext.getMapState(new MapStateDescriptor[String, Int]("map", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int])))
}
override def processElement(i: Int, context: KeyedProcessFunction[Int, Int, Int]#Context, collector: Collector[Int]): Unit = {
    // 存入key value对到mapState
    mapState.put("A", 1)
    // 获取key对应的value
    mapState.get("A")
}

MapWithState & FlatMapWithState

如果我们使用Scala编程,可以使用有状态的mapflatMap算子,即mapWithStateflatMapWithState,目前状态数据类型仅支持ValueState`。一个例子如下:

// i为map输入,opt为状态变量类型
.mapWithState((i: Int, opt: Option[Int]) => opt match {
    // 如果状态变量存在
    case Some(c) =>
    // 逗号前为map逻辑,可读取状态变量
    // 逗号后为更新state变量逻辑
    (i + c, Some(i + c))
    // 如果状态变量不存在
    // 这个例子中初始化状态变量值
    case None =>
    (i, Some(i))
})

State TTL

可以通过StateTtlConfig设置状态变量的生存时间,避免过期无效状态的积压导致Out of Memory问题。

如果采用了State TTL,那么变量在状态后端需要额外保存一个上次访问时间,会多消耗8字节空间。

启用了TTL的状态变量,当用户对它操作的时候,会检查上次访问时间+TTL时间是否超过当前时间,如果超过,变量视为已过期。如果变量未过期,成功访问之后,按照实现配置的updateType,更新上次访问时间(目前支持创建和写入时候更新,或者是创建读取写入的时候都更新)。

配置状态变量的TTL需要StateTtlConfig,它并非全局配置,而是变量级别的配置。因此我们可以为不同的变量配置不同的过期策略。

目前StateTtlConfig只支持processing time作为TTL。

一个典型的使用场景如下:

val ttlConfig = StateTtlConfig
    // 距离上次update多久之后视为状态过期
    .newBuilder(Time.seconds(1))
    // 只有创建和写入数据的时候才更新上次访问时间
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    // 不返回过期的数据
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build

val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

配置说明:

BroadcastState

我们在使用BroadcastStream的时候也可以指定一个MapState。它由BroadcastStream携带保存,可以在BroadcastProcessFunctionprocessBroadcastElement方法中更新,在processElement中访问。

//BroadcastState只能使用MapState
val mapState = new MapStateDescriptor[String, Int]("map", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))
val broadcasted = source.broadcast(mapState)

anotherStream.connect(broadcasted).process(new BroadcastProcessFunction[Int, Int, Int] {
    override def processElement(in1: Int, readOnlyContext: BroadcastProcessFunction[Int, Int, Int]#ReadOnlyContext, collector: Collector[Int]): Unit = {
        // 在这里只能访问broadcast state
        val state = readOnlyContext.getBroadcastState(mapState)
    }

    override def processBroadcastElement(in2: Int, context: BroadcastProcessFunction[Int, Int, Int]#Context, collector: Collector[Int]): Unit = {
        // 在这里可以访问和更新broadcast state
        val state = context.getBroadcastState(mapState)
        state.put(k, v)
    }
})

Operator State

KeyedState不同,OperatorState保存的状态为Operator级别,同一个operator中的所有key(key对应不同的channel)共享同一个状态。

要使用Operator State,我们的处理function必须要实现CheckpointedFunction,并编写snapshotStateinitializeState逻辑。详细用法和例子参见Flink 使用之数据源CheckpointedFunction章节。

除此之外还可以使用ListCheckpointed接口,但是目前版本已经标记为deprecated,不推荐使用。

Checkpoint优化

Buffer Debloating

Flink 1.14新增的优化方式。Buffer Debloating能够自动计算和控制in-flight data(operator输入队列和输出队列缓存的数据)大小。从而减少checkpoint耗时,减少checkpoint存储大小和恢复时间(因为in-flight data的量减少了)。对于Unaligned Checkpoint效果更为显著。

Buffer Debloating通过如下配置指标来动态计算in-flight data缓存的大小:

官方文档:

启用方式:

taskmanager.network.memory.buffer-debloat.enabled: true

Unaligned Checkpoint

对于拥有多个input的operator,Unaligned Checkpoint不必等待接收到所有input的watermark,不需要阻塞input,降低了checkpoint对系统性能的影响,大大减少了数据的延迟。但会增加checkpoint的数据量。

Unaligned Checkpoint的详细分析参见Flink 源码之 1.11新特性Unaligned checkpoint

全局配置:

execution.checkpointing.unaligned: true
// 配置Aligned checkpoint的超时时间
execution.checkpointing.aligned-checkpoint-timeout: 30 s

注:execution.checkpointing.aligned-checkpoint-timeout必须在启用unaligned的时候才生效。该配置项如果值为0,则任何时候都使用Unaligned Checkpoint。如果值大于0,则首先尝试使用aligned checkpoint,如果在配置时间范围内checkpoint barrier仍没有对齐,则切换为unaligned checkpoint。

作业配置:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// enables the unaligned checkpoints
env.getCheckpointConfig.enableUnalignedCheckpoints()
env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));
上一篇 下一篇

猜你喜欢

热点阅读