JStorm源码分析-2.组装Topology

2019-03-12  本文已影响0人  史圣杰

我们在使用jstorm的时候,主要的工作就是将我们的数据处理逻辑构造为Topology,再提交给jstorm集群运行。本文主要描述了jstorm如何将用户构造的Topology转为thrift定义的数据类型。

1.程序示例

jstorm为用户提供了通用接口,来构造自己的业务逻辑,分布式流式计算-jstorm部署中演示了如何构建一个统计单词的拓扑结构。我们可以看到构建一个Topology就是创建Spout和Bolt并设置相关的配置,然后使用group机制将这些组件拼装起来,最后生成一个StormTopology对象。

2.程序分析

根据jstorm的架构设计,有如下几个组件需要明确:

下面,我们自下而上观察Topology实际上是如何构建并保存到StormTopology中的。

2.1 IRichSpout和IRichBolt

IRichSpout和IRichBolt都是jstorm中的组件,继承自IComponent,IComponent是Topology中所有的组件的父类,通过java定义拓扑时会使用到。

IComponent

public interface IComponent extends Serializable {
    void declareOutputFields(OutputFieldsDeclarer declarer);
    Map<String, Object> getComponentConfiguration();
}

declareOutputFields方法定义了组件对不同stream的输出Field;getComponentConfiguration定义了组件的配置,只有topology.*的配置才会被覆盖。

ISpout
ISpout是为拓扑提供数据来源的接口,每个tuple都是通过spout发出给后续的处理组件的,Storm会跟踪其emit出的tuples形成的DAG,当jstorm检测到tuple被所有组件处理后,会调用spout的ack方法。

public interface ISpout extends Serializable {
  void open(Map stormConf, TopologyContext context, SpoutOutputCollector collector)
  void close();
  void activate();
  void deactivate();
  void nextTuple();
  void ack(Object msgId);
  void fail(Object msgId);
}

Spout组件,会被封装为task交给worker进行初始化,在初始化的时候,会调用open方法,该方法提供了spout的运行时的上下文,即stormConf,TopologyContext和SpoutOutputCollector。
当worker进程被关闭时,会调用close方法,但是由于supervisor会使用kill -9的方式关闭worker进程,因此这个方法不一定会被调用到。
当一个spout被激活的时候,会调用activate方法,激活之后,会立刻调用nextTuple
当一个spout被置为无效的时候,会调用deactivate方法,之后不再调用nextTuple
nextTuple方法被调用时,jstorm希望Spout能够通过SpoutOutputCollector的emit方法将tuple发送出去,这个方法应该是非阻塞的,当没有tuple需要发送时,这个方法应该马上return,nextTuple,ack和fail这三个方法运行在同一个线程,如果nextTuple被阻塞了,会影响调用其他两个方法。当nextTuple没有发送消息时,最好能够休眠一会,以防止浪费太多的cpu时间。
在jstorm检测到spout发出的tuple被所有组件都处理后,会调用ack方法,当处理过程中发生异常时,会调用fail方法。

IRichSpout
IRichSpout继承了ISpout和IComponent接口,也就是定义Spout所应该具有的功能。

public interface IRichSpout extends ISpout, IComponent {
}

IBolt
IBolt定义了的Bolt的基本行为,其能够接收tuple作为输入,并产生tuple作为输出。IBolt中可以做很多业务处理,如过滤,聚合等处理。IBolt不需要对收到的tuple立刻进行处理,可以持有一段时间后稍后处理。
Bolt的生命周期过程为:被client创建 -> IBolt被序列化保存到topology中 -> 提交到nimbus的master -> nimbus启动worker进行,内部反序列化IBolt并调用prepare方法 -> 开始处理tuple。
如果你想要参数化一个IBolt,应该通过构造方法设置参数,并把参数状态保存到一个常量中。当使用java的api定义Bolt时,应该使用IRichBolt接口。

public interface IBolt extends Serializable {
  void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector);
  void execute(Tuple input);
  void cleanup();
}

当一个Bolt被封装为Task执行时,worker进程会调用prepare方法初始化,这个方法提供了Bolt运行的环境,如stormConf,context和collector对象。
收到tuple之后,会调用execute方法,Tuple对象中包含了很多元数据信息,如来源stream、来源组件id、源task等;Tuple内部保存的值可以通过getValue方法获得;处理完tuple之后,可以通过prepare方法中的OutputCollector来发送数据到下一个处理组件,这是因为storm的spout在的ack和fail机制是通过OutputCollector完成的。
当IBolt要关闭时,会调用cleanup方法,但是由于supervisor会使用kill -9的方式关闭worker进程,因此这个方法不一定会被调用到。

IRichBolt
IRichBolt的定义很简单,继承了上面的两个接口,也就是说一个Bolt作为一个组件加入到拓扑中,可以定义发送数据的格式和设置组件配置信息,作为一个处理单元,可以设置处理数据的业务逻辑。在使用java编写拓扑时,我们都是使用IRichBolt来作为最基础的Bolt接口的。

public interface IRichBolt extends IBolt, IComponent {
}

2.2 串联和分组

我们实现了自己的spout和bolt后,需要通过TopologyBuilder将这些组件串联起来。例如,下面的程序

SpoutDeclarer spout = builder.setSpout("spout",new SequenceSpout(), spoutParal);
builder.setBolt("split",new SplitBolt()).shuffleGrouping("spout");
builder.setBolt("count", new TotalCount(),boltParal).fieldsGrouping("split", new Fields("word"));

setSpout和setBolt会为每个组件设置一个key:spout是数据来源组件;split这个组件是语句拆分组件,通过shuffleGrouping方法连接到spout上;count组件用来为单词计数,由于相同单词需要发给同一个组件,所以使用fieldsGrouping方法指定了连接到split组件,并且根据tuple中的word字段来进行分组。

下面,来看看是如何实现串联和分组的。我们在定义Topology时,只是指定了行为,具体的行为需要jstrom运行时根据定义来执行。

TopologyBuilder

TopologyBuilder是jstorm提供的用来辅助开发者构造Topology结构的类,我们通过设置spout和bolt将指定组件串联后,最后可以生成一个保存了Topology定义的StormTopology类。

TopologyBuilder内部维护了四个map,用来保存不同的组件信息。

private Map<String, IRichBolt> _bolts = new HashMap();
private Map<String, IRichSpout> _spouts = new HashMap();
private Map<String, ComponentCommon> _commons = new HashMap();
private Map<String, StateSpoutSpec> _stateSpouts = new HashMap();

_bolts保存了通过setBolt方法定义的Bolt组件,_spouts保存了通过setSpout方法定义的Spout组件。_commons则保存了这个两个set方法时通用的组件信息。

setSpout
调用setSpout方法时,我们为spout指定了组件id和并发度。

public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
  this.validateUnusedId(id);
  this.initCommon(id, spout, parallelism_hint);
  this._spouts.put(id, spout);
  return new TopologyBuilder.SpoutGetter(id);
}

首先,会校验一下id是否已经被使用过,在_bolts、_spouts和_stateSpouts三个map中查找;
第二步,会将组件信息封装为ComponentCommon对象并保存到_commons中,ComponentCommon是thrift生成的对象,定义在storm.thrift中。
第三步,将id和实例保存在_spouts数组在。
最后,生成一个SpoutGetter对象返回,SpoutGetter用来设置组件的参数和分组情况。

ComponentCommon
ComponentCommon是thrift定义的对象,有四个属性。

struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs; // 输入源
  2: required map<string, StreamInfo> streams; //key是stream的id,StreamInfo是stream的信息
  3: optional i32 parallelism_hint; //这个组件的并发度,也就是说会有几个线程运行这个组件
  4: optional string json_conf; // 组件的配置
}

// GlobalStreamId对象由组件id和streamid组成
struct GlobalStreamId {
  1: required string componentId;
  2: required string streamId;
}

// Grouping是一个union,只能有一个field被使用被赋值
union Grouping {
  1: list<string> fields; // global grouping
  2: NullStruct shuffle; // tuple被发送到随机的task(task是一个组件的运行任务)
  3: NullStruct all; // tuple被发送到每个task上
  4: NullStruct none; // tuple被发送到strom指定的task
  5: NullStruct direct; // bolt希望源bolt直接发过来
  6: JavaObject custom_object; // 自定义的分组类
  7: binary custom_serialized; // 序列化后的自定义分组类
  8: NullStruct local_or_shuffle; // 优先发送给同一个worker进程中的task,否则使用随机方式
}

// StreamInfo定义每个stream的输出格式
struct StreamInfo {
  1: required list<string> output_fields; // 输出的tuple的field列表
  2: required bool direct; 
}

通过上面的一系列定义,我们可以看出,一个组件包含的内容有四个部分:

initCommon方法中,创建了一个ComponentCommon对象,设置inputs为一个空的map,以及指定的并发度和组件配置(IComponent的getComponentConfiguration返回的配置),最后,将这个对象保存到了TopologyBuilder的_commons中。

SpoutGetter

setSpout的最后,会生成一个SpoutGetter类返回。SpoutGetter是一个定义在TopologyBuilder中的内部类,

ComponentConfigurationDeclarer
  -| BaseConfigurationDeclarer
    -| ConfigGetter
      -| SpoutGetter 【SpoutDeclarer接口】

ComponentConfigurationDeclarer接口用来为操作组件的配置,定义了添加修改配置的相关方法。BaseConfigurationDeclarer做了更上层的封装,子类只需要实现addConfigurations即可。ConfigGetter实现了addConfigurations方法,会根据组件id从_commons数组获取配置,与参数合并后写回到ComponentCommon的json_conf中。
SpoutGetter是ConfigGetter的子类,并未进行任何覆盖,因此,SpoutGetter的目的就是为了修改spout的配置。

setBolt

setBolt方法与setSpout的逻辑类似,不同的是最终返回的是BoltGetter类,BoltGetter集成了ConfigGetter,也就是说也具有了修改配置的功能。与SpoutGetter不同的是,BoltGetter内部实现了分组的多个方法,用来快速的设置分组类型。

private BoltDeclarer grouping(String componentId, String streamId,Grouping grouping) {
    // _boltId指的是当前bolt的id
   _commons.get(_boltId).put_to_inputs(new GlobalStreamId(componentId, streamId), grouping);
   return this;
}

主要逻辑为获取bolt组件的ComponentCommon类中的input,设置每个stream对应的Grouping类型。由于Grouping是一个union,每个类型都只设置自己对应的变量。

至此,我们已经了解到了TopologyBuilder内部是如何实现组件的串联和并分组了,接下来,就是如何获取StormTopology了。

2.3 StormTopology

构造完Topology之后,可以通过TopologyBuilder的createTopology方法获取StormTopology,StormTopology也是thrift定义的一个对象,这样便于将拓扑的结构发送给其他的系统,反序列化后即可获取拓扑结构。

在storm.thrift中,StormTopology的定义如下:

struct StormTopology {
  1: required map<string, SpoutSpec> spouts;  // 所有的Spout
  2: required map<string, Bolt> bolts;  // 所有的Bolt
  3: required map<string, StateSpoutSpec> state_spouts;
}

struct SpoutSpec {
  1: required ComponentObject spout_object;
  2: required ComponentCommon common;
}

struct Bolt {
  1: required ComponentObject bolt_object;
  2: required ComponentCommon common;
}
// ComponentObject是union,表示组件的三种类型
union ComponentObject {
  1: binary serialized_java;
  2: ShellComponent shell;
  3: JavaObject java_object;
}
struct ShellComponent {
  // should change this to 1: required list<string> execution_command;
  1: string execution_command;
  2: string script;
}
struct JavaObject {
  1: required string full_class_name;
  2: required list<JavaObjectArg> args_list;
}

可以看到,StormTopology内部包含了Spout和Bolt这两种组件的配置信息ComponentCommon,还包含了一个ComponentObject,这是一个union,如果组件是一个序列化的java实例,会以byte数组的形式保存;如果是一个Shell脚本,则包含执行的命令和脚本内容;如果是一个java类,会以JavaObject保存,内部包含了类的全名和参数列表。

创建拓扑

了解了StormTopology的内部结构之后,接下来就是如何通过TopologyBuilder来创建拓扑了。

public StormTopology createTopology() {
    Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
    Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
    for (String boltId : _bolts.keySet()) {
        IRichBolt bolt = _bolts.get(boltId);
        ComponentCommon common = getComponentCommon(boltId, bolt);
        boltSpecs.put(
                boltId,
                new Bolt(ComponentObject.serialized_java(Utils
                        .serialize(bolt)), common));
    }
    for (String spoutId : _spouts.keySet()) {
        IRichSpout spout = _spouts.get(spoutId);
        ComponentCommon common = getComponentCommon(spoutId, spout);
        spoutSpecs.put(
                spoutId,
                new SpoutSpec(ComponentObject.serialized_java(Utils
                        .serialize(spout)), common));

    }
    // spout的定义和bolt的定义
    return new StormTopology(spoutSpecs, boltSpecs,
            new HashMap<String, StateSpoutSpec>());
}

整个过程比较简单,由于TopologyBuilder内保存了spout和bolt的信息,StormTopology内部也维护了指定格式的Spout和Bolt类,只需要进行转换就可以了。遍历_bolts和_spouts数组,通过getComponentCommon获取ComponentCommon,将bolt和spout实例序列化后保存为指定对象。
getComponentCommon方法主要逻辑是对_commons中的ComponentCommon进行深拷贝,然后创建OutputFieldsGetter对象,交给我们定义在Spout和Bolt的declareOutputFields方法,设置各个stream对应的输出字段。有了这些信息后,维护在ComponentCommon的map<string, StreamInfo> streams中。
我们在使用setSpoutsetBolt的方法时,传入了Spout和Bolt的实例,将这些实例序列化后保存到ComponentObject中,然后就可以创建SpoutSpecBolt对象了。
最后,将这些对象保存到StormTopology中,完成了对拓扑结构的组装和转化。由于StormTopology中包含的信息都是由Thrift定义的,这些就可以提交给jstorm的服务器端运行了。

上一篇 下一篇

猜你喜欢

热点阅读