Storm中几种基本的Bolt接口的特点
Storm中几种基本的Bolt接口的特点
Storm中定义的Bolt接口主要有IBolt 、 IRichBolt 、 IBasicBolt和IBatchBolt,先看一下类图
bolt类图一.IRichBolt
Storm中最常用来定义Topology组件的接口。 它十分灵活, 用户可以通过其实现各种控制逻辑, 并且能控制何时进行Ack 、 Fail和Anchor操作。
Bolt是Storm中的基础运行单位, 当其启动并有消息输人时, 将调用execute方法来进行处理。与ISpout类似, IBolt对象在提交时也会被序列化为字节数组, 具体的执行节点通过反序列化的方法得到该对象, 并调用prepare回调方法。用户应将复杂对象的初始化放在prepare回调方法中实现, 以保证每个具体对象都可以正确初始化。对象被销毁时, 将调用cleanup回调方法, 但是Storm并不保证该方法一定被执行。
通常, 在execute方法的实现中会对输人消息进行处理, 这有可能产生新消息需要发送到下游节点, 最后还要对输入的消息进行Ack操作。 如果消息处理失败, 则需对输入的消息进行Fail操作, 这是保证Ack消息系统可以正常工作的基础。
二.IBasicBolt
Storm中提供的定义简单逻辑的Topology组件接口,用户基于它实现自己的Bolt也比较简单。
基于IBasicBolt编写的好处是Storm框架本身帮你处理了所发出消息的Ack 、 Fail和Anchor操作, 这是由执行器BasicBoltExecutor实现的。BasicBoltExecutor实现了IRichBolt接口, 同时还包含了一个IBasciBolt成员变量用于调用的转发。 它是基于装饰模式实现的。
其代码如下所示
public class BasicBoltExecutor implements IRichBolt {
public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
private IBasicBolt _bolt;
private transient BasicOutputCollector _collector;
public BasicBoltExecutor(IBasicBolt bolt) {
//内部封装了真实的bolt
this._bolt = bolt;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
this._bolt.declareOutputFields(declarer);
}
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this._bolt.prepare(stormConf, context);
this._collector = new BasicOutputCollector(collector);
}
public void execute(Tuple input) {
//该execute方法都是通过调用真实的bolt来完成的
this._collector.setContext(input);
try {
this._bolt.execute(input, this._collector);
this._collector.getOutputter().ack(input);
} catch (FailedException var3) {
if (var3 instanceof ReportedFailedException) {
this._collector.reportError(var3);
}
this._collector.getOutputter().fail(input);
}
}
public void cleanup() {
this._bolt.cleanup();
}
public Map<String, Object> getComponentConfiguration() {
return this._bolt.getComponentConfiguration();
}
}
但IBasicBolt的使用是有限制的, 基于收到的某条消息衍生出来的所有消息必须在一次execute中发送出去, 否则内置的Ack机制将不能保证Bolt的正常工作。 所以, 用户应该避免使用该类型的Bolt来做诸如聚集或者连接的操作。
三.IBatchBolt
它是Storm提供的用来处理批量数据的接口。 目前,它只用于事务Topology中, 它是Storm实现事务Topology的基础。 IBatchBolt在系统收到属于某Batch的第一条消息时被创建, 而在所有的消息都处理完成之后再被销毁。 Storm中采用反序列化对象的方式来弥补不断创建IBatchBolt对象所带来的负担。
具有finishBatch方法: 该方法仅当这批消息被处理完时才会被调用。 如果BatchBolt同时实现了ICoranitter的接口, finishBatch方法只有当该Batch之前的所有Batch均被成功处理后才被调用。 这保证了强序关系,同时也是Storm中事务Topology的实现基础。