编写一个直接在Yarn上运行的程序
我们知道基于mapReduce框架的分布式程序的编写,在这种框架下我们不需要考虑申请资源,只需要安照mapreduce框架的要求,直接编写Map函数和reduce函数即可。如何在Yarn上直接编写应用程序呢?
要想在Yarn上编写应用程序,需要编写两个组件,Client和ApplicationMaster. 例如,JobClient和MRAppMaster是Yarn专门为Mapreduce设计实现的两个Client和ApplicationMaster组件。
客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序的状态。
ApplicationManager负责向ResourceManager申请资源(返回以Container形式),并与NodeManager通信以启动各个Container,同时负责监控运行的状态,并在失败时候重新申请资源。
client设计
客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序的状态。
-
客户端通过RPC与ResourceManager通信获取appId和可分配的资源等信息
-
客户端通过RPC将ApplicationMaster提交到ResourceManager
客户端将启动ApplicationMaster所需要的信息打包到ApplicationSubmisionContext中
主要包括:application_id,name,priority,queue,user,unmanagered_am等
也需要提提供ApplicationClient接口实现,以供返回信息,包括集群信息,节点信息,kill信息,运行状态
当然这些程序可以使用java的RPC进行编程,Yarn提供了YarnClient类的封装编程库,使用maven载入。
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-yarn-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>3.2.0</version>
</dependency>
public class YarnClientDemo {
//创建一个yarnclient客户断
private YarnClient client;
//这里配置Yarn集群信息或者采用加在yarn-site.xml文件形式
private Configuration conf;
public void initClient() throws IOException, YarnException {
client = YarnClient.createYarnClient();
client.init(conf);
//启动YarnClient
client.start();
//获取一个applicationID
YarnClientApplication app = client.createApplication();
//构造applicationsubmit用于提交
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
//获取appid
ApplicationId appId = appContext.getApplicationId();
//获取资源信息
Resource resource = appContext.getResource();
//设置app信息
appContext.setApplicationName("statistic app");
appContext.setQueue("background");
appContext.setUnmanagedAM(false);
client.submitApplication(appContext);
}
}
ApplicationMaster设计
ApplicationManager负责向ResourceManager申请资源(返回以Container形式),并与NodeManager通信以启动各个Container,同时负责监控运行的状态,并在失败时候重新申请资源。
所以我们分为AM-RM交互和AM-NM交互
AM-RM
- ApplicationMaster通过RPC向ResourceManager注册
ApplicationMaster启动时向ResourceManager注册,注册信息封装RegisterApplicationMasterRequest中,主要信息有host,
rp_port,tracking_url追踪URL.
注册成功后收到一个RegisterApplicationMasterResponse返回值,主要信息有,maximumCapability最大可用资源,client_to_am_token_master_key等信息
- ApplicationMaster通过RPC向ResourceManager申请资源
ApplicationMaster将要请求的资源封装为AllocateRequest参数,主要包括,priority,resource_name,capability,num_containers资源数目,进度,请求加入黑名单的机器。用户可以将一个机器加入黑名单使,RM不把该机器资源分配给本程序。
ApplicationMaster调用后会收到AllocationResponse类型返回信息。主要包括,reponse_id,allocated_containers,limit,状态等信息。
- ApplicationMaster通过RPC告诉ResourceManager程序运行完毕,退出
ApplicationMaster与ResourceManager交互由AMRMClientImpl和AMRMClientAsync实现,但是AMRMClientImpl是阻塞的,AMRMClientAsync是非阻塞的
public class MyCallByHandler implements AMRMClientAsync.CallbackHandler
{
//配置yarnconf信息
private YarnConfiguration conf;
public void onContainersCompleted(List<ContainerStatus> list) {
}
public void onContainersAllocated(List<Container> list) {
//构建句柄
AMRMClientAsync.CallbackHandler callbackHandler = new MyCallByHandler();
AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync.createAMRMClientAsync(1000, callbackHandler);
asyncClient.init(conf);
asyncClient.start();
try {
//三个需要填的参数是name,port,url
RegisterApplicationMasterResponse response = asyncClient.registerApplicationMaster("statistic app",5200,"url");
asyncClient.addContainerRequest(ContainerRequest.newBuilder().build());
} catch (YarnException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
public void onShutdownRequest() {
}
public void onNodesUpdated(List<NodeReport> list) {
}
public float getProgress() {
return 0;
}
public void onError(Throwable throwable) {
}
}
上简单实现构建的代表
AM-NM
- ApplicationMaster将申请到的资源二次分配给内部的任务,并通过RPC与NodeManager通信,启动Container。
该过程传参为StartContainerRequest,主要包括localResources,environment,container_token等信息,返回一个StartContainerResponse,主要包括services_meta_data,成功或失败请求值
-
ApplicationMaster向NodeManager询问container的运行状态,失败会重新申请资源
-
Container运行完成,ApplicationMaster通过RPC释放Container
ApplicationMaster与ResourceManager交互由NMClientImpl和NMClientAsync实现,但是NMClientImpl是阻塞的,NMClientAsync是非阻塞的
实现方法差不多,实现NMClientAsync接口。
Yarn实现了DistributionShell的实例
DistributionShell 是Yarn自带的Application实现的例子,可以运行shell命令,代码也不多
1)构造RPC句柄。
利用Hadoop RPC接口创建一个可以直接与ResourceManager交互的RPC client句柄applicationsManager:
private void connectToASM() throws IOException {
YarnConfiguration yarnConf = new YarnConfiguration(conf);
InetSocketAddress rmAddress = yarnConf.getSocketAddr(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
LOG.info(“Connecting to ResourceManager at ” + rmAddress);
applicationsManager = ((ClientRMProtocol) rpc.getProxy(
ClientRMProtocol.class, rmAddress, conf));
}
(2)获取application id。
与ResourceManager通信,请求application id:
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse response = applicationsManager.getNewApplication(request);
(3)构造ContainerLaunchContext。
构造一个用于运行ApplicationMaster的container,container相关信息被封装到ContainerLaunchContext对象中:
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
//添加本地资源
//填充localResources
amContainer.setLocalResources(localResources);
//添加运行ApplicationMaster所需的环境变量
Map<String, String> env = new HashMap<String, String>();
//填充env
amContainer.setEnvironment(env);
//添加启动ApplicationMaster的命令
//填充commands;
amContainer.setCommands(commands);
//设置ApplicationMaster所需的资源
amContainer.setResource(capability);
(4)构造ApplicationSubmissionContext。
构造一个用于提交ApplicationMaster的ApplicationSubmissionContext:
ApplicationSubmissionContext appContext =
Records.newRecord(ApplicationSubmissionContext.class);
//设置application id,调用GetNewApplicationResponse#getApplicationId()
appContext.setApplicationId(appId);
//设置Application名称:“DistributedShell”
appContext.setApplicationName(appName);
//设置前面创建的container
appContext.setAMContainerSpec(amContainer);
//设置application的优先级,默认是0
pri.setPriority(amPriority);
//设置application的所在队列,默认是”"
appContext.setQueue(amQueue);
//设置application的所属用户,默认是”"
appContext.setUser(amUser);
(5)提交ApplicationMaster。
将ApplicationMaster提交到ResourceManager上,从而完成作业提交功能:
applicationsManager.submitApplication(appRequest);
(6) 显示应用程序运行状态。
为了让用户知道应用程序进度,Client会每隔几秒在shell终端上打印一次应用程序运行状态:
while (true) {
Thread.sleep(1000);
GetApplicationReportRequest reportRequest =
Records.newRecord(GetApplicationReportRequest.class);
reportRequest.setApplicationId(appId);
GetApplicationReportResponse reportResponse =
applicationsManager.getApplicationReport(reportRequest);
ApplicationReport report = reportResponse.getApplicationReport();
//打印report内容
…
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
if (YarnApplicationState.FINISHED == state) {
if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
return true;
} else {
return false;
}
} else if (YarnApplicationState.KILLED == state
|| YarnApplicationState.FAILED == state) {
return false;
}
}