Xlearning Container

2018-05-12  本文已影响30人  raincoffee

Container相关逻辑

Heartbeat

心跳就是一个线程类。继承Thread。采用的协议为ApplicationContainerProtocol。总的来说就是发送心跳,收到response,并处理response。

核心逻辑:

1. 发送心跳,并获取到response。 heartbeatResponse = protocol.heartbeat(containerId, heartbeatRequest);
    while(true):
        发送心跳给AM,如果失败,重试次数小于heartbeatRetryMax,sleep一会,继续发送。
        超出heartbeatRetryMax,break;
    response:isXLearningTrainCompleted,interResultTimeStamp 表示:是否训练完成,当时的时间戳
2. 处理response。 heartbeatResponseHandle(heartbeatResponse);
    2.1 isXLearningTrainCompleted==true
        将Heartbeat#IsXLearningTrainCompleted 设置为true 结束。
    2.2 isXLearningTrainCompleted==false
        2.2.1 interResultTimeStamp != lastInnerModelTimeStamp
              启动一个线程interResultSavedThread 主要讲输出写到dfs上
        2.2.2 lastInnerModelTimeStamp != Long.MIN_VALUE  并且该时间差大于XLEARNING_INTERRESULT_UPLOAD_TIMEOUT 说明当前状态已经保存 设置setInnerModelSavedStatus(true);

XLearningContainer

核心逻辑:main方法

开始
|
container初始化
|
container启动------
|                 | 
|success          |fail
报告成功              报告失败  
|                 | 
|------------------
退出

备注:报告成功/失败 并退出时,heartbeatThread会设置完成时间ContainersFinishTime以及状态ContainerStatus

主要启动逻辑如下:

1. 准备输入文件分片 prepareInputFiles()
2. 创建本地输出目录 createLocalOutputDir()
3. 创建real worker 运行env;不同AppType 不同env
4. 创建Executing command
5. 启动worker execute commend Runtime.getRuntime().exec(command, env)
6. heartbeatThread 设置container开始时间,并将在下次心跳发送
7. 启动线程输出out日志
8. 启动线程输出err日志
9. heartbeatThread 设置container状态为 running,并在下次心跳发送
10. Start board process (暂时略过 和MPI没关系)
11. containerReporter线程创建 并启动 收集CPU信息
    11.1 如果不是MPI程序;containerReporter = new ContainerReporter(amClient, conf, containerId, this.xlearningCmdProcessId);
    11.2 如果是MPI程序; do nothing why
12. 上传输出文件uploadOutputFiles
    11.1 如果不是MPI程序;通过心跳来查询isXLearningTrainCompleted,如果完成了,worker就uploadOutputFiles
    11.2 如果是MPI程序:尝试在updateAppStatusRetry次数内请求amClient.isApplicationCompleted 来判断是否完成,如果在请求次数范围内,获取到回应,如果完成,就上传文件,否则sleep一会。重复操作。

针对上述步骤11,启动一个containerReporter线程来进行cpuMetrics 收集

具体过程如下所示:

线程开始
|
步骤1.收集cpuMetrics,执行 produceCpuMetrics(this.xlearningCmdProcessId);
|
步骤2.每间隔3s,通过ApplicationContainerProtocol向AM汇报cpuMetrics信息。
|
循环执行上述过程,直到XLearningContainer gg 斯密达

上述过程步骤1,的具体实现见ContainerReporter#produceCpuMetrics(String xlearningCmdProcessId);其整体思想是:

1. 通过xlearningCmdProcessId(假如是3890),进程id,执行cmd命令 `cat /proc/3890/stat` 获取进程信息,执行结果为:
`3890 (xfsconvertd/3) S 2 0 0 0 -1 2216722496 .....` 从中解析获取父进程 ppid=2,containerProcessId=ppid。 
备注:stat具体含义见博客具体含义见https://blog.csdn.net/zyboy2000/article/details/50456764
2. 此刻我们拿到containerProcessId,然后启动一个线程cpuMetricsThread调用一些内部类来时刻获取更新cpuMetrics信息。

上述过程步骤2,整体过程就是,在一个while循环内,不断的通过ApplicationContainerProtocol向AM汇报cpuMetrics信息。具体调用方法为protocol.reportCpuMetrics(containerId, new Gson().toJson(cpuMetrics)); 具体实现见类ApplicationContainerProtocol。

上一篇下一篇

猜你喜欢

热点阅读