Flink-Streaming-State & Fault To

2019-05-02  本文已影响0人  耳边的火

上篇讲了operator state在恢复(重启/故障恢复)时要么使用evenly distributed策略,要么使用union策略,来重启operator的并发实例。
operator state支持的第三种类型是 Broadcast State。引入该类型state是为了支持一个流中的一些数据需要广播到所有流中的场景,这些数据会被存储在本地,并应用在另一些流的所有数据上以便进行处理。例如一个很自然的例子,我们有一个包含一系列规则的很缓慢的流,我们想要将这些规则应用到其他流的所有数据上。把这个例子记在脑子里,broadcast state与其他operator state不同点在于:

Provided APIs


在展示完整功能前,我们先从一个示例开始讲解flink提供的api。我们假设我们的数据流数据含有color与shape两个属性。我们想要找到一些特性模式的数据对,如:color属性相同,且shape符合某些规则,如先出现矩形再出现三角形。我们假设这些规则是随时间变化的。
在这个示例中,第一个流包含 Item类型的数据,他有 Color 与 Shape两个属性。另一个流包含Rule类型数据。
我们从包含Item的流入手,我们需要将其根据Color进行key操作,因为我们要找的数据对的color都是一样的。这样做之后,会保证具有相同color的数据会分配到相同的物理机器上。

// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
                        .keyBy(new KeySelector<Shape, Color>(){...});

再来看 Rule 数据流,包含Rule的数据流需要被广播broadcast到所有的下游任务中去,并且这些任务需要将其存储到本地,这样就可以使用本地数据与Item数据做计算
下面的小段程序会:1)广播rule数据流 2)使用提供的MapStateDescriptor 来创建rule需要存储到的broadcast state对象。

// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));
        
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

最后,为了在Item流上的每个Item中都应用Rule进行计算,我们需要:
1.连接这两个流
2.定义我们模式匹配的逻辑

可以使用 connect() 方法来连接一个(keyed/non-keyed)流与 Broadcast 流。在非 broadcast流上调用connect() 方法,broadcast流作为参数传入。这个方法会返回 BroadcastConnectedStream 类,在这个类上,我们可以调用 process() 方法传入 CoProcessFunction的实现类。我们可以在这个函数内实现我们的逻辑。函数的类型取决于non-broadcast 流的情况:

假设,我们的non-broadcast流是 non-keyed 类型。
注意:connect方法需要由 non-broadcast 流来调用,broadcast流作为参数

DataStream<String> output = colorPartitionedStream
                 .connect(ruleBroadcastStream)
                 .process(
                     
                     // type arguments in our KeyedBroadcastProcessFunction represent: 
                     //   1. the key of the keyed stream
                     //   2. the type of elements in the non-broadcast side
                     //   3. the type of elements in the broadcast side
                     //   4. the type of the result, here a string
                     
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 );
BroadcastProcessFunction 与 KeyedBroadcastProcessFunction

在 CoProcessFunction 接口中,有两个处理方法需要实现:processBroadcastElement() 方法,处理broadcast 流中的数据;processElement() 方法处理 non-broadcast流的数据。两个方法的详细方法签名如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

首先需要注意的是:两个接口都需要实现 processBroadcastElement() 方法来处理 broadcast流中的数据,以及 processElement() 方法来处理 non-broadcast流中的数据。
两个方法的区别在于他们入参的context 的不同。处理non-broadcast 的方法的入参为 ReadOnlyContext,处理 broadcast 的方法的入参为 Context。
两个context都有如下特点

getBroadcastState()方法中的 stateDescriptor 需要与 .broadcast(ruleStateDescriptor) 中的相同。
两个context的不同点在于两者对 broadcast state 的访问级别。 处理broadcast 流的方法的context对broadcast state 是读写权限,而处理 non-broadcast流的方法的context对broadcast state是只读权限。这样做的原因是:在Flink中,是没有跨任务数据交换(no cross-task communication)的。因此,为了保证在操作符的所有并发实例中的broadcast state都是相同的,我们仅给broadcast流读写权限,这样所有任务中的broadcast的数据都会相同,我们也能保证应用于non-broadcast数据上的计算在所有task中都是相同的。不这么做,就无法达到一致性的保证,导致不一致且很难调试的结果。
注意:processBroadcast() 方法中实现的逻辑也需要在所有并发操作实例中保持一致性。

最终,由于 KeyedBroadcastProcessFunction 是在 keyed 流上进行操作,它提供了 BroadcastProcessFunction 没有的一些功能:

  1. processElement()中的 ReadOnlyContext可以访问Flink提供的 timer server服务,它允许注册一个 event time/processing time的时间回调函数。当触发回调时,会调用onTime()方法,该方法的 OnTimeContext 参数包含了 ReadOnlyContext的所有功能,再加上:

2.processBroadcastElement()方法中的 Context 有 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function) 方法。该方法可以注册一个
KeyedStateFunction 应用于stateDescriptor所指代的state,所有key上的符合要求的state都会应用此functino。

注意:只可以在 KeyedBroadcastProcessFunction 中的 processElement() 中注册timer。不可以在 processBroadcastElement()中注册,因为这个方法中处理的是broadcast element,没有关联的key。

回到我们一开始的例子,我们的 KeyedBroadcastProcessFunction 可以这样写:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // store partial matches, i.e. first elements of the pair waiting for their second element
    // we keep a list as we may have many first elements waiting
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // identical to our ruleStateDescriptor above
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // there is no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}

重要考量


介绍完提供的API后,接下来的这部分用于提醒你,在使用broadcast state时,需要记住这些重要的事情:

上一篇 下一篇

猜你喜欢

热点阅读