storm消息可靠性保证

2018-01-08  本文已影响0人  loinliao

参考文档:

Storm offers several different levels of guaranteed message processing, including best effort, at least once, and exactly once through Trident.

一、消息完全处理

定义

消息完全处理描述的是一个从Spout发出的tuple怎样算是处理成功。
关键字:tuple tree
只要在spout下游的bolt有一个失败(需要bolt主动通知Acker)就算失败。

如何得知一个从spout发出的tuple是否被完全处理

storm使用一个内置的Acker结点来跟踪一个tuple是否被完全处理成功。
成功会回调spoutack方法,失败会回调fail方法

二、如何实现At Most Once语义

Storm 的消息保证机制中,如果需要实现 At Most Once 语义,只需要满足下面任何一条即可:

三、如何实现At Least Once语义

如果需要实现 At Least Once 语义,则需要同时保证如下几条:

如何实现可靠的 Spout

实现可靠的 Spout 需要在 nextTuple 函数中发送消息时,调用带 msgID 的 emit 方法,然后实现失败消息的重传(fail 函数),参考如下示例:

/**
     * 想实现可靠的 Spout,需要实现如下两点
     * 1. 在 nextTuple 函数中调用 emit 函数时需要带一个     msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)
     * 2. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护)
     */
public void nextTuple() {
    //设置 msgId 和 Value 一样,方便 fail 之后重发
    collector.emit(new Values(curNum + "", round +     ""), curNum + ":" + round);
}

@Override
public void fail(Object msgId) {//消息发送失败时的回调函数
    String tmp = (String)msgId;   //上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息
    String[] args = tmp.split(":");

    //消息进行重发
    collector.emit(new Values(args[0], args[1]), msgId);
}

如何实现可靠的 Bolt

Storm 提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt,都可以实现可靠性消息传递,不过 BaseRichBolt 需要自己做很多周边的事情(建立 anchor 树,以及手动 ACK/FAIL 通知 Acker),使用场景更广泛,而 BaseBasicBolt 则由 Storm 帮忙实现了很多周边的事情,实现起来方便简单,但是使用场景单一。如何用这两个 Bolt 实现(不)可靠的消息传递如下所示:

//BaseRichBolt 实现不可靠消息传递
public class SplitSentence extends BaseRichBolt {//不建立 anchor 树的例子
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            _collector.emit(new Values(word));  // 不建立 anchor 树
        }
        _collector.ack(tuple);          //手动 ack,如果不建立 anchor 树,是否 ack 是没有区别的,这句可以进行注释
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

//BaseRichBolt 实现可靠的 Bolt
public class SplitSentence extends BaseRichBolt {//建立 anchor 树以及手动 ack 的例子
    OutputCollector _collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            _collector.emit(tuple, new Values(word));  // 建立 anchor 树
        }
        _collector.ack(tuple);          //手动 ack,如果想让 Spout 重发该 Tuple,则调用 _collector.fail(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

下面的示例会可以建立 Multi-anchoring
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

//BaseBasicBolt 是吸纳可靠的消息传递
public class SplitSentence extends BaseBasicBolt {//自动建立 anchor,自动 ack
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }      
}

四、如何实现Exactly Once语义

todo: Trident

上一篇 下一篇

猜你喜欢

热点阅读