JStorm源码分析-3.提交Topology

2019-03-14  本文已影响0人  史圣杰

构建Topology并在本地测试后,我们就可以将工程打包为jar包,并通过jstorm的jar命令提交到集群。这个过程使用了thrift的远程调用,相关技术可以参照http://matt33.com/2016/04/07/thrift-learn/#Thrift%E4%B9%8BHello-World

1.提交命令

将工程打为jar包之后,可以通过下面的命令提交到jstorm集群中。在提交之前,我们需要先启动zookeeper,nimbus和supervisor。

jstorm jar target/sequence-split-merge-1.0.5-jar-with-dependencies.jar 
com.alipay.dw.jstorm.example.sequence.SequenceTopology sequence_test

2.程序分析

2.1 jstorm脚本

jstorm命令其实是一个python脚本,位于jstorm-server项目bin/jstorm.py中。这个脚本的主要工作是提供了一些子命令,最终会调用指定的jar包内的main方法。
在main方法中,会根据传入的参数查找对应的方法。例如,jstrom jar命令就是jar对应的jar方法。在这个脚本中包含了很多子命令,可以看到有如下多个子命令,用来控制jstorm集群的行为。

COMMANDS = {"jar": jar, "kill": kill, "nimbus": nimbus, "zktool": zktool,
            "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
            "remoteconfvalue": print_remoteconfvalue, "classpath": print_classpath,
            "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage}

jar方法的主要逻辑是拼装java命令并调用

def jar(jarfile, klass, *args):
    childopts = "-Dstorm.jar=" + jarfile + (" -Dstorm.root.logger=INFO,stdout -Dlog4j.configuration=File:%s/conf/aloha_log4j.properties"  %JSTORM_DIR)
    exec_storm_class(
        klass,
        jvmtype="-client -Xms256m -Xmx256m",
        extrajars=[jarfile, CONF_DIR, JSTORM_DIR + "/bin", LOG4J_CONF],
        args=args,
        childopts=childopts)

例如,本文jstorm jar的命令最终会执行如下java命令,我们需要到我们编写main方法中的StormSubmitter查看具体逻辑。

java -client -Xms256m -Xmx256m 
-Djstorm.home=/Users/shishengjie/software/jstorm-0.9.1 
-Dstorm.options= 
-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib 
-Dstorm.jar=target/sequence-split-merge-1.0.5-jar-with-dependencies.jar 
-Dstorm.root.logger=INFO,stdout 
-Dlog4j.configuration=File:/Users/shishengjie/software/jstorm-0.9.1/conf/aloha_log4j.properties 
-cp /.../....jar:target/sequence-split-merge-1.0.5-jar-with-dependencies.jar:/Users/shishengjie/.jstorm:/Users/shishengjie/software/jstorm-0.9.1/bin:/Users/shishengjie/software/jstorm-0.9.1/conf/jstorm.log4j.properties 
com.alipay.dw.jstorm.example.sequence.SequenceTopology 
"sequence_test"

2.2 StormSubmitter

StormSubmitter类定义在jstorm-client工程中,submitTopology方法用来向运行中的jstorm集群提交Topology。

public static void submitTopology(String name, Map stormConf,
            StormTopology topology, SubmitOptions opts)
            throws AlreadyAliveException, InvalidTopologyException,
            TopologyAssignException {}

提交时我们可以看到,需要指定拓扑名称和配置,StormTopology与SubmitOptions类的实例。前面几个参数我们都知道是如何设置的,SubmitOptions是thrift定义的一个对象,用来表示拓扑是否激活。

struct SubmitOptions {
  1: required TopologyInitialStatus initial_status;
}
enum TopologyInitialStatus {
    ACTIVE = 1,
    INACTIVE = 2
}

// java中使用
SubmitOptions submitOptions = new SubmitOptions(TopologyInitialStatus.ACTIVE);

提交拓扑的主要逻辑:

  1. 参数的处理,开发者设置的stormConf并不完整,因此需要进行处理。首先会校验stormConf格式是否正确,stormConf需要是一个Json序列化格式,然后从命令行读取storm.options属性的内容,覆盖stormConf;接下来分别读取defaults.yaml和从storm.conf.file指定的storm配置文件中读取配置(如果没有指定,则从classpath中查找storm.yaml文件,jstorm-server中有一个名为storm.yaml的文件,里面是默认配置)。读取完文件的配置后,再次读取命令行指定的配置,保存为conf,并将stormConf的内容覆盖conf,这样就完成了补充配置的工作,stormConf是用户指定配置的,conf是补充后的完整配置。最后,设置用户分组参数,将stormConf序列化为json字符串。
    2.使用NimbusClient的静态方法从conf中获取NimbusClient实例
    3.调用NimbusClient的getClusterInfo方法向服务器获取集群信息,校验指定的拓扑名称是否已经存在。
    4.使用NimbusClient将storm.jar指定的jar包上传到服务器,storm.jar是在jstorm.py中设置的。
    5.使用NimbusClient的submitTopology方法提交拓扑。

至此,jstorm完成了jar包和拓扑的提交。我们可以看到全部是通过NimbusClient来完成的。有client端就有server端,这就涉及到了jsorm的架构。storm命令的python脚本都是作为client端发送请求的,而使用jstorm nimbus启动的进程则是服务端,用来处理client请求。

2.3 Nimbus

jstorm的nimbus命令会启动一个服务器进程,其提供了很多thrift服务,用来管理集群中拓扑,任务,worker等状态。Nimbus也定义在strom.thrift中,声明了其提供的很多服务。NimbusClient是jstorm封装的客户端,用来向服务器发出请求。
下面是使用thrift定义的nimbus提供的服务。

service Nimbus {
// 拓扑相关
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: TopologyAssignException tae);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3:TopologyAssignException tae);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // 上传文件
  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);
  
// 下载文件
  string beginFileDownload(1: string file);
  //can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // 获取集群状态和拓扑状态
  string getNimbusConf();
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  SupervisorWorkers getSupervisorWorkers(1: string host) throws (1: NotAliveException e);
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

NimbusClient的获取

NimbusClient继承了ThriftClient类,创建时传入了conf作为配置。主要的初始化方法在ThriftClient的构造方法中。

  1. 获取master地址:初始化时,首先会根据conf获取zookeeper的配置,然后创建Curator客户端,由于nimbus在启动后创建/jstorm/master节点并将master的host和ip写入节点,nimbus的client启动时会对该节点进行检查,之后会监听这个节点,发送变化时会flushClient。
  2. 创建客户端:主要的逻辑在flushClient中,会从节点中读取host和port,然后使用thrift的接口创建连接。
  3. 创建完成后,NimbusClient通过flush方法获取了与服务端通信的Nimbus.Client

2.4 NimbusServer

由于客户端所做的事情只是调用,具体的逻辑是在服务端实现的,因此有必要对服务端进行分析。我们在使用jstorm nimbus命令时,最终会调用com.alibaba.jstorm.daemon.nimbus.NimbusServer,里面有main方法,可以启动一个服务器进程。

public static void main(String[] args) throws Exception {
    // read configuration files
    @SuppressWarnings("rawtypes")
    Map config = Utils.readStormConfig();
    NimbusServer instance = new NimbusServer();
    INimbus iNimbus = new DefaultInimbus();
    instance.launchServer(config, iNimbus);
}

服务器的相关设计我们在下一篇文档中介绍,现在主要是对服务器处理提交拓扑任务的处理进行分析。按照thrift的定义,我们在initThrift中找到了处理远程调用的类是ServiceHandler
ServiceHandler的submitTopologyWithOpts方法中,提交topology。

ServiceHandler的主要逻辑包括:生成标准化的Topology实例并注册到zk上;将Topology需要的jar包、Topology实例和配置通过复制和序列化保存到本地路径上;创建Task并注册到zk上,task节点的内容为TaskInfo即任务对应的组件名称;包装一个TopologyEvent对象提交给TopologyAssign线程处理,由其完成任务的资源分配。

2.4 TopologyAssign

TopologyAssign线程的主要工作就是处理Topology的任务分配,主要逻辑在doTopologyAssignment方法中。流程是先根据TopologyAssignEvent生成Assignment,然后将Assignment备份到zk中。在分析之前,需要先对下面几个对象有所了解

TopologyAssignContext

在处理分配拓扑之前,我们已有的资源是Topology实例和配置信息。TopologyAssignContext的目的是将分配拓扑所需的数据都维护起来。TopologyAssignContext内部保存了拓扑分配的相关上下文:

DefaultTopologyAssignContext

DefaultTopologyAssignContext继承了TopologyAssignContext类,

DefaultTopologyScheduler
DefaultTopologyScheduler实现了IToplogyScheduler接口,IToplogyScheduler只有一个方法

Map<Integer, ResourceAssignment> assignTasks(TopologyAssignContext contex) 
            throws FailedAssignTopologyException;

也就是说DefaultTopologyScheduler的主要功能就是根据TopologyAssignContext为task分配资源,如指定所在的supervisor,并在Supervisor的资源池中为task分配所需的资源。assignTasks方法是处理拓扑的核心功能。

ResourceAssignment
每个task会被分配给一个ResourceAssignment实例,ResourceAssignment包含了task所在supervisorId和磁盘,cpu,内存这些资源所在的slot,以及端口port。

Assignment

每个Topology都需要被nimbus分配给supervisor执行,分配的元数据就保存在Assignment对象中。通过zk的节点/jstorm/assignments/{topologyid}传递给Supervisor,Assignment内部包括:

处理流程

mkAssignment的prepareTopologyAssign

prepareTopologyAssign的主要作用是收集jstorm集群当前的上下文数据,包括:拓扑名称、标准化后的Topology、Topology配置、Supervisor列表、task列表、已经存在的Assignment。

mkAssignment的IToplogyScheduler

收集完TopologyContext之后,就可以使用调度器来为任务分配资源了。IToplogyScheduler是默认的用来调度拓扑的类,在init方法中被创建,使用的是DefaultTopologyScheduler类。这个类的作用是为task分配指定的资源。

分配过程为:

  1. 检查分配类型是否为ASSIGN_TYPE_NEW、ASSIGN_TYPE_REBALANCE、ASSIGN_TYPE_MONITOR三者之一;
  2. 在TopologyAssignContext的基础上创建DefaultTopologyAssignContext,内部包含了分配task资源需要的weight配置
  3. 如果是REBALANCE类型的,需要先释放掉已有的资源。context中保存了所有的SupervisorInfo,zk中记录了原有的Assignment,内部包含了task使用资源的情况,这样就可以对SupervisorInfo内的资源池进行释放。
  4. 统计需要分配的task,保存在needAssignTasks中。ASSIGN_TYPE_NEW类型的时候所有都需要分配;ASSIGN_TYPE_REBALANCE 需要分配(所有task 减去 已经停止的supervisor的task);ASSIGN_TYPE_MONITOR需要分配所有dead的task。
  5. 由于当前zk中可能有Assignment,其对应的task需要保持不变,所以,将这些task查找出来保存在keepAssigns中,然后获取需要保持的任务对应的<supervisorid和port> - > task,即每个WorkerSlot上对应的task。
  6. 计算 需要分配的worker数量 = 所有的worker总数量 - 已停止的supervisor的worker数量 - 需要保持的worker数量
  7. registerPreAssignHandler方法计算需要分配的任务needAssignTasks中每个组件分配类型ComponentAssignType对应的task。由于task中记录了组件名称,每个组件的配置可能会不同。ComponentAssignType共有三种类型的分配方式:USER_DEFINE, USE_OLD, NORMAL。每种方式都可能会对应着我们需要分配的若干task。
  8. 有了ComponentAssignType之后,就可以为task分配资源了。使用USER_DEFINE时,使用UserDefinePreAssign需要从配置文件中读取需要分配的cpu,内存和磁盘的solt。使用NORMAL时,对应的NormalPreAssign类。先查找是否有用户自定义的分配方式,如果没有,从配置中获取task占用的slot数量。cpu.slots.per.task:每个task会使用的cup slot数量,memory.slots.per.task每个task会使用的内存slot数量,task.alloc.disk.slot每个task会使用的磁盘slot数量。有了slot数量,还需要注意task.on.differ.node是否要求task位于不同的node上。满足slot的Supervisor很多,这时候就会使用总权重或level来为task选择最佳的Supervisor,并在Supervisor的资源池中分配相应的资源,并封装为ResourceAssignment对象返回。
  9. 分配完成后,执行PostAssignTaskPort的postAssign方法,为task分配worker需要的port。

完成上述两个步骤之后,已经为Topology分配了所需的资源,包括Supervisor节点,worker,内存,磁盘,cpu的数量。

mkAssignment的后续操作

  1. updateGroupResource 如果使用了Group模式,就需要设置分组数据。
  2. 计算supervisor与host的映射:zk中记录了所有的supervisor,过滤出task需要的supervisorid与host的映射关系即可。
  3. 计算task的starttime:ASSIGN_TYPE_NEW类型为当前时间
  4. 按照Assignment的构造方法,封装为Assignment对象
  5. 将Assignment的数据写入zk的/jstorm/assignments/{topologyId}节点里面
  6. 更新task的心跳起始时间

setTopologyStatus

激活Topology:/jstorm/topology/{topologyId}读取并反序列化为StormBase对象的实例。如果不存在,就创建这个节点,并创建StormBase实例保存起来。如果StormBase存在,就更新状态。

backupAssignment

主要的操作是将Assignment保存在zk上面。由于上面的过程中已经生产了Assignment对象,遍历task并构造出组件与task的映射关系后,封装为AssignmentBak,保存到/jstorm/assignments_bak/{topologyName}节点的内容中。

上一篇下一篇

猜你喜欢

热点阅读