JStorm源码分析-2.组装Topology
我们在使用jstorm的时候,主要的工作就是将我们的数据处理逻辑构造为Topology,再提交给jstorm集群运行。本文主要描述了jstorm如何将用户构造的Topology转为thrift定义的数据类型。
1.程序示例
jstorm为用户提供了通用接口,来构造自己的业务逻辑,分布式流式计算-jstorm部署中演示了如何构建一个统计单词的拓扑结构。我们可以看到构建一个Topology就是创建Spout和Bolt并设置相关的配置,然后使用group机制将这些组件拼装起来,最后生成一个StormTopology对象。
2.程序分析
根据jstorm的架构设计,有如下几个组件需要明确:
- Spout和Bolt:Spout用来对接数据来源,Bolt内部是我们制定的业务处理逻辑,程序中提供了
IRichSpout
和IRichBolt
接口,以及thrift生成的Bolt
对象。 - Grouping: 构建Topology结构的时候,需要将spout和bolt以及bolt之间串联起来,jstorm提供的若干个Grouping机制用来将这两类组件串联为Topology。
- Topology: 描述了一个处理过程,每个拓扑有自己的数据来源Spout和处理组件Bolt组成,在程序中,最后会封装为
StormTopology
对象,通过StormSubmitter可以提交到集群上运行。 - TopologyBuilder:这个对象用来辅助我们创建Topology。
下面,我们自下而上观察Topology实际上是如何构建并保存到StormTopology中的。
2.1 IRichSpout和IRichBolt
IRichSpout和IRichBolt都是jstorm中的组件,继承自IComponent,IComponent是Topology中所有的组件的父类,通过java定义拓扑时会使用到。
IComponent
public interface IComponent extends Serializable {
void declareOutputFields(OutputFieldsDeclarer declarer);
Map<String, Object> getComponentConfiguration();
}
declareOutputFields方法定义了组件对不同stream的输出Field;getComponentConfiguration定义了组件的配置,只有topology.*
的配置才会被覆盖。
ISpout
ISpout是为拓扑提供数据来源的接口,每个tuple都是通过spout发出给后续的处理组件的,Storm会跟踪其emit出的tuples形成的DAG,当jstorm检测到tuple被所有组件处理后,会调用spout的ack方法。
public interface ISpout extends Serializable {
void open(Map stormConf, TopologyContext context, SpoutOutputCollector collector)
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
Spout组件,会被封装为task交给worker进行初始化,在初始化的时候,会调用open
方法,该方法提供了spout的运行时的上下文,即stormConf,TopologyContext和SpoutOutputCollector。
当worker进程被关闭时,会调用close
方法,但是由于supervisor会使用kill -9
的方式关闭worker进程,因此这个方法不一定会被调用到。
当一个spout被激活的时候,会调用activate
方法,激活之后,会立刻调用nextTuple
。
当一个spout被置为无效的时候,会调用deactivate
方法,之后不再调用nextTuple
。
当nextTuple
方法被调用时,jstorm希望Spout能够通过SpoutOutputCollector的emit方法将tuple发送出去,这个方法应该是非阻塞的,当没有tuple需要发送时,这个方法应该马上return,nextTuple,ack和fail这三个方法运行在同一个线程,如果nextTuple被阻塞了,会影响调用其他两个方法。当nextTuple没有发送消息时,最好能够休眠一会,以防止浪费太多的cpu时间。
在jstorm检测到spout发出的tuple被所有组件都处理后,会调用ack
方法,当处理过程中发生异常时,会调用fail
方法。
IRichSpout
IRichSpout继承了ISpout和IComponent接口,也就是定义Spout所应该具有的功能。
public interface IRichSpout extends ISpout, IComponent {
}
IBolt
IBolt定义了的Bolt的基本行为,其能够接收tuple作为输入,并产生tuple作为输出。IBolt中可以做很多业务处理,如过滤,聚合等处理。IBolt不需要对收到的tuple立刻进行处理,可以持有一段时间后稍后处理。
Bolt的生命周期过程为:被client创建 -> IBolt被序列化保存到topology中 -> 提交到nimbus的master -> nimbus启动worker进行,内部反序列化IBolt并调用prepare方法 -> 开始处理tuple。
如果你想要参数化一个IBolt,应该通过构造方法设置参数,并把参数状态保存到一个常量中。当使用java的api定义Bolt时,应该使用IRichBolt接口。
public interface IBolt extends Serializable {
void prepare(Map stormConf, TopologyContext context,
OutputCollector collector);
void execute(Tuple input);
void cleanup();
}
当一个Bolt被封装为Task执行时,worker进程会调用prepare
方法初始化,这个方法提供了Bolt运行的环境,如stormConf,context和collector对象。
收到tuple之后,会调用execute
方法,Tuple对象中包含了很多元数据信息,如来源stream、来源组件id、源task等;Tuple内部保存的值可以通过getValue
方法获得;处理完tuple之后,可以通过prepare
方法中的OutputCollector来发送数据到下一个处理组件,这是因为storm的spout在的ack和fail机制是通过OutputCollector完成的。
当IBolt要关闭时,会调用cleanup
方法,但是由于supervisor会使用kill -9
的方式关闭worker进程,因此这个方法不一定会被调用到。
IRichBolt
IRichBolt的定义很简单,继承了上面的两个接口,也就是说一个Bolt作为一个组件加入到拓扑中,可以定义发送数据的格式和设置组件配置信息,作为一个处理单元,可以设置处理数据的业务逻辑。在使用java编写拓扑时,我们都是使用IRichBolt来作为最基础的Bolt接口的。
public interface IRichBolt extends IBolt, IComponent {
}
2.2 串联和分组
我们实现了自己的spout和bolt后,需要通过TopologyBuilder将这些组件串联起来。例如,下面的程序
SpoutDeclarer spout = builder.setSpout("spout",new SequenceSpout(), spoutParal);
builder.setBolt("split",new SplitBolt()).shuffleGrouping("spout");
builder.setBolt("count", new TotalCount(),boltParal).fieldsGrouping("split", new Fields("word"));
setSpout和setBolt会为每个组件设置一个key:spout
是数据来源组件;split
这个组件是语句拆分组件,通过shuffleGrouping
方法连接到spout
上;count
组件用来为单词计数,由于相同单词需要发给同一个组件,所以使用fieldsGrouping
方法指定了连接到split
组件,并且根据tuple中的word字段来进行分组。
下面,来看看是如何实现串联和分组的。我们在定义Topology时,只是指定了行为,具体的行为需要jstrom运行时根据定义来执行。
TopologyBuilder
TopologyBuilder是jstorm提供的用来辅助开发者构造Topology结构的类,我们通过设置spout和bolt将指定组件串联后,最后可以生成一个保存了Topology定义的StormTopology类。
TopologyBuilder内部维护了四个map,用来保存不同的组件信息。
private Map<String, IRichBolt> _bolts = new HashMap();
private Map<String, IRichSpout> _spouts = new HashMap();
private Map<String, ComponentCommon> _commons = new HashMap();
private Map<String, StateSpoutSpec> _stateSpouts = new HashMap();
_bolts保存了通过setBolt
方法定义的Bolt组件,_spouts保存了通过setSpout
方法定义的Spout组件。_commons则保存了这个两个set方法时通用的组件信息。
setSpout
调用setSpout方法时,我们为spout指定了组件id和并发度。
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
this.validateUnusedId(id);
this.initCommon(id, spout, parallelism_hint);
this._spouts.put(id, spout);
return new TopologyBuilder.SpoutGetter(id);
}
首先,会校验一下id是否已经被使用过,在_bolts、_spouts和_stateSpouts三个map中查找;
第二步,会将组件信息封装为ComponentCommon
对象并保存到_commons中,ComponentCommon
是thrift生成的对象,定义在storm.thrift
中。
第三步,将id和实例保存在_spouts数组在。
最后,生成一个SpoutGetter对象返回,SpoutGetter用来设置组件的参数和分组情况。
ComponentCommon
ComponentCommon是thrift定义的对象,有四个属性。
struct ComponentCommon {
1: required map<GlobalStreamId, Grouping> inputs; // 输入源
2: required map<string, StreamInfo> streams; //key是stream的id,StreamInfo是stream的信息
3: optional i32 parallelism_hint; //这个组件的并发度,也就是说会有几个线程运行这个组件
4: optional string json_conf; // 组件的配置
}
// GlobalStreamId对象由组件id和streamid组成
struct GlobalStreamId {
1: required string componentId;
2: required string streamId;
}
// Grouping是一个union,只能有一个field被使用被赋值
union Grouping {
1: list<string> fields; // global grouping
2: NullStruct shuffle; // tuple被发送到随机的task(task是一个组件的运行任务)
3: NullStruct all; // tuple被发送到每个task上
4: NullStruct none; // tuple被发送到strom指定的task
5: NullStruct direct; // bolt希望源bolt直接发过来
6: JavaObject custom_object; // 自定义的分组类
7: binary custom_serialized; // 序列化后的自定义分组类
8: NullStruct local_or_shuffle; // 优先发送给同一个worker进程中的task,否则使用随机方式
}
// StreamInfo定义每个stream的输出格式
struct StreamInfo {
1: required list<string> output_fields; // 输出的tuple的field列表
2: required bool direct;
}
通过上面的一系列定义,我们可以看出,一个组件包含的内容有四个部分:
- 输入,每个stream的分组方式
- 输出,每个stream定义的输出fields
- 并发度,每个组件会被封装为task运行在worker进程中,即组件会在几个线程上运行
- 组件的配置,以json的方式保存为字符串
在initCommon
方法中,创建了一个ComponentCommon对象,设置inputs为一个空的map,以及指定的并发度和组件配置(IComponent的getComponentConfiguration返回的配置),最后,将这个对象保存到了TopologyBuilder的_commons中。
SpoutGetter
在setSpout
的最后,会生成一个SpoutGetter
类返回。SpoutGetter是一个定义在TopologyBuilder中的内部类,
ComponentConfigurationDeclarer
-| BaseConfigurationDeclarer
-| ConfigGetter
-| SpoutGetter 【SpoutDeclarer接口】
ComponentConfigurationDeclarer接口用来为操作组件的配置,定义了添加修改配置的相关方法。BaseConfigurationDeclarer做了更上层的封装,子类只需要实现addConfigurations即可。ConfigGetter实现了addConfigurations方法,会根据组件id从_commons数组获取配置,与参数合并后写回到ComponentCommon的json_conf中。
SpoutGetter是ConfigGetter的子类,并未进行任何覆盖,因此,SpoutGetter的目的就是为了修改spout的配置。
setBolt
setBolt方法与setSpout的逻辑类似,不同的是最终返回的是BoltGetter类,BoltGetter集成了ConfigGetter,也就是说也具有了修改配置的功能。与SpoutGetter不同的是,BoltGetter内部实现了分组的多个方法,用来快速的设置分组类型。
private BoltDeclarer grouping(String componentId, String streamId,Grouping grouping) {
// _boltId指的是当前bolt的id
_commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);
return this;
}
主要逻辑为获取bolt组件的ComponentCommon类中的input,设置每个stream对应的Grouping类型。由于Grouping是一个union,每个类型都只设置自己对应的变量。
至此,我们已经了解到了TopologyBuilder内部是如何实现组件的串联和并分组了,接下来,就是如何获取StormTopology了。
2.3 StormTopology
构造完Topology之后,可以通过TopologyBuilder的createTopology方法获取StormTopology,StormTopology也是thrift定义的一个对象,这样便于将拓扑的结构发送给其他的系统,反序列化后即可获取拓扑结构。
在storm.thrift中,StormTopology的定义如下:
struct StormTopology {
1: required map<string, SpoutSpec> spouts; // 所有的Spout
2: required map<string, Bolt> bolts; // 所有的Bolt
3: required map<string, StateSpoutSpec> state_spouts;
}
struct SpoutSpec {
1: required ComponentObject spout_object;
2: required ComponentCommon common;
}
struct Bolt {
1: required ComponentObject bolt_object;
2: required ComponentCommon common;
}
// ComponentObject是union,表示组件的三种类型
union ComponentObject {
1: binary serialized_java;
2: ShellComponent shell;
3: JavaObject java_object;
}
struct ShellComponent {
// should change this to 1: required list<string> execution_command;
1: string execution_command;
2: string script;
}
struct JavaObject {
1: required string full_class_name;
2: required list<JavaObjectArg> args_list;
}
可以看到,StormTopology内部包含了Spout和Bolt这两种组件的配置信息ComponentCommon,还包含了一个ComponentObject,这是一个union,如果组件是一个序列化的java实例,会以byte数组的形式保存;如果是一个Shell脚本,则包含执行的命令和脚本内容;如果是一个java类,会以JavaObject保存,内部包含了类的全名和参数列表。
创建拓扑
了解了StormTopology的内部结构之后,接下来就是如何通过TopologyBuilder来创建拓扑了。
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
for (String boltId : _bolts.keySet()) {
IRichBolt bolt = _bolts.get(boltId);
ComponentCommon common = getComponentCommon(boltId, bolt);
boltSpecs.put(
boltId,
new Bolt(ComponentObject.serialized_java(Utils
.serialize(bolt)), common));
}
for (String spoutId : _spouts.keySet()) {
IRichSpout spout = _spouts.get(spoutId);
ComponentCommon common = getComponentCommon(spoutId, spout);
spoutSpecs.put(
spoutId,
new SpoutSpec(ComponentObject.serialized_java(Utils
.serialize(spout)), common));
}
// spout的定义和bolt的定义
return new StormTopology(spoutSpecs, boltSpecs,
new HashMap<String, StateSpoutSpec>());
}
整个过程比较简单,由于TopologyBuilder内保存了spout和bolt的信息,StormTopology内部也维护了指定格式的Spout和Bolt类,只需要进行转换就可以了。遍历_bolts和_spouts数组,通过getComponentCommon获取ComponentCommon,将bolt和spout实例序列化后保存为指定对象。
getComponentCommon方法主要逻辑是对_commons中的ComponentCommon进行深拷贝,然后创建OutputFieldsGetter对象,交给我们定义在Spout和Bolt的declareOutputFields方法,设置各个stream对应的输出字段。有了这些信息后,维护在ComponentCommon的map<string, StreamInfo> streams
中。
我们在使用setSpout
和setBolt
的方法时,传入了Spout和Bolt的实例,将这些实例序列化后保存到ComponentObject中,然后就可以创建SpoutSpec
和Bolt
对象了。
最后,将这些对象保存到StormTopology中,完成了对拓扑结构的组装和转化。由于StormTopology中包含的信息都是由Thrift定义的,这些就可以提交给jstorm的服务器端运行了。