译:Flink---状态

2019-02-13  本文已影响0人  雪味伦调

Flink 1.7 Google翻译

键控状态和操作状态


Flink中有两种基本的状态类型:键控状态和运算符状态

键控状态

键控状态总是与键相关并且只能在键控流的函数和运算符中使用

你可以将Keyed State视为已分区或分片的操作符状态,每个键只有一个状态分区。每个键控状态在逻辑上绑定到<parallel-operator-instance,key>的唯一复合,并且由于每个键“属于”键控运算符的一个并行实例,我们可以将其简单地视为<operator,key >

键控状态进一步组织成所谓的键控组。键控组是Flink可以重新分配密钥状态的原子单元;键控组与定义的最大并行度完全一样多。在执行期间,键控运算符的每个并行实例都使用一个或多个键控组的键。

运算符状态

使用运算符状态(或非键控状态), 每个运算符状态绑定到一个运算符实例。Kafka连接器在Flink中是一个使用运算符状态的好例子。任一个Kafka消费者并行实例都包含一个主题与偏移映射作为运算符状态。

运算符状态接口支持当并行度改变时通过并行操作符重新分配实例。进行重新分配有多种不同的方案

原生及托管状态


键控状态及运算符状态以两种方式存在:托管状态及原生状态

托管状态由Flink运行时控制的数据结构表示,例如内部的hash表,或者RocksDB.例如“ValueState”,"ListState"等。Flink将state编码并写入checkpoints中

原生状态是把运算符保存在自己数据结构中的状态。当状态检查时,它们仅将字节序列写入到checkpoint。Flink不感知状态的数据结构,它只能看到原生的字节数据

所有的datastream函数都可以使用托管状态,但是原生状态仅能在实现运算符时使用。建议使用托管状态而非原生状态,因为在托管状态下,当并行度改变时Flink可以自动重新分配状态,也可以更好的做内存管理。

注意:如果你的托管状态需要自定义序列化逻辑,请查看相关引导以保证未来的兼容性。Flink的默认序列化不需要特殊对待

使用托管监控状态


监控托管转台接口提供不同的状态类型的访问,这些状态都限定为当前输入元素的键。这意味着这些状态类型只能在监控流中使用,监控流使用stream.keyBy()来创建
现在,我们首先来看一下可用的不同状态类型,之后我们可以看到如何在程序中使用它们。可用的原生状态:

所有类型的状态还有一个方法clear(),它清除当前活动键的状态,即输入元素的键

注意: FoldingState和FoldingStateDescriptor在Flink 1.4中废弃了并且将在未来完全删除。请使用AggregatingState和AggregatingStateDescriptor

重要的是要记住,这些状态对象仅用于与状态接口。状态不一定存储在内部,但可能驻留在磁盘或其他位置。要记住的第二件事是,从状态获得的值取决于input元素的键。因此,如果涉及的键不同,则在一次调用用户函数时获得的值可能与另一次调用中的值不同

要获取状态句柄,您必须创建StateDescriptor。这保存了状态的名称(正如我们稍后将看到的,您可以创建多个状态,并且它们必须具有唯一的名称以便您可以引用它们),状态所持有的值的类型,并且可能是用户 - 指定的函数,例如ReduceFunction。根据要检索的状态类型,创建ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor或MapStateDescriptor

使用RuntimeContext访问状态,因此只能在丰富的函数中使用。请参阅此处了解相关信息,但我们很快也会看到一个示例。 RichFunction中可用的RuntimeContext具有这些访问状态的方法:

这是一个示例FlatMapFunction,显示所有部件如何组合在一起:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

这个例子实现了一个穷人的计数窗口。我们通过第一个字段键入元组(在示例中都具有相同的键1)。该函数将计数和运行总和存储在ValueState中。一旦计数达到2,它将发出平均值并清除状态,以便我们从0开始。注意,如果我们在第一个字段中具有不同值的元组,这将为每个不同的输入键保持不同的状态值

TTL状态


可以将生存时间(TTL)分配给任何类型的键控状态。如果配置了TTL并且状态值已过期,则将尽力清除存储的值,这将在下面更详细地讨论。

所有状态集合类型都支持每个条目的TTL。这意味着列表元素和映射条目将独立过期。

为了使用状态TTL,必须首先构建StateTtlConfig配置对象。然后,可以通过传递配置在任何状态描述符中启用TTL功能

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

它的配置项有一下几点需要考虑:

第一个参数newBuilder是强制性的,指定value有效期

更新类型配置为TTL刷新的时机(默认是OnCreateAndWrite)

状态可见性配置如果过期值尚未清除,是否在读取访问时返回它(默认情况下NeverReturnExpired)

在NeverReturnExpired的情况下,过期状态表现得好像它不再存在,即使它仍然必须被删除。
该选项对于在TTL之后必须严格用于读取访问的数据的用例是有用的,例如,应用程序使用隐私敏感数据

另一个选项ReturnExpiredIfNotCleanedUp允许返回过期状态当它没有被清除时

注意:

清除过期状态

当前,仅在过期值独立读取时才可以移除,比如调用ValueState.value()

注意:这意味着默认情况下,如果未读取过期状态,则不会将其删除,从而可能导致状态不断增长。这可能在将来的版本中发生变化

此外,您可以在获取完整状态快照时激活清理,这将减小其大小。在当前实现下不清除本地状态,但是在从先前快照恢复的情况下它
不会包括已移除的过期状态。它可以在StateTtlConfig中配置

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

该操作不支持在RocksDB状态后端中增量的checkpointing,更多的在后台自动清理过期状态的策略将在未来加入

使用托管运算符状态

要使用托管操作符状态,有状态函数可以实现更通用的CheckpointedFunction接口,或者ListCheckpointed <T extends Serializable>接口

CheckpointedFunction

CheckpointedFunction 接口提供对具有不同重新分发方案的非键控状态的访问,它需要实现以下两个方法

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

每当必须执行检查点时,都会调用snapshotState()。每次初始化用户定义的函数时,都会调用对应的initializeState(),即首次初始化函数时,或者当函数实际从早期检查点恢复时。鉴于此,initializeState()不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑的位置

目前,支持列表样式的托管操作符状态。该状态应该是一个可序列化对象的列表,彼此独立,因此有资格在重新缩放时重新分配。换句话说,这些对象是可以重新分配非键控状态的最精细的粒度。根据状态访问方法,定义了以下重新分发方案

下面是一个有状态SinkFunction的示例,它使用CheckpointedFunction缓冲元素,然后再将它们发送到外部世界。它演示了基本的 even-split再分配列表状态

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

initializeState方法将FunctionInitializationContext作为参数。这用于初始化非键控状态“容器”。
这些是ListState类型的容器,其中非键控状态对象将在检查点存储

注意状态是如何初始化的,类似于键控状态,StateDescriptor包含状态名称和有关状态所持有的值类型的信息

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

状态访问方法的命名约定包含其重新分发模式,后跟其状态结构.例如,要在还原时使用联合重新分发方案的列表状态,请使用getUnionListState(descriptor)访问该状态。如果方法名称不包含重新分发模式,例如getListState(描述符),它只是意味着将使用基本的even-split再分配方案

在初始化容器之后,我们使用上下文的isRestored()方法来检查我们是否在失败后恢复。如果这是真的,即我们正在恢复,则应用恢复逻辑

如修改后的BufferingSink的代码所示,在状态初始化期间恢复的ListState保存在类变量中以供将来在snapshotState()中使用。在那里,ListState被清除前一个检查点包含的所有对象,然后填充我们想要检查点的新对象。

作为旁注,键控状态也可以在initializeState()方法中初始化。这可以使用FunctionInitializationContext提供的方式完成

ListCheckpointed

ListCheckpointed接口是CheckpointedFunction的一个更有限的变体,它仅支持在恢复时具有even-split再分配方案的列表样式状态。
它还需要实现两种方法

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

在snapshotState()上,运算符应该返回检查点的对象列表,restoreState必须在恢复时处理这样的列表。如果状态不可重新分区,
则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)

有状态源函数


与其他运算符相比,有状态的源需要更多的关注。为了使状态和输出集合的更新成为原子(在故障/恢复时exactly-once所需),
用户需要从源的上下文获取锁定。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}

当Flink完全确认检查点与外界通信时,某些运算符可能需要这些信息,在这种情况下,
请参阅org.apache.flink.runtime.state.CheckpointListener接口

上一篇 下一篇

猜你喜欢

热点阅读