Hadoop

Flink状态

2020-02-02  本文已影响0人  单行线的旋律

key状态和算子状态

key状态

key状态总是与key有关,只能被用于keyedStream类型的函数与算子。你可以认为key状态是一种被分区的算子状态,每一个key有一个状态分区。每一个key状态逻辑上由<parellel-operator-instance, key>唯一确定,由于每一个key只分布在key算子的多个并发实例中的一个实例上,我们可以将<parellel-operator-instance, key>简化为<operator, key>.

算子(operator)状态

算子状态也称为非key状态。每一个算子状态绑定一个并发算子实例。kafka connector是flink算子状态比较好的应用范例。每一个 kafka consumer并发实例都维护一个topic分区和分区对应的offset的map,并将此map作为算子状态。

当并发数改变的时候,算子状态支持在并发实例间重新分配状态。有多种不同的重分配策略。

原始的和被管理的状态

key状态和算子状态以两种形式存在:被管理的和原始的。
被管理的状态由flink runtime管理,以一种数据结构表示,比如内部hash表或者RocksDB,例如: ValueState,ListState等等。flink运行时对状态进行编码,然后写入checkpoints.

原始状态是算子保存到它们自己定义的数据结构中的一种状态。当checkpoint发生的时候,flink仅仅将二进制写入到checkpoint中,它不知道状态的数据结构,仅仅能看见原始的二进制字节数据。

所有的stream数据流function可以使用被管理的状态,但是当实现算子接口的时候,仅仅能使用原始状态接口。推荐使用被管理的状态而不是原始状态,因为使用被管理状态,当并发度变化的时候,flink能够自动重新分配状态,而且也能够更好地管理内存。

注意: 如果你需要自定义被管理状态的序列化逻辑,为了确保特性兼容,请看相应的说明。Flink默认的序列化不需要特别的处理。

使用被管理的Key状态

被管理的Key状态接口能够处理不同类型的状态,包括现有所有输入元素的key。这意味着这类状态仅仅能被用于KeyedStream上。KeyedStream能够通过stream.keyBy(...)创建。

现在,我们首先看一下当前所有的不同类型状态接口,然后我们看一下如何在程序中使用。状态接口类型如下:

所有类型的state状态接口都有一个clear()方法,能够清除当有输入元素key的状态。

注意: FoldingStateFoldingStateDescriptor已经在flink1.4中废弃了,将来会完全移除。请用AggregatingStateAggregatingStateDescriptor代替。

我们要记住两件重要的事,第一件事是上面这些state接口类型仅仅用于与状态交互。状态不一定必须要保存到flink内部,也可以保存到硬盘或其它地方。第二件事是你获取到的状态的值依赖输入元素的key值,所以在同一个user function中如果两次输入流中的key值不一样的话,value也不一样。

为了获得一个状态处理类,你必须要创建一个StateDescriptor对象。它里面保存着状态的名字(后面我们会看到,你可以创建多个状态,他们必须有不同的名字,以便你可以根据名字获取状态),状态存储值的类型和一个用户自定义的function,例如一个ReduceFunction。根据你想要存储状态的类型不同,你可以创建ValueStateDescriptor,ListStateDescrptor,ReducingStateDescriptor,FoldingStateDescriptorMapStateDescriptor对象。

状态可以通过RuntimeContext获取,它只能通过富函数(rich function)获取。请看这里详细了解。但是我们也看一个简短的例子。RichFunction中的RuntimeContext对象有如下方法可以获取状态。

下面举一个FlatMapFunction的例子说明所有部分如何配合的。

 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long,Long>, Tuple2<Long,Long>> {
   
   private transient ValueState<Tuple2<Long,Long>> sum;
   
   @Override
   public void flatMap(Tuple2<Long,Long> input, Collector<Tuple2<Long,Long>> out) throws Exception {
     // 获取状态值
     Tuple2<Long,Long> currentSum = sum.value();
     
     // 更新数量
     currentSum.f0 += 1;
     
     // 将输入数据累加到第2个字段上
     currentSum.f1 += input.f1;
     
     // 更新状态
     sum.update(currentSum);
     
     // 如果数量达到2个,计算平均值,发送到下游,并清空状态
     if (currentSum.f0 >= 2) {
       out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
       sum.clear();
     }
   }
   
   @Override
   public void open(Configuration config) {
    ValueStateDescriptor<Tuple2<Long,Long>> descriptor = 
                                 new ValueStateDescriptor<>(
                                        "average", // 状态名称
                                        TypeInformation.of(new TypeHint<Tuple2<Long,Long>> () {}), //类型信息
                                        Tuple2.of(0L,0L)); // 状态默认值
         sum = getRuntimeContext().getState(descriptor);
   }
 }
 
 // 可以在流处理程序中像这样使用(假设我们有一个StreamExecutionEnvironment env)
 env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
   .keyBy(0)
   .flatMap(new CountWindowAverage())
   .print();
   
 // 将打印输出(1,4)和(1,5)

这个例子实现了一个贫血的计数窗口。我们以tuple的第一个字段做为key来分类(这个例子中所有key都为1)。CountWindowAverage类成员变量ValueState中存储着实时计算的数量和累加和。一量数量达到2个,它将计算平均值,发送到下游并清空状态,从0开始。需要注意的是,对于不同输入的key(输入元素Tuple的第一个元素值不同),ValueState将保存不同的值。

状态存活时间(TTL)

TTL可以被分配给任何类型的key状态。如果key状态设置了TTL,并且状态过期了,状态中存储的值将被清空。后面将详细说明。

所有状态集合的TTL是设置在每个元素上的。这意味着列表和map中每个元素元素过都是过期处理逻辑都是独立的,互不影响

为了使用TTL,首先要创建一个StateTtlConfig对象。通过给TTL函数传递这个状态配置对象激活TTL。

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
    
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

配置状态需要考虑以下几个方面:
newBuilder方法的第一个参数是必须的,它是TTL过期时间。
状态的TTL时间戳需要被刷新,还需要配置更新类型,表示在什么情况下刷新,默认是OnCreateAndWrite:

如果设置成NeverReturnExpired,过期的状态数据即使没有被清除也获取不到,就好像不存在一样。这个选项在数据过期后必须不可用的情况下是有用的,例如对隐私数据敏感的应用。

另一个选项是ReturnExpiredIfNotCleanedUp,如果过期的状态值没有被清除的话,允许返回。

注意:

过期状态数据的清除

默认情况下,过期的状态数据仅当显示的读取的时候才会被清除,例如调用ValueState.value()的时间。

注意: 这意味着如果过期状态数据没有被读取,它将不会被清除,可能导致状态数据的不断增长。在后来的flink版本中可能会改变。

在完全快照时清除

另外,你可以在执行状态完全快照时清除过期的状态值,这将减小快照的大小。在当前flink实现中,本地状态不会被清除,但是当从上一个快照中恢复的时候,不会包括过期的状态。可以在StateTtlConfig中配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

这个选项不适合于以RocksDB存储状态数据的递增checkpoint.

注意:

状态存储后端清除

除了在完全快照中清除,你还可以在后端清除。如果状态后端存储支持,下面选项可以激活默认的后端清除策略。

import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
   .newBuilder(Time.seconds(1))
   .cleanupInBackground()
   .build();

对于更详细的控制某些特别的后端清除策略,你可以按照下面描述的单独配置。当前,堆状态存储依赖递增的清除,RocksDB则使用压缩过滤器。

递增的清除

另一个选项是触发状态的递增清除策略。触发可以是每个状态的获取或每条记录处理时的回调。如果某些状态激活了清除策略,后端的存储将持有一个全局的针对所有元素的惰性迭代器。每次递增清除策略被触发时,迭代器就向前进,会检查经过的元素,过期的状态数据将被清除。

这个特性可以在StateTtlConfig激活

import org.apache.flink.api.common.state.StateTtlConfig;
 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally()
    .build();

这个策略有两个参数。第一个参数是每次清除触发时检查的元素个数。如果启用,当获取每一个状态数据时都会触发。第二个参数配置每条记录处理时,是否也触发清除。如果你启用默认的后端清除策略,对于堆存储来说,这个策略将在每次状态数据获取时被触发并检查5个元素,并且每条记录被处理时不会触发清除。

注意:

在RocksDB压缩过程中清除

如果使用RocksDB存储状态,另一个清除策略是激活flink压缩过滤器。RocksDB周期性的执行异步压缩来合并状态更新和减小存储。flink压缩过滤器检查带TTL元素的时间戳,排除掉过期的状态数据。

这个特性默认没有启用。可以通过flink配置文件配置,将state.backend.rocksdb.ttl.compaction.filter.enabled设置为true,或者当为某个Job创建自定义RocksDB状态存储时调用RocksDBStateBackend::enableTtlCompactionFilter设置。这样设置了TTL的状态将会使用过滤器。

import org.apache.flink.api.common.state.StateTtlConfig

StateTtlConfig ttlConfig = StateTtlConfig
       .newBuilder(Time.seconds(1))
       .cleanupInRocksdbCompactFilter(1000)
       .build();

当处理一定数量的状态数据后,RocksDB压缩过滤器将查询当前时间戳,检查有没有过期。你可以改变它,传递一个自定义值给StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法. 更新时间戳越频繁,清除的速度越快,但却降低了压缩性能。因为需要调用JNI本地方法. 如果你启用默认的清除策略(这个策略适用于RocksDB状态存储),在每处理1000个元素后都查询一次当前的时间戳。

如果想为RocksDB过滤器开启本地方法的debug级别日志,可以为FlinkCompactionFilter设置日志级别为Debug.

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

Scala DataStream API中的状态

除了上面讲到的接口, Scala API对于KeyStream中使用ValueState存储单个状态值的map()或flatmap()函数有更简短的写法。user function从Option对象中得到ValueState当前状态值,并返回一个待更新的值。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

使用被管理的算子状态

为了使用被管理的算子状态,必须实现通用的CheckpointedFunction接口,或者实现ListCheckpointed<T extends Serializable>接口。

CheckpointedFunction

CheckpointedFunction接口可以让我们保存不同数据结构的非key对象的状态。我们如果要使用,必须实现以下两个方法:

void snapshotState(FunctionSnapshotContext conetxt) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

每当checkpoint执行的时候都会调用snapshotState()方法,而另一个方法,initializeState(),则是用户自定义的功能初始化的时候调用,包括首次初始化或者从之前的checkpoint恢复的时候。鉴于此,initializeState()不仅包含不同的需要保存状态的数据初始化的逻辑,还需要包含恢复状态的逻辑。

当前,支持列表类型的被管理的算子状态,状态应该是由互相独立的可序列化的对象组成的列表,当伸缩的时候需要重新分配。换言之,这些对象是非key状态分配的最小粒度。根据状态获取方式的不同,定义了以下分配策略:

下面是一个带状态的SinkFunction示例,使用平均分配策略,功能是在将一些元素发送给外部系统之前,使用CheckpointedFunction缓存这些元素.

 public class BufferingSink 
           implements SinkFunction <Tuple2<String,Integer>>, CheckpointedFunction {
        
        private final int threshold;
        private transient ListState<Tuple2<String, Integer>> checkpoinedState;
        private List<Tuple2<String, Integer>> bufferedElements;
        public BufferingSink(int threshold) {
         this.threshold = threshold;
         this.bufferedElements = new ArrayList<>();
        }
        
        @Override
        public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
         bufferedElement.add(value);
         if (bufferedElements.size() == threshold) {
           for (Tuple2<String, Integer> element : bufferedElements) {
               // send it to the sink
           }
           bufferedElement.clear();
         }
        }
        
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
          checkpointedState.clear();
          for (Tuple<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
          }
        }
        
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
          ListStateDescriptor<Tuple2<String, Integer>> descriptor = 
              new ListStateDescriptor<>(
                  "buffered-elements",
                  TypeInformation.of(new TypeHint<Tuple2<String, Integer>>(){}));
                  
         checkpointedState = context.getOperatorStateStore().getListState(descriptor);
         
         if(context.isRestored()) {
             for (Tuple2<String, Integer> element : checkpointedStage.get()) {
                bufferedElements.add(element);
             }
          }
     }
 }

initializeState方法接收一个FunctionInitializationContext参数。这个参数用于初始化非key的状态"容器"。有一个ListState类型的状态容器,当checkpointing发生的时候,非key的状态会存储在ListState对象中。

注意一下状态容器是如何初始化的,和key状态相似,都需要一个StateDescriptor来定义状态名和保存状态的数据类型信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

获取状态方法名称的不同代表了不同的分配策略。例如,如果想在恢复的时候使用合并分配策略,获取状态时,使用getUnionListState(descriptor)方法。如果方法名不包含分配策略名称,例如:getListState(descriptor),就默认表示将会使用平均分配策略。

在初始化状态容器之后,我们使用isRestored()方法来判断当前是否是失败后的恢复,如果是,将执行恢复逻辑。

我们再来看看类BufferingSink的代码,ListState是成员变量,在initializeState方法初始化,以便在snapshotState方法中使用。在snapshotState方法中,ListState首先清除上一次checkpont的所有对象,然后保存这一次需要checkpoint的对象。

顺便提一下,key状态也能使用initializeState方法初始化,可以使用FunctionInitializationContext对象实现。

ListCheckpointed

ListCheckpointedCheckpointedFunction的一个变体,有更多的限制条件,仅支持list类型的状态存储,并且只能是平均分配。包含以下两个方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;

snapshotState方法中,算子应该返回需要保存的list对象,当恢复的时候,在restoreState方法中编写恢复list数据的逻辑。 如果状态不需要重新分区,可以在snapshotState方法中返回Collections.singletonList(MY_STATE)对象。

Stateful Source函数

相对于其它算子,Stateful source算子有一点特别。为了使更新状态和输出状态原子化(失败/恢复的恰好一次主义要求),必须在source算子的上下文中使用锁。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  恰好一次语义使用的偏移量 */
    private Long offset = 0L;

    /**job是否取消的标识*/
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // 输出和更新状态是原子化的
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}

当flink checkpoint完全确认的时候,一些算子也许需要与外部系统交换一些信息,这种情况看下org.apache.flink.runtime.state.CheckpointListener接口。

上一篇 下一篇

猜你喜欢

热点阅读