
distributedshell: a YARN programming guide

2020-02-06  spraysss

Hadoop YARN is a standalone scheduling framework. Using the APIs that YARN provides, we can write our own programs and have YARN schedule and run them.

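Hadoop officially ships a YARN application called distributedshell, which runs shell commands through the YARN scheduling framework. Hadoop provides it as a simple example of how to write your own YARN application.
The source code is at: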
https://github.com/apache/hadoop/tree/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell

Writing a YARN application generally involves the following interfaces.

Interfaces

Low-level interfaces

ApplicationClientProtocol
The low-level protocol between clients and the ResourceManager, used to submit/abort jobs and to get information about applications, cluster metrics, nodes, queues, and ACLs.

ApplicationMasterProtocol
The low-level protocol between the AM and the RM; the AM heartbeats to the RM by calling ApplicationMasterProtocol.allocate.

ContainerManagementProtocol
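The low-level protocol between the AM and the NodeManager (NM), used to start and stop containers and to get status information about running containers.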

High-level interfaces

Client<-->ResourceManager
Uses a YarnClient object.

ApplicationMaster<-->ResourceManager

Used to request containers from the RM. Uses an AMRMClientAsync object; an AMRMClientAsync.CallbackHandler handles the asynchronous events.

ApplicationMaster<-->NodeManager
Used to launch containers on NodeManagers. Uses NMClientAsync to communicate with the NodeManager; an NMClientAsync.CallbackHandler handles the asynchronous events.
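The two lists above line up one-to-one: each high-level client wraps one of the low-level protocols. The Java enum below is only a lookup table summarizing that pairing as this article describes it; `YarnClientMapping` and `protocolFor` are hypothetical names, not part of the YARN API.

```java
// Hypothetical lookup table: which low-level protocol each high-level
// YARN client wraps, and which two parties it connects.
enum YarnClientMapping {
    YARN_CLIENT("YarnClient", "Client <--> ResourceManager", "ApplicationClientProtocol"),
    AMRM_CLIENT_ASYNC("AMRMClientAsync", "ApplicationMaster <--> ResourceManager", "ApplicationMasterProtocol"),
    NM_CLIENT_ASYNC("NMClientAsync", "ApplicationMaster <--> NodeManager", "ContainerManagementProtocol");

    final String clientClass;
    final String endpoints;
    final String protocol;

    YarnClientMapping(String clientClass, String endpoints, String protocol) {
        this.clientClass = clientClass;
        this.endpoints = endpoints;
        this.protocol = protocol;
    }

    // Returns the low-level protocol behind a high-level client class name.
    static String protocolFor(String clientClass) {
        for (YarnClientMapping m : values()) {
            if (m.clientClass.equals(clientClass)) {
                return m.protocol;
            }
        }
        throw new IllegalArgumentException("unknown client: " + clientClass);
    }
}
```

For example, `YarnClientMapping.protocolFor("AMRMClientAsync")` returns `"ApplicationMasterProtocol"`.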

distributedshell

Let's now look at how Hadoop's example program, distributedshell, is written.

Writing the Client

org.apache.hadoop.yarn.applications.distributedshell.Client is the client entry point; it interacts with the RM.

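  1. The program first initializes a YarnClient:
  yarnClient = YarnClient.createYarnClient();
  yarnClient.init(conf);
  // The service must also be started before it is used; Client does this later, in run()
  yarnClient.start();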

It then calls yarnClient.createApplication to create the application and obtain an application ID; under the hood this goes through ApplicationClientProtocol:

  // Get a new application id
    YarnClientApplication app = yarnClient.createApplication();
    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
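An application ID has two parts: the cluster timestamp (the start time of the ResourceManager) and a sequence number. The sketch below rebuilds the printed form, assuming it is `application_<clusterTimestamp>_<id>` with the id zero-padded to at least four digits. `AppIdFormat` is a hypothetical helper; in real code you would simply call `toString()` on the `ApplicationId` returned by `appResponse.getApplicationId()`.

```java
import java.util.Locale;

// Hypothetical helper: rebuilds the string form of an application ID.
// Assumed format: application_<clusterTimestamp>_<id, at least 4 digits>.
final class AppIdFormat {
    private AppIdFormat() {}

    static String format(long clusterTimestamp, int id) {
        // Locale.ROOT keeps the digits ASCII regardless of the default locale.
        return String.format(Locale.ROOT, "application_%d_%04d", clusterTimestamp, id);
    }
}
```

With the values used later in this article, `AppIdFormat.format(1576067711791L, 1132781)` gives `application_1576067711791_1132781`; a small sequence number such as 42 is padded to `0042`.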

Using the application ID returned by the RM, the client builds an ApplicationSubmissionContext. The ApplicationSubmissionContext carries everything the RM needs to launch the ApplicationMaster; the client sets the following in it:

ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
...
// Set the application name
appContext.setApplicationName(appName);
...
// Prepare the local resources, environment, and commands
...
// Build the ApplicationMaster's ContainerLaunchContext from the prepared pieces;
// the last three arguments (serviceData, tokens, acls) are left null here
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(localResources, env, commands, null, null, null);
...
// Set the resources (memory, vcores) requested for the AM container
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
// Set the AM's ContainerLaunchContext
appContext.setAMContainerSpec(amContainer);
// Set the priority
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
// Set the queue
appContext.setQueue(amQueue);
// Submit the application
yarnClient.submitApplication(appContext);

An example to make this concrete.
Suppose the user test runs hadoop-yarn-applications-distributedshell with the following command:

hadoop jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar -jar hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.2.jar  -queue root.download -shell_script /tmp/a.sh

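Suppose yarnClient.createApplication() returns the application ID application_1576067711791_1132781. The client then uploads the AM jar and the shell script to HDFS, under the user's home directory: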

/user/test/DistributedShell/application_1576067711791_1132781/AppMaster.jar
/user/test/DistributedShell/application_1576067711791_1132781/ExecScript.sh
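Both paths follow the pattern `<home directory>/<app name>/<application ID>/<file name>`. A minimal sketch of that pattern, assuming the HDFS home directory is `/user/<user>` (the usual default); `StagingPaths.stagingPath` is a hypothetical helper, whereas the real Client resolves the home directory through `FileSystem.getHomeDirectory()`.

```java
// Hypothetical helper: where the client uploads a file for a given application.
// Assumption: the HDFS home directory of <user> is /user/<user>.
final class StagingPaths {
    private StagingPaths() {}

    static String stagingPath(String user, String appName, String appId, String fileName) {
        String home = "/user/" + user;
        return home + "/" + appName + "/" + appId + "/" + fileName;
    }
}
```

Calling it with user `test`, app name `DistributedShell`, the application ID above, and the file names `AppMaster.jar` and `ExecScript.sh` reproduces the two paths listed.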

The command used to launch the ApplicationMaster is:

org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr
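The command is the AM main class, followed by the AM's own options and two output redirections. The sketch below reassembles exactly the command shown above from its parameters; `AmCommand.build` is a hypothetical helper. The literal `<LOG_DIR>` is intentionally left unexpanded (in YARN it is the value of `ApplicationConstants.LOG_DIR_EXPANSION_VAR`), because the NodeManager substitutes the container's log directory when it launches the command.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that assembles the AM launch command shown above.
final class AmCommand {
    private AmCommand() {}

    static final String AM_CLASS =
        "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster";
    // Placeholder expanded by the NodeManager at launch time.
    static final String LOG_DIR = "<LOG_DIR>";

    static String build(int containerMemory, int containerVcores, int numContainers, int priority) {
        List<String> parts = new ArrayList<>();
        parts.add(AM_CLASS);
        parts.add("--container_memory " + containerMemory);
        parts.add("--container_vcores " + containerVcores);
        parts.add("--num_containers " + numContainers);
        parts.add("--priority " + priority);
        // Redirect stdout and stderr into the container's log directory.
        parts.add("1>" + LOG_DIR + "/AppMaster.stdout");
        parts.add("2>" + LOG_DIR + "/AppMaster.stderr");
        return String.join(" ", parts);
    }
}
```

`AmCommand.build(10, 1, 1, 0)` yields the command string shown above.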
Figure: Client-RM interaction (Client<->RM)
Figure: structure of ApplicationSubmissionContext

Writing the ApplicationMaster (AM)

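The AM identifies itself to the RM by its ApplicationAttemptId.
The AM can read its own container ID from the environment variables it is launched with, and derive the ApplicationAttemptId from it: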

      ContainerId containerId = ConverterUtils.toContainerId(envs
          .get(Environment.CONTAINER_ID.name()));
      appAttemptID = containerId.getApplicationAttemptId();
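To see what `getApplicationAttemptId()` extracts, the sketch below performs the same derivation on the string forms. It assumes the usual formats `container_<clusterTs>_<appId>_<attempt>_<container>` (optionally with an `e<epoch>_` field right after `container_`) and `appattempt_<clusterTs>_<appId>_<attempt>` with the app ID padded to four digits and the attempt to six. `ContainerIdParser` is hypothetical; real code should keep using the YARN classes (in newer Hadoop releases `ConverterUtils.toContainerId` is deprecated in favor of `ContainerId.fromString`).

```java
import java.util.Locale;

// Hypothetical parser: container ID string -> application attempt ID string.
final class ContainerIdParser {
    private ContainerIdParser() {}

    static String toAppAttemptId(String containerId) {
        String[] parts = containerId.split("_");
        if (parts.length < 5 || !"container".equals(parts[0])) {
            throw new IllegalArgumentException("not a container ID: " + containerId);
        }
        // Skip the optional epoch field, e.g. "e17" in container_e17_...
        int i = parts[1].startsWith("e") ? 2 : 1;
        if (parts.length != i + 4) {
            throw new IllegalArgumentException("unexpected container ID: " + containerId);
        }
        long clusterTimestamp = Long.parseLong(parts[i]);
        int appId = Integer.parseInt(parts[i + 1]);
        int attemptId = Integer.parseInt(parts[i + 2]);
        // parts[i + 3] is the container number, which the attempt ID drops.
        return String.format(Locale.ROOT, "appattempt_%d_%04d_%06d",
            clusterTimestamp, appId, attemptId);
    }
}
```

For the AM container of the example application, `container_1576067711791_1132781_01_000001` maps to `appattempt_1576067711791_1132781_000001`.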

Once the AM has fully initialized itself, it starts two clients, one to talk to the RM and one to talk to the NMs, and sets up their event handlers:

    // Client for the RM, with its event handler; 1000 is the heartbeat interval in ms
    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
    amRMClient.init(conf);
    amRMClient.start();
    
    // Client for the NMs, with its event handler
    containerListener = createNMCallbackHandler();
    nmClientAsync = new NMClientAsyncImpl(containerListener);
    nmClientAsync.init(conf);
    nmClientAsync.start();
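The point of the `Async` clients is that heartbeats run on a background thread and results are pushed to the callback handler, so the AM's main thread never blocks waiting on the RM. The stdlib-only model below illustrates that callback pattern: a hypothetical `FakeAsyncRMClient` drains a queue of canned allocations, one per simulated heartbeat, and hands each non-empty one to a hypothetical `AllocationHandler`. It is a simplified model of the pattern, not the actual implementation of `AMRMClientAsync`.

```java
import java.util.List;
import java.util.Queue;

// Hypothetical callback interface; only the method name mirrors
// AMRMClientAsync.CallbackHandler.onContainersAllocated.
interface AllocationHandler {
    void onContainersAllocated(List<String> containerIds);
}

// Hypothetical async client: a background thread simulates one heartbeat
// per queued fake RM response and pushes non-empty allocations to the handler.
class FakeAsyncRMClient {
    private final Queue<List<String>> fakeRmResponses;
    private final AllocationHandler handler;
    private final long intervalMs;
    private Thread heartbeatThread;

    FakeAsyncRMClient(Queue<List<String>> fakeRmResponses, AllocationHandler handler, long intervalMs) {
        this.fakeRmResponses = fakeRmResponses;
        this.handler = handler;
        this.intervalMs = intervalMs;
    }

    void start() {
        heartbeatThread = new Thread(() -> {
            List<String> allocated;
            // Each poll() stands in for one allocate() heartbeat.
            while ((allocated = fakeRmResponses.poll()) != null) {
                if (!allocated.isEmpty()) {
                    handler.onContainersAllocated(allocated);
                }
                try {
                    Thread.sleep(intervalMs); // wait until the next heartbeat
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "fake-heartbeat");
        heartbeatThread.start();
    }

    // Blocks until every fake response has been delivered.
    void awaitFinished() {
        try {
            heartbeatThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The caller only calls `start()` and then continues; allocations arrive in the handler on the heartbeat thread, which is why the real callback handlers must be thread-safe.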

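The AM registers itself with the RM. Registration starts the heartbeat: from then on the AM must keep heartbeating to the RM, which AMRMClientAsync does in the background at the interval passed to createAMRMClientAsync: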

// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
        appMasterTrackingUrl);

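The AM then requests container resources from the RM. It first caps each request at the cluster's maximum container capability and subtracts the containers still running from previous AM attempts: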

    // Dump out information about cluster capability as seen by the
    // resource manager
    int maxMem = response.getMaximumResourceCapability().getMemory();
    LOG.info("Max mem capability of resources in this cluster " + maxMem);

    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);

    // A resource ask cannot exceed the max.
    if (containerMemory > maxMem) {
      LOG.info("Container memory specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerMemory + ", max="
          + maxMem);
      containerMemory = maxMem;
    }

    if (containerVirtualCores > maxVCores) {
      LOG.info("Container virtual cores specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
          + maxVCores);
      containerVirtualCores = maxVCores;
    }

    List<Container> previousAMRunningContainers =
        response.getContainersFromPreviousAttempts();
    LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
      + " previous attempts' running containers on AM registration.");
    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());

    int numTotalContainersToRequest =
        numTotalContainers - previousAMRunningContainers.size();
    // Setup ask for containers from RM
    // Send request for containers to RM
    // Until we get our fully allocated quota, we keep on polling RM for
    // containers
    // Keep looping until all the containers are launched and shell script
    // executed on them ( regardless of success/failure).
    // Request the containers
    for (int i = 0; i < numTotalContainersToRequest; ++i) {
      ContainerRequest containerAsk = setupContainerAskForRM();
      amRMClient.addContainerRequest(containerAsk);
    }
    numRequestedContainers.set(numTotalContainers);
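The sizing logic above boils down to two small calculations: cap each ask at the cluster maximum, and request only the containers that are not already running from a previous attempt. A minimal sketch with plain ints; `RequestSizing` and its methods are hypothetical names.

```java
// Hypothetical helper mirroring the request-sizing steps above.
final class RequestSizing {
    private RequestSizing() {}

    // A resource ask cannot exceed the cluster maximum, so cap it.
    static int clampToMax(int requested, int clusterMax) {
        return Math.min(requested, clusterMax);
    }

    // Containers that survived earlier AM attempts are already counted, so only
    // the remainder is requested. A result <= 0 makes the request loop run zero times.
    static int containersToRequest(int numTotalContainers, int previousRunning) {
        return numTotalContainers - previousRunning;
    }
}
```

For instance, asking for 16384 MB on a cluster whose maximum is 8192 MB is capped to 8192, and needing 5 containers with 2 inherited from a previous attempt leaves 3 requests.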

AMRMClientAsync.CallbackHandler

When containers are allocated, the onContainersAllocated callback starts a separate LaunchContainerRunnable thread for each allocated container:

        // containerListener was created earlier with createNMCallbackHandler();
        // it is an NMCallbackHandler
        LaunchContainerRunnable runnableLaunchContainer =
            new LaunchContainerRunnable(allocatedContainer, containerListener);
        Thread launchThread = new Thread(runnableLaunchContainer);
        // Launch on a separate thread so the callback thread is not blocked
        launchThreads.add(launchThread);
        launchThread.start();

LaunchContainerRunnable

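The run method of LaunchContainerRunnable builds the ContainerLaunchContext that runs the shell command inside the container, registers the container with containerListener (the NM callback handler), and then starts the container asynchronously with nmClientAsync: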

      // Build the launch context used to run the shell script
      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
        localResources, shellEnv, commands, null, allTokens.duplicate(), null);
      // Register the container with NMCallbackHandler so its callbacks can find it
      containerListener.addContainer(container.getId(), container);
      // Start the container asynchronously through nmClientAsync
      nmClientAsync.startContainerAsync(container, ctx);
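Putting the last two steps together: one launch thread per allocated container, and each thread registers its container with the callback handler before starting it. The stdlib-only sketch below models only that control flow. `FakeNMCallbackHandler`, `FakeLaunchContainerRunnable`, and `LaunchSketch` are hypothetical stand-ins, the command string is made up, and the "start" step calls the callback directly instead of going through a NodeManager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for NMCallbackHandler: records registered and started containers.
class FakeNMCallbackHandler {
    final Map<String, String> containers = new ConcurrentHashMap<>();
    final Map<String, Boolean> started = new ConcurrentHashMap<>();

    void addContainer(String containerId, String command) {
        containers.put(containerId, command);
    }

    void onContainerStarted(String containerId) {
        started.put(containerId, Boolean.TRUE);
    }
}

// Hypothetical stand-in for LaunchContainerRunnable.
class FakeLaunchContainerRunnable implements Runnable {
    private final String containerId;
    private final FakeNMCallbackHandler listener;

    FakeLaunchContainerRunnable(String containerId, FakeNMCallbackHandler listener) {
        this.containerId = containerId;
        this.listener = listener;
    }

    @Override
    public void run() {
        // 1. "Build the launch context": here just a hypothetical command string.
        String command = "bash ExecScript.sh";
        // 2. Register before starting, so the callback can find the container.
        listener.addContainer(containerId, command);
        // 3. "Start" the container. In YARN the result arrives later via a callback;
        //    this sketch invokes the callback directly.
        listener.onContainerStarted(containerId);
    }
}

final class LaunchSketch {
    private LaunchSketch() {}

    // Mirrors onContainersAllocated: one launch thread per allocated container.
    static List<Thread> onContainersAllocated(List<String> containerIds, FakeNMCallbackHandler listener) {
        List<Thread> launchThreads = new ArrayList<>();
        for (String id : containerIds) {
            Thread launchThread = new Thread(new FakeLaunchContainerRunnable(id, listener));
            launchThreads.add(launchThread);
            launchThread.start();
        }
        return launchThreads;
    }

    // Waits for all launch threads to finish.
    static void joinAll(List<Thread> threads) {
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

Keeping the launch threads in a list, as the real AM does with `launchThreads`, lets the AM wait for them to finish before it shuts down.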

Overall interaction flow

Figure: YARN interactions