Storm中几种基本的Bolt接口的特点

2019-06-06  本文已影响0人  叫我不矜持

Storm中几种基本的Bolt接口的特点

Storm中定义的Bolt接口主要有IBolt 、 IRichBolt 、 IBasicBolt和IBatchBolt,先看一下类图

bolt类图

一.IRichBolt

Storm中最常用来定义Topology组件的接口。 它十分灵活, 用户可以通过其实现各种控制逻辑, 并且能控制何时进行Ack 、 Fail和Anchor操作。

Bolt是Storm中的基础运行单位, 当其启动并有消息输人时, 将调用execute方法来进行处理。与ISpout类似, IBolt对象在提交时也会被序列化为字节数组, 具体的执行节点通过反序列化的方法得到该对象, 并调用prepare回调方法。用户应将复杂对象的初始化放在prepare回调方法中实现, 以保证每个具体对象都可以正确初始化。对象被销毁时, 将调用cleanup回调方法, 但是Storm并不保证该方法一定被执行。

通常, 在execute方法的实现中会对输人消息进行处理, 这有可能产生新消息需要发送到下游节点, 最后还要对输入的消息进行Ack操作。 如果消息处理失败, 则需对输入的消息进行Fail操作, 这是保证Ack消息系统可以正常工作的基础。

二.IBasicBolt

Storm中提供的定义简单逻辑的Topology组件接口,用户基于它实现自己的Bolt也比较简单。

基于IBasicBolt编写的好处是Storm框架本身帮你处理了所发出消息的Ack 、 Fail和Anchor操作, 这是由执行器BasicBoltExecutor实现的。BasicBoltExecutor实现了IRichBolt接口, 同时还包含了一个IBasciBolt成员变量用于调用的转发。 它是基于装饰模式实现的。

其代码如下所示

public class BasicBoltExecutor implements IRichBolt {
    public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
    private IBasicBolt _bolt;
    private transient BasicOutputCollector _collector;

    public BasicBoltExecutor(IBasicBolt bolt) {
        //内部封装了真实的bolt
        this._bolt = bolt;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        this._bolt.declareOutputFields(declarer);
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this._bolt.prepare(stormConf, context);
        this._collector = new BasicOutputCollector(collector);
    }

    public void execute(Tuple input) {
        //该execute方法都是通过调用真实的bolt来完成的
        this._collector.setContext(input);

        try {
            this._bolt.execute(input, this._collector);
            this._collector.getOutputter().ack(input);
        } catch (FailedException var3) {
            if (var3 instanceof ReportedFailedException) {
                this._collector.reportError(var3);
            }

            this._collector.getOutputter().fail(input);
        }

    }

    public void cleanup() {
        this._bolt.cleanup();
    }

    public Map<String, Object> getComponentConfiguration() {
        return this._bolt.getComponentConfiguration();
    }
}

但IBasicBolt的使用是有限制的, 基于收到的某条消息衍生出来的所有消息必须在一次execute中发送出去, 否则内置的Ack机制将不能保证Bolt的正常工作。 所以, 用户应该避免使用该类型的Bolt来做诸如聚集或者连接的操作。

三.IBatchBolt

它是Storm提供的用来处理批量数据的接口。 目前,它只用于事务Topology中, 它是Storm实现事务Topology的基础。 IBatchBolt在系统收到属于某Batch的第一条消息时被创建, 而在所有的消息都处理完成之后再被销毁。 Storm中采用反序列化对象的方式来弥补不断创建IBatchBolt对象所带来的负担。

具有finishBatch方法: 该方法仅当这批消息被处理完时才会被调用。 如果BatchBolt同时实现了ICoranitter的接口, finishBatch方法只有当该Batch之前的所有Batch均被成功处理后才被调用。 这保证了强序关系,同时也是Storm中事务Topology的实现基础。

上一篇下一篇

猜你喜欢

热点阅读