distributedshell: a YARN programming guide
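Hadoop YARN is a standalone scheduling framework. Using the APIs that YARN provides, we can write our own programs and have YARN schedule and run them.
Hadoop ships an official YARN program called distributedshell, which runs shell commands through the YARN scheduling framework. Hadoop provides it as a simple example of how to write a YARN program of your own.
Source code: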
https://github.com/apache/hadoop/tree/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell
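Writing a YARN program generally requires:
- Writing a client. The client defines how the ApplicationMaster is launched and submits the application to the RM (ResourceManager).
- Writing your own ApplicationMaster. The ApplicationMaster creates the clients that talk to the RM and the NM (NodeManager): it requests resources from the RM and launches containers on the NMs to run tasks.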
Interfaces
Low-level interfaces
ApplicationClientProtocol
The low-level protocol between clients and the ResourceManager, used to submit or abort jobs and to get information about applications, cluster metrics, nodes, queues and ACLs.
ApplicationMasterProtocol
The low-level protocol between the AM and the RM. ApplicationMasterProtocol.allocate also serves as the heartbeat between the AM and the RM.
ContainerManagementProtocol
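The low-level protocol between the AM and the NM, used to start and stop containers and to get status information about running containers.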
High-level interfaces
Client<-->ResourceManager
Uses the YarnClient object.
ApplicationMaster<-->ResourceManager
Used to request containers from the RM through an AMRMClientAsync object; AMRMClientAsync.CallbackHandler handles the resulting events asynchronously.
ApplicationMaster<-->NodeManager
Used to launch containers on a NodeManager. NMClientAsync communicates with the NodeManager, and NMClientAsync.CallbackHandler handles the resulting events asynchronously.
distributedshell
Let's look at how Hadoop's example program distributedshell is written.
Writing the Client
org.apache.hadoop.yarn.applications.distributedshell.Client
This is the client entry point; it interacts with the RM.
- The program first initializes YarnClient:
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
It then calls yarnClient.createApplication to create the app and obtain an application id. Underneath, this call goes through ApplicationClientProtocol.
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
Using the application id obtained from the RM, the client builds an ApplicationSubmissionContext, which carries everything the RM needs to launch the ApplicationMaster. The client must set the following in this context:
- Application info: id, name
- Queue and priority info: the queue the application is submitted to, and its priority.
- User: the user submitting the application
- ContainerLaunchContext: defines everything about the container the AM will run in, such as local resources (binaries, jars, files etc.), environment settings (CLASSPATH etc.), the command to execute and security tokens (RECT).
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
...
// Set the application name
appContext.setApplicationName(appName);
...
// Prepare the local resources, environment and command
...
// Build the application master's ContainerLaunchContext from the prepared information
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(localResources, env, commands, null, null, null);
...
// Set the resources needed to launch the AM
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
// Set the ContainerLaunchContext
appContext.setAMContainerSpec(amContainer);
// Set the priority
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
// Set the queue
appContext.setQueue(amQueue);
// Submit the application
yarnClient.submitApplication(appContext);
An example to make this concrete
Suppose the user test runs hadoop-yarn-applications-distributedshell with the following command:
hadoop jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar -jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar -queue root.download -shell_script /tmp/a.sh
Suppose yarnClient.createApplication() returns the appid application_1576067711791_1132781.
- The Client takes the jar path passed in through the -jar argument, hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar, and uploads it as a local resource (Local Resources) to the following HDFS path of the test user:
/user/test/DistributedShell/application_1576067711791_1132781/AppMaster.jar
- The script to execute, /tmp/a.sh, is likewise uploaded to the application's HDFS path under the user's home directory:
/user/test/DistributedShell/application_1576067711791_1132781/ExecScript.sh
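The two HDFS paths above follow one pattern: home directory, application name, application id, file name. The following is a minimal JDK-only sketch of that pattern, not the Client's actual code. StagingPathSketch and stagingPath are hypothetical names, and the home directory /user/<user> is an assumption taken from the example paths; a real client would resolve it through Hadoop's file system API rather than by string concatenation.

```java
final class StagingPathSketch {
    // Assumption: the submitting user's HDFS home directory is /user/<user>,
    // as in the example paths. All names here are illustrative only.
    static String stagingPath(String user, String appName, String appId, String fileName) {
        return "/user/" + user + "/" + appName + "/" + appId + "/" + fileName;
    }

    public static void main(String[] args) {
        String appId = "application_1576067711791_1132781";
        // Destination of the ApplicationMaster jar
        System.out.println(stagingPath("test", "DistributedShell", appId, "AppMaster.jar"));
        // Destination of the shell script
        System.out.println(stagingPath("test", "DistributedShell", appId, "ExecScript.sh"));
    }
}
```

Because the application id is part of the path, each submission gets its own directory and concurrent runs by the same user do not overwrite each other's files.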
The command that runs the ApplicationMaster is:
org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
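A command line like the one above can be assembled from a list of fragments joined with spaces. The sketch below is JDK-only and only illustrates the idea: AmCommandSketch and buildAmCommand are hypothetical names, and the literal <LOG_DIR> placeholder is copied from the example command rather than taken from a Hadoop constant.

```java
import java.util.ArrayList;
import java.util.List;

final class AmCommandSketch {
    // Builds the ApplicationMaster command line shown in the example.
    // Hypothetical helper; parameter names mirror the command-line flags.
    static String buildAmCommand(int containerMemory, int containerVCores,
                                 int numContainers, int priority) {
        List<String> vargs = new ArrayList<>();
        vargs.add("org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");
        vargs.add("--container_memory " + containerMemory);
        vargs.add("--container_vcores " + containerVCores);
        vargs.add("--num_containers " + numContainers);
        vargs.add("--priority " + priority);
        // Redirect stdout and stderr into the container's log directory;
        // <LOG_DIR> is a placeholder copied from the example command.
        vargs.add("1><LOG_DIR>/AppMaster.stdout");
        vargs.add("2><LOG_DIR>/AppMaster.stderr");
        return String.join(" ", vargs);
    }

    public static void main(String[] args) {
        System.out.println(buildAmCommand(10, 1, 1, 0));
    }
}
```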
[Figure: interaction between the client and the RM (Client<->RM)]
[Figure: structure of ApplicationSubmissionContext]
Writing the ApplicationMaster (AM)
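The AM uses an ApplicationAttemptId to interact with the RM.
The container information can be read from the environment variables passed in (env), and the ApplicationAttemptId is then derived from it: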
ContainerId containerId = ConverterUtils.toContainerId(envs
.get(Environment.CONTAINER_ID.name()));
appAttemptID = containerId.getApplicationAttemptId();
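To show why the attempt id can be derived from the container id, here is a JDK-only sketch that works on the printed form of the ids. It assumes the layout container_[e<epoch>_]<clusterTimestamp>_<appSeq>_<attempt>_<container> for container ids and appattempt_<clusterTimestamp>_<appSeq>_<attempt> (attempt padded to six digits) for attempt ids. AttemptIdSketch and toAppAttemptId are hypothetical names; a real AM should keep using containerId.getApplicationAttemptId() as in the code above.

```java
import java.util.Locale;

final class AttemptIdSketch {
    // Derives the printed application attempt id from a printed container id.
    // The string layouts are assumptions stated in the text above.
    static String toAppAttemptId(String containerId) {
        String[] parts = containerId.split("_");
        if (parts.length < 5 || !parts[0].equals("container")) {
            throw new IllegalArgumentException("Not a container id: " + containerId);
        }
        // Skip the optional epoch field, e.g. "e17".
        int i = parts[1].startsWith("e") ? 2 : 1;
        if (parts.length != i + 4) {
            throw new IllegalArgumentException("Unexpected field count: " + containerId);
        }
        long clusterTimestamp = Long.parseLong(parts[i]);
        int appSeq = Integer.parseInt(parts[i + 1]);
        int attempt = Integer.parseInt(parts[i + 2]);
        // The trailing container number is dropped: it is not part of the attempt id.
        return String.format(Locale.ROOT, "appattempt_%d_%04d_%06d",
                clusterTimestamp, appSeq, attempt);
    }

    public static void main(String[] args) {
        System.out.println(toAppAttemptId("container_1576067711791_1132781_01_000001"));
    }
}
```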
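Once the AM has fully initialized itself, we can start two clients, one to talk to the RM and one to talk to the NM, and set up their event handlers:
// Talk to the RM and set up its event handler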
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
// Talk to the NM and set up its event handler
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
The AM must send heartbeats to the RM
// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
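The first argument of AMRMClientAsync.createAMRMClientAsync (1000 in the code earlier) is the heartbeat interval in milliseconds. The JDK-only sketch below models only that timing behavior with a scheduled executor: a task fires at a fixed rate until the expected number of beats has been seen. HeartbeatSketch and runHeartbeats are hypothetical names, and nothing here talks to a real RM.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class HeartbeatSketch {
    // Fires a "heartbeat" every intervalMs and returns true if `count`
    // beats were observed before timeoutMs elapsed. Illustrative only.
    static boolean runHeartbeats(int count, long intervalMs, long timeoutMs) {
        CountDownLatch remaining = new CountDownLatch(count);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Each tick stands in for one allocate() call to the RM.
            timer.scheduleAtFixedRate(remaining::countDown, 0, intervalMs, TimeUnit.MILLISECONDS);
            return remaining.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow(); // stop heartbeating when done
        }
    }

    public static void main(String[] args) {
        System.out.println(runHeartbeats(3, 10, 5000));
    }
}
```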
The AM requests container resources from the RM
// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capabililty of resources in this cluster " + maxVCores);
// A resource ask cannot exceed the max.
if (containerMemory > maxMem) {
LOG.info("Container memory specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerMemory + ", max="
+ maxMem);
containerMemory = maxMem;
}
if (containerVirtualCores > maxVCores) {
LOG.info("Container virtual cores specified above max threshold of cluster."
+ " Using max value." + ", specified=" + containerVirtualCores + ", max="
+ maxVCores);
containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
response.getContainersFromPreviousAttempts();
LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
+ " previous attempts' running containers on AM registration.");
numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
int numTotalContainersToRequest =
numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
// Request the containers
for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
numRequestedContainers.set(numTotalContainers);
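The arithmetic in the code above reduces to two small rules: an ask is capped at the cluster maximum reported by the RM, and containers still running from a previous attempt are subtracted from the number to request. A JDK-only sketch of just those rules follows; ContainerAskSketch and its method names are hypothetical.

```java
final class ContainerAskSketch {
    // A resource ask cannot exceed the cluster maximum, so cap it.
    static int clampToMax(int requested, int clusterMax) {
        return Math.min(requested, clusterMax);
    }

    // Containers surviving from a previous attempt already count toward
    // the total, so only the remainder is requested. Like the code above,
    // this does not guard against a negative result.
    static int containersToRequest(int total, int previouslyRunning) {
        return total - previouslyRunning;
    }

    public static void main(String[] args) {
        System.out.println(clampToMax(4096, 2048));
        System.out.println(containersToRequest(5, 2));
    }
}
```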
In AMRMClientAsync.CallbackHandler, the onContainersAllocated callback starts a LaunchContainerRunnable thread to launch the container:
// containerListener comes from: containerListener = createNMCallbackHandler();
// containerListener is of type NMCallbackHandler
LaunchContainerRunnable runnableLaunchContainer =
new LaunchContainerRunnable(allocatedContainer, containerListener);
Thread launchThread = new Thread(runnableLaunchContainer);
LaunchContainerRunnable
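The run method of LaunchContainerRunnable builds the ContainerLaunchContext for the shell that will run in the container, registers the container with the containerListener callback handler, and then uses nmClientAsync to start the container asynchronously:
// Build the ctx used to launch the shell script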
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, shellEnv, commands, null, allTokens.duplicate(), null);
// Register the container with the NMCallbackHandler
containerListener.addContainer(container.getId(), container);
// Start the container asynchronously with nmClientAsync
nmClientAsync.startContainerAsync(container, ctx);
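The order of the last two calls matters: the container is registered with the listener before the asynchronous start, so the callback can already find it when it fires on another thread. The JDK-only sketch below models that pattern with a thread pool. Listener and Launcher are hypothetical stand-ins for NMCallbackHandler and NMClientAsync and share none of their real behavior beyond this ordering.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncLaunchSketch {
    // Hypothetical stand-in for NMCallbackHandler.
    static final class Listener {
        final Map<String, String> containers = new ConcurrentHashMap<>();
        final Map<String, Boolean> knownAtStart = new ConcurrentHashMap<>();
        final CountDownLatch done;
        Listener(int expected) { done = new CountDownLatch(expected); }
        void addContainer(String id, String host) { containers.put(id, host); }
        // Called from a pool thread; records whether the container was registered.
        void onContainerStarted(String id) {
            knownAtStart.put(id, containers.containsKey(id));
            done.countDown();
        }
    }

    // Hypothetical stand-in for NMClientAsync: returns at once, calls back later.
    static final class Launcher {
        private final ExecutorService pool = Executors.newFixedThreadPool(2);
        private final Listener listener;
        Launcher(Listener listener) { this.listener = listener; }
        void startContainerAsync(String id) { pool.execute(() -> listener.onContainerStarted(id)); }
        void stop() { pool.shutdown(); }
    }

    // Returns true if every callback fired and found its container registered.
    static boolean launchAll(String... ids) {
        Listener listener = new Listener(ids.length);
        Launcher launcher = new Launcher(listener);
        try {
            for (String id : ids) {
                listener.addContainer(id, "host-for-" + id); // register first
                launcher.startContainerAsync(id);            // then start
            }
            return listener.done.await(5, TimeUnit.SECONDS)
                    && listener.knownAtStart.size() == ids.length
                    && !listener.knownAtStart.containsValue(false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            launcher.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(launchAll("c1", "c2", "c3"));
    }
}
```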