大数据基础组件

开发yarn应用程序

2019-06-02  本文已影响0人  tracy_668

前言

YARN是一个资源管理系统,负责集群资源的管理和分配。yarn就好比hadoop集群的操作系统,当用户向YARN中提交一个应用程序后,需要同操作系统做一些交互,这样才能运行在yarn上。yarn分两个阶段运行应用程序: 第一个阶段是启动ApplicationMaster; 第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。如果想要将一个新的应用程序跑在yarn上,通常需要编写这两个组件:客户端和ApplicationMaster。这两个组件的编写非常复杂,尤其是ApplicationMaster,需要考虑rpc调用,容错等细节,往往由专业的开发人员编写这两个组件,然后提供给上层的应用程序用户使用。如果大量 应用程序可以抽象为一种通用的框架, 那么只需要实现一个客户端和一个ApplicationMaster,这样所有的应用程序直接用这两个组件即可,比如MapReduce是一种通用的计算框架,YARN已经内嵌了一个可以直接使用的客户端MRClientService和ApplicationMaster—MRAppMaster。对于无法用MapReduce计算框架抽象出来的应用程序,要想跑在yarn上,就必须用户自己编写这两个组件,客户端和ApplicationMaster,其中,客户端负责向ResourceManager提交ApplicationMaster,并查询应用程序运行状态,ApplicationMaster负责向ResourceManager申请资源(以Container形式表示),并与NodeManager通信以启动各个Container,此外,ApplicationMaster还负责监控各个任务运行状态,并在失败是为其重新申请资源。

开发Client启动AM

Client部分是用于将应用提交到YARN, 从而可以启动application master.客户端通常只需与ResourceManager交互,期间涉及到多个数据结构和一个RPC协议,具体如下:

image.png

客户端通过rpc协议ApplicationClientProtocal向ResourceManager(具体是ApplicationsManager、ASM)发送应用程序提交请求GetNewApplicationRequest,ResourceManager为其返回应答GetNewApplicationResponse,该数据结构中包含多种信息,包括ApplicationId、可资源使用上限和下限等。初始化并启动一个yarnClient:

YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
yarnClient.start();
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
(1) application id
(2) application 名称
(3) application优先级
(4) application 所属队列
(5) application 启动用户名
(6) ApplicationMaster对应的Container信息ContainerLaunchContext,包括:启动ApplicationMaster所需各种文件资源、jar包、环境变量、启动命令、运行ApplicationMaster所需的资源(主要指内存)等。
// set the application name
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
  localResources, env, commands, null, null, null);
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);

客户端调用ClientRMProtocol#submitApplication(ApplicationSubmissionContext)将ApplicationMaster提交到ResourceManager上。ResourceManager收到请求后,会为ApplicationMaster寻找合适的节点,并在该节点上启动它。

LOG.info("Submitting application to ASM");
yarnClient.submitApplication(appContext);

客户端可通过多种方式查询应用程序的运行状态,其中一种是调用RPC函数ClientRMProtocol#getApplicationReport获取一个应用程序当前运行状况报告,该报告内容包括应用程序名称、所属用户、所在队列、ApplicationMaster所在节点、一些诊断信息、启动时间等。

// Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId);
LOG.info("Got application report from ASM for"
    + ", appId=" + appId.getId()
    + ", clientToAMToken=" + report.getClientToAMToken()
    + ", appDiagnostics=" + report.getDiagnostics()
    + ", appMasterHost=" + report.getHost()
    + ", appQueue=" + report.getQueue()
    + ", appMasterRpcPort=" + report.getRpcPort()
    + ", appStartTime=" + report.getStartTime()
    + ", yarnAppState=" + report.getYarnApplicationState().toString()
    + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
    + ", appTrackingUrl=" + report.getTrackingUrl()
    + ", appUser=" + report.getUser());
YarnApplicationState state = report.getYarnApplicationState();
FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();

开发ApplicationMaster

ApplicationMaster需要与ResoureManager和NodeManager交互,以申请资源和启动Container,期间涉及到多个数据结构和两个RPC协议。具体步骤如下:

1)Hostname:期望Container所在的节点,如果是*,表示可以为任意节点。
2)Resource capability:运行该任务所需的资源量,如(memory/disk/cpu)。
3)Priority:任务优先级。一个应用程序中的任务可能有多种优先级,ResourceManager会优先为高优先级的任务分配资源。
4)numContainers:符合以上条件的container数目。
1)Requested containers:所需的Container列表
2)Released containers:有些情况下,比如有些任务在某些节点上失败过,则ApplicationMaster不想再在这些节点上运行任务,此时可要求释放这些节点上的Container。
3)Progress update information:应用程序执行进度
4)ResponseId:RPC响应ID,每次调用RPC,该值会加1。

ResourceManager会为ApplicationMaster返回一个AllocateResponse对象,该对象中主要信息包含在AMResponse中:

1)reboot:ApplicationMaster是否需要重新初始化.当ResourceManager端出现不一致状态时,会要求对应的ApplicationMaster重新初始化。
2)Allocated Containers:新分配的container列表。
3)Completed Containers:已运行完成的container列表,该列表中包含运行成功和未成功的Container,ApplicationMaster可能需要重新运行那些未运行成功的Container。
1)ContainerId:Container id
2)Resource:该Container可使用的资源量(当前仅支持内存)
3)User:Container所属用户
4)Security tokens:安全令牌,只有持有该令牌才可启动container
5)LocalResource:运行Container所需的本地资源,比如jar包、二进制文件、其他外部文件等。
6)ServiceData:应用程序可能使用其他外部服务,这些服务相关的数据通过该参数指定。
7)Environment:启动container所需的环境变量
8)command:启动container的命令
使用ApplicationMasterProtocol.allocate向ResourceManager发送查询请求;
使用ContainerManagementProtocol查询指定的ContainerId对应的Container的状态;

distributedshell实例分析

Distributedshell Client的入口main函数如下:

public static void main(String[] args) {
  ...
   Client client = new Client();
   boolean doRun = client.init(args);
    if (!doRun) {
       System.exit(0);
      }
    result = client.run();
    ...
}

DistributedShell Client中最重要的是函数为run(),该函数实现过程如下:
利用Hadoop RPC接口创建一个可以直接与ResourceManager交互的RPC client句柄applicationsManager

private void connectToASM() throws IOException {
  YarnConfiguration yarnConf = new YarnConfiguration(conf);
  InetSocketAddress rmAddress = yarnConf.getSocketAddr(
        YarnConfiguration.RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_ADDRESS,
        YarnConfiguration.DEFAULT_RM_PORT);
  LOG.info("Connecting to ResourceManager at " + rmAddress);
  applicationsManager = ((ClientRMProtocol) rpc.getProxy(
        ClientRMProtocol.class, rmAddress, conf));
  } 
GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse response = applicationsManager.getNewApplication(request);

ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
//添加本地资源
//填充localResources
amContainer.setLocalResources(localResources);
//添加运行ApplicationMaster所需的环境变量
Map<String, String> env = new HashMap<String, String>();
//填充env
amContainer.setEnvironment(env);
//添加启动ApplicationMaster的命令
//填充commands;
amContainer.setCommands(commands);
//设置ApplicationMaster所需的资源
amContainer.setResource(capability);

ApplicationSubmissionContext appContext = 
Records.newRecord(ApplicationSubmissionContext.class);
//设置application id,调用GetNewApplicationResponse#getApplicationId()
appContext.setApplicationId(appId);
//设置Application名称:“DistributedShell”
appContext.setApplicationName(appName);
//设置前面创建的container
appContext.setAMContainerSpec(amContainer);
//设置application的优先级,默认是0
pri.setPriority(amPriority);
//设置application的所在队列,默认是""
appContext.setQueue(amQueue);
//设置application的所属用户,默认是""
appContext.setUser(amUser);
applicationsManager.submitApplication(appRequest);
while (true) {
  Thread.sleep(1000);
  GetApplicationReportRequest reportRequest = 
Records.newRecord(GetApplicationReportRequest.class);
  reportRequest.setApplicationId(appId);
  GetApplicationReportResponse reportResponse = 
applicationsManager.getApplicationReport(reportRequest);
  ApplicationReport report = reportResponse.getApplicationReport();
  //打印report内容
  ...
  YarnApplicationState state = report.getYarnApplicationState();
  FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
  if (YarnApplicationState.FINISHED == state) {
    if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
      return true;
    } else {
      return false;
    }
  } else if (YarnApplicationState.KILLED == state   
          || YarnApplicationState.FAILED == state) {
    return false;
  } 
}

Distributedshell ApplicationMaster源码分析

RegisterApplicationMasterRequest appMasterRequest = 
Records.newRecord(RegisterApplicationMasterRequest.class);
appMasterRequest.setApplicationAttemptId(appAttemptID);
appMasterRequest.setHost(appMasterHostname); appMasterRequest.setRpcPort(appMasterRpcPort);
appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
return resourceManager.registerApplicationMaster(appMasterRequest);
while (numCompletedContainers.get() < numTotalContainers
        && !appDone) {
   Thread.sleep(1000);
   List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
   if (askCount > 0) {
    ResourceRequest containerAsk = setupContainerAskForRM(askCount);
    resourceReq.add(containerAsk);
   }
   //如果resourceReq为null,则可看做心跳信息,否则就是申请资源
   AMResponse amResp =sendContainerAskToRM(resourceReq);
}
LaunchContainerRunnable runnableLaunchContainer = 
new LaunchContainerRunnable(allocatedContainer); 
//每个container由一个线程启动
Thread launchThread = new Thread(runnableLaunchContainer); 
launchThreads.add(launchThread);
launchThread.start();
FinishApplicationMasterRequest finishReq = 
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setAppAttemptId(appAttemptID);
boolean isSuccess = true;
if (numFailedContainers.get() == 0) {
  finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
}
上一篇下一篇

猜你喜欢

热点阅读