JStorm源码分析-5.Supervisor
2019-03-15 本文已影响0人
史圣杰
Supervisor用于执行拓扑中指定的组件任务,是实际响应nimubus分配,控制worker运行的程序。
1. 启动Supervisor
启动Supervisor时,只需要输入
jstorm supervisor
jstorm.py脚本会将supervisor子命令解析为如下命令:
java -server
-Djstorm.home=/Users/shishengjie/software/jstorm-0.9.1
-Dstorm.options=
-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib
-Xms256m -Xmx256m -XX:+UseConcMarkSweepGC
-XX:+UseCMSInitiatingOccupancyOnly
-XX:CMSInitiatingOccupancyFraction=70
-Dlogfile.name=supervisor.log
-Dlog4j.configuration=File:/Users/shishengjie/software/jstorm-0.9.1/conf/jstorm.log4j.properties
-cp /Users/3.2.7.Final.jar:/Users/shishengjie/software/jstorm-0.9.1/conf
com.alibaba.jstorm.daemon.supervisor.Supervisor
2. Supervisor的启动
2.1 主要流程
- 读取配置,依次从
defaults.yaml
、classpath下的storm.yaml
和命令行的storm.options
中读取storm的配置,并确保配置的模式为distributed。 - 清除在本地路径
/{storm-local-dir}/supervisor/tmp
中的内容,从nimbus下载topology的jar包、conf和topology序列化文件会临时放在这个文件夹内,下载完成后转移到/{storm-local-dir}/supervisor/stormdist/{topologyId}
中,具体程序在downloadDistributeStormCode中。启动时先清理一遍文件夹。 - 创建LocalState,这是一个kv数据库,内容都保存在磁盘上。每次使用put的时候,会创建一个新的版本号,并将map中的所有内容都写入新文件。get是从最新的版本号中读取map再获取key对应的value。
- 从LocalState中获取supervisorId,如果不存在,使用uuid生成一个新的,保存到LocalState。
- 创建心跳线程Heartbeat。Heartbeat实现了RunnableCallback接口,可以看做一个任务,将其放入AsyncLoopThread中,每过
supervisor.heartbeat.frequency.secs
默认60秒,调用一次update方法。update的目的是向nimubus汇报supervisor的状态,通过修改心跳时间和运行时间,将SupervisorInfo序列化后的内容保存到zk的节点/jstorm/supervisors/{supervisorid}
中,这是一个ephemeral节点,supervisor退出时会清除这个节点。 - EventManager是一种事件机制,内部包含一个执行线程runningThread,会从queue中不断获取event,然后执行,通过add方法将event加入到queue中。SyncProcessEvent是一个实际要执行的Event,主要作用是kill掉异常的worker,启动新worker。SyncSupervisorEvent是Supervisor的主要工作,会从zk上读取assignment,下载拓扑代码,然后找出需要自己处理的任务,让SyncProcessEvent去处理Worker。EventManagerPusher的作用是每隔
supervisor.monitor.frequency.secs
默认10秒或zk上的assignment发生变化时(由nimbus修改),执行一次syncSupervisorEvent。 - 最后,将这些线程都封装到SupervisorManger中,便于停止supervisor或其中的所有worker进程。
2.2 SyncSupervisorEvent
SyncSupervisorEvent的主要功能是读取zk上的assignment信息,找出自己需要执行的任务,下载topology代码,然后执行SyncProcessEvent去控制worker,执行流程如下:
- 从zk的节点
/jstorm/assignment
在读取待分配的任务列表assignments,并且为节点添加callback,当节点发生变化时,执行EventManagerZkPusher的run方法,会将当前的SyncSupervisorEvent加入到SyncSupervisorEvent的事件管理器syncSupEventManager,等待下次执行,这样做到目的是加入队列,不需要考虑同步问题。 - 从本地路径
{strom.local.dir}/supervisor/stormdist/
下读取已经下载代码的拓扑列表downloadedTopologyIds。 - 有了zk中的任务列表assignments,可以找出当前supervisor的各个worker对应的任务:遍历assignments,找到分配给当前节点的Assignment,由于Assignment内部保存了task列表和每个task包括port在内的资源,可以计算出每个port对应的task列表。计算的目的是因为每个port对应着一个worker进程,需要明确每个worker包含的task。最后,还需要校验一个port(即worker)只能运行一个Assignment(对应着topology)。最后将本地LocalAssignment写入localState中保存(启动worker时会读取并使用)。
- zk中的Assignment中记录了每个master上topology在的代码路径,downloadedTopologyIds的不需要下载,其他的需要下载到指定目录
{strom.local.dir}/supervisor/stormdist/topologyId
中。 - 第4步的时候,可以计算出当前的supervisor需要处理的topology,删除路径
{strom.local.dir}/supervisor/stormdist/
下面,不需要处理的topology文件夹 - 将syncProcesses加入到processEventManager中,也就是去执行一遍SyncProcessEvent。
2.3 SyncProcessEvent
SyncProcessEvent的主要功能是根据worker的心跳判断其状态,已经失效的需要kill掉,已经停止的需要重启,根据任务分配启动新的worker。主要流程如下:
- 从localState中读取LocalAssignment,SyncSupervisorEvent会从zk中的assignment中过滤出当前supervisor需要处理的task。
- 从本地读取worker和worker心跳判断worker的状态,状态记录在StateHeartbeat中,从
{storm.local.dir}/workers
下读取文件列表workerIds,{storm.local.dir}/workers/${woker-id}/heartbeats
里面是心跳信息WorkerHeartbeat。有了worker和心跳,就可以计算worker的状态了。如果WorkerHeartbeat为null,说明没有启动State.notStarted
;如果心跳状态与分配的任务不匹配,设置为State.disallowed
;如果心跳时间超时,设置为State.timedOut
;其余情况,状态为正常State.valid
; - 计算处理worker的状态之后,就需要根据状态对Worker进程进程处理了,首先会将非
State.valid
的Worker全部kill掉,心跳中记录了pid,使用kill命令即可。 - 遍历LocalAssignment中的Worker,将没有启动的Worker启动起来,使用uuid生成workerId,在本地创建文件夹
{storm.local.dir}/workers/{newworkid}/pids
,在launchWorker方法中通过拼接java命令的方式启动Worker进程。Worker的启动类在com.alibaba.jstorm.daemon.worker.Worker
中,向worker传递的参数有: topologyId 、supervisorId、port、workerId、workerClassPath:jar路径。