
Source-code analysis of the YARN job submission process

2018-12-28  JX907
(Figure: YARN application submission flow; the numbered steps below refer to this figure.)

Containers running on YARN fall into two kinds: the ApplicationMaster, which is the first container started for every YARN job, and the containers that run the user's tasks.

ApplicationMaster process startup

1. The YARN client submits the application to YARN. (Step 1 in the figure.)

To submit a job to YARN, the client calls two API methods on org.apache.hadoop.yarn.client.api.YarnClient:


public abstract YarnClientApplication createApplication()
    throws YarnException, IOException;

public abstract ApplicationId submitApplication(
    ApplicationSubmissionContext appContext)
    throws YarnException, IOException;

createApplication talks to the ResourceManager process over RPC (rmClient.getNewApplication(request)) and asks it to allocate a new application. The result comes back in a GetNewApplicationResponse, which carries the ApplicationId and the maximum resources the cluster can allocate; createApplication wraps this in the returned YarnClientApplication.

Once the client has the YarnClientApplication, it fills in the submission context, org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext: the application name, resources, queue, priority, and the ApplicationMaster launch command (carried in a ContainerLaunchContext; ordinary containers are launched with the same entity). Finally it calls the second method above, submitApplication, which ships the ApplicationSubmissionContext to the ResourceManager (rmClient.submitApplication(request)).
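The two-phase handshake above (first fetch a fresh ApplicationId, then submit the filled-in context) can be sketched locally. This is a minimal sketch: FakeRM and AppContext are hypothetical stand-ins for the real RPC layer and ApplicationSubmissionContext, not Hadoop classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal local sketch of YarnClient's two-phase submission:
// phase 1 (createApplication) fetches a fresh ApplicationId from the RM,
// phase 2 (submitApplication) sends the completed submission context back.
// FakeRM and AppContext are hypothetical stand-ins, not Hadoop classes.
public class SubmitSketch {
    static class AppContext {
        final int appId;
        String appName, queue;
        String amLaunchCommand;   // corresponds to the ContainerLaunchContext
        AppContext(int appId) { this.appId = appId; }
    }

    static class FakeRM {
        private final AtomicInteger nextId = new AtomicInteger(1);
        final Map<Integer, AppContext> accepted = new HashMap<>();

        // stands in for rmClient.getNewApplication(request)
        int getNewApplication() { return nextId.getAndIncrement(); }

        // stands in for rmClient.submitApplication(request)
        int submitApplication(AppContext ctx) {
            accepted.put(ctx.appId, ctx);
            return ctx.appId;
        }
    }

    public static void main(String[] args) {
        FakeRM rm = new FakeRM();

        // Phase 1: createApplication -> new ApplicationId from the RM
        int appId = rm.getNewApplication();

        // Phase 2: fill in the submission context, then submit it
        AppContext ctx = new AppContext(appId);
        ctx.appName = "demo-app";
        ctx.queue = "default";
        ctx.amLaunchCommand = "java MyApplicationMaster";
        int submittedId = rm.submitApplication(ctx);

        System.out.println(submittedId == appId && rm.accepted.containsKey(appId));
    }
}
```

The point of the two phases is that the client needs the ApplicationId before it can build the context (e.g. to stage job files under an id-specific directory).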

2. On receiving the request, the ResourceManager picks a NodeManager in the cluster, allocates a container for the application, and starts the application's ApplicationMaster in that container. (Step 2 in the figure.)

The corresponding handling on the ResourceManager side:

1) org.apache.hadoop.yarn.server.resourcemanager.ClientRMService handles all RPC requests to the RM; job submission is handled by its submitApplication method.

2) org.apache.hadoop.yarn.server.resourcemanager.RMAppManager#submitApplication first instantiates an RMAppImpl, which initializes its state machine in the process (around line 155 of RMAppImpl; see reference 2),

and finally calls:

this.rmContext.getDispatcher().getEventHandler()
    .handle(new RMAppEvent(applicationId, RMAppEventType.START));

This uses the asynchronous-dispatcher and state-machine patterns; the handler is registered in ResourceManager#serviceInit:

rmDispatcher.register(RMAppEventType.class,
    new ApplicationEventDispatcher(rmContext));
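The register-then-dispatch pattern works roughly like this minimal sketch: handlers are mapped to event types at init time, and events handed to the dispatcher are delivered asynchronously from a queue. This is a simplified stand-in for Hadoop's AsyncDispatcher, not the real class.

```java
import java.util.Map;
import java.util.concurrent.*;

// Simplified sketch of the async-dispatcher pattern used by the RM:
// handlers are registered per event type at serviceInit time, and
// handle(event) enqueues the event for a dispatcher thread to deliver.
public class DispatcherSketch {
    enum RMAppEventType { START, ATTEMPT_FAILED }

    static class Event {
        final RMAppEventType type;
        final String applicationId;
        Event(RMAppEventType type, String applicationId) {
            this.type = type;
            this.applicationId = applicationId;
        }
    }

    interface EventHandler { void handle(Event e); }

    static class AsyncDispatcher {
        private final Map<RMAppEventType, EventHandler> handlers = new ConcurrentHashMap<>();
        private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        private final ExecutorService dispatcherThread = Executors.newSingleThreadExecutor();

        void register(RMAppEventType type, EventHandler h) { handlers.put(type, h); }

        // getEventHandler().handle(...) in the real code boils down to an enqueue;
        // delivery happens later on the dispatcher thread
        void handle(Event e) {
            queue.add(e);
            dispatcherThread.submit(() -> {
                Event ev = queue.poll();
                if (ev != null) handlers.get(ev.type).handle(ev);
            });
        }

        void stop() { dispatcherThread.shutdown(); }
    }

    public static void main(String[] args) throws Exception {
        AsyncDispatcher dispatcher = new AsyncDispatcher();
        CountDownLatch delivered = new CountDownLatch(1);

        // serviceInit: register a handler for RMApp events
        dispatcher.register(RMAppEventType.START, e -> {
            System.out.println("START for " + e.applicationId);
            delivered.countDown();
        });

        // submitApplication: ...handle(new RMAppEvent(applicationId, RMAppEventType.START))
        dispatcher.handle(new Event(RMAppEventType.START, "application_0001"));
        delivered.await(2, TimeUnit.SECONDS);
        dispatcher.stop();
    }
}
```

The asynchrony is what keeps submitApplication cheap: the RPC handler only enqueues a START event and returns, while the state machine transitions run on the dispatcher thread.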

3) Starting the ApplicationMaster:

The RMApp state machine was mentioned above. Inside the RM, one RMApp corresponds to one or more RMAppAttempts: if the first RMAppAttempt fails, the RM starts a new attempt according to configuration. See reference 3 for the RMAppAttempt state machine.

Source for RMApp starting an RMAppAttempt:

org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl#createAndStartNewAttempt (triggered when the RMApp state machine reaches APP_ACCEPTED or ATTEMPT_FAILED) fires the START event of the RMAppAttempt state machine. When the attempt reaches the ALLOCATED state, the RM's ApplicationMasterLauncher talks to the chosen NodeManager to start the ApplicationMaster, after which the attempt is set to LAUNCHED. (The resource-allocation step is omitted here.)

Code references:

org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.AttemptStoredTransition,

org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl#launchAttempt,

org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher

The relevant state-machine transitions:

// Transitions from ALLOCATED_SAVING State
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
    RMAppAttemptState.ALLOCATED,
    RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())

// Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
    RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
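The addTransition calls above build a lookup table keyed by (current state, event type). A stripped-down version of that table pattern, with plain enums standing in for the real Hadoop StateMachineFactory and transition classes:

```java
import java.util.HashMap;
import java.util.Map;

// Stripped-down sketch of the (state, event) -> next-state table that
// StateMachineFactory.addTransition builds; the real code also runs a
// hook per transition (e.g. AttemptStoredTransition, AMLaunchedTransition).
public class AttemptStateSketch {
    enum State { ALLOCATED_SAVING, ALLOCATED, LAUNCHED }
    enum EventType { ATTEMPT_NEW_SAVED, LAUNCHED }

    static class StateMachine {
        private final Map<String, State> table = new HashMap<>();
        State current;

        StateMachine(State initial) { current = initial; }

        // fluent builder, mirroring StateMachineFactory.addTransition
        StateMachine addTransition(State from, State to, EventType on) {
            table.put(from + "/" + on, to);
            return this;
        }

        State doTransition(EventType event) {
            State next = table.get(current + "/" + event);
            if (next == null) throw new IllegalStateException(
                "invalid event " + event + " in state " + current);
            current = next;
            return current;
        }
    }

    public static void main(String[] args) {
        StateMachine sm = new StateMachine(State.ALLOCATED_SAVING)
            .addTransition(State.ALLOCATED_SAVING, State.ALLOCATED, EventType.ATTEMPT_NEW_SAVED)
            .addTransition(State.ALLOCATED, State.LAUNCHED, EventType.LAUNCHED);

        sm.doTransition(EventType.ATTEMPT_NEW_SAVED); // attempt persisted -> ALLOCATED
        sm.doTransition(EventType.LAUNCHED);          // AMLauncher succeeded -> LAUNCHED
        System.out.println(sm.current);               // prints LAUNCHED
    }
}
```

An event that has no entry for the current state is rejected, which is exactly how the real state machine surfaces illegal transitions.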

The RM talks to the NM to actually start the AppMaster process, and moves the state machine to LAUNCHED once it has:

org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncher#run

try {
  LOG.info("Launching master" + application.getAppAttemptId());
  launch();
  handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
      RMAppAttemptEventType.LAUNCHED));
} catch (Exception ie) {
  // error handling elided
}

launch() is where the AppMaster is actually started; the key call:

StartContainersResponse response =
    containerMgrProxy.startContainers(allRequests);

Here containerMgrProxy is the RPC protocol used to start and stop containers on the NM and query their status. Because the AppMaster is itself a special container whose launch is initiated by the RM, this call is RM-to-NM communication.

3. The AppMaster requests resources from the RM and asks NMs to start containers. (Steps 3 and 4 in the figure.)

The AppMaster is the first container the RM starts for a YARN job; afterwards it drives the whole job, including requesting, starting, killing, and health-checking containers. The ApplicationMaster is application-level code: apart from the MapReduce ApplicationMaster that YARN ships for historical reasons, the framework does not provide an implementation, so users supply their own. Take Spark's ApplicationMaster as an example.

Resource-request flow:

Send the resource asks to the RM, piggybacked on the heartbeat;

Receive the allocated containers back via the heartbeat;

For each container received, talk to the corresponding NM to start it.
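The three steps above amount to a heartbeat loop: each beat both carries the outstanding asks and returns whatever the scheduler has granted since the last beat. A minimal local sketch, where FakeRM is a hypothetical stand-in for the real AMRMClient/ApplicationMasterProtocol:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the AM's heartbeat loop: allocate() is one heartbeat, which
// sends the pending resource asks and returns any containers granted so
// far. FakeRM stands in for the real AMRMClient / RM scheduler.
public class HeartbeatSketch {
    static class FakeRM {
        private int pendingAsks = 0;

        List<String> allocate(int newAsks) {
            pendingAsks += newAsks;
            List<String> granted = new ArrayList<>();
            int grantNow = Math.min(pendingAsks, 2); // scheduler grants at most 2 per beat
            for (int i = 0; i < grantNow; i++) granted.add("container_" + i);
            pendingAsks -= grantNow;
            return granted;
        }
    }

    public static void main(String[] args) {
        FakeRM rm = new FakeRM();
        int needed = 5, launched = 0;
        int asksToSend = needed;

        while (launched < needed) {
            // heartbeat: piggyback the asks, collect the grants
            List<String> granted = rm.allocate(asksToSend);
            asksToSend = 0; // asks are only sent once
            for (String container : granted) {
                // for each granted container, the AM contacts that NM to start it
                launched++;
            }
        }
        System.out.println(launched); // prints 5
    }
}
```

Note that grants typically arrive spread over several heartbeats, which is why the AM keeps beating until all requested containers have been received and launched.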

Container start flow:

Call chain:

org.apache.spark.deploy.yarn.ApplicationMaster#runExecutorLauncher

org.apache.spark.deploy.yarn.ApplicationMaster#registerAM

org.apache.spark.deploy.yarn.ExecutorRunnable#run

org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer

try {
  nmClient.startContainer(container.get, ctx)
} catch {
  // error handling elided
}

nmClient is the client class for the NM:

org.apache.hadoop.yarn.client.api.impl.NMClientImpl#startContainer

StartContainersResponse response =
    proxy.getContainerManagementProtocol().startContainers(allRequests);

This starts the container by talking to the NM over ContainerManagementProtocol.

The corresponding code on the NM side:

org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl#startContainers

The main class of the started container process is CoarseGrainedExecutorBackend. (Open question: where is this class name specified?)

Open question: how does the container's start-up status get reported back to the AM?

4. The driver performs the DAG and task splitting and dispatches tasks to the executors. (Step 5 in the figure.)

YarnClusterScheduler in the figure is the TaskScheduler for yarn-cluster mode. It extends YarnScheduler, which in turn extends TaskSchedulerImpl; YarnClusterScheduler and YarnScheduler add almost nothing of their own, so the main logic lives in TaskSchedulerImpl, which is instantiated in SparkContext (see the Spark source notes, section 5, "TaskScheduler").

Driver side:

org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks

Executor side, registering with and receiving tasks dispatched by the driver:

org.apache.spark.executor.CoarseGrainedExecutorBackend#receive handles these message types:

RegisteredExecutor (successfully registered with the driver),

RegisterExecutorFailed (registration with the driver failed),

LaunchTask (a task dispatched by the driver to the executor),

KillTask (kill a task),

StopExecutor (stop the executor; forwards to Shutdown),

Shutdown (stop the executor)

On "RegisteredExecutor", the backend has successfully registered with the driver and creates the Executor instance:

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }

On "LaunchTask", the message is a task dispatched by the driver, and the backend calls the executor's launchTask method:

case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = TaskDescription.decode(data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskDesc)
  }

Each task actually runs on a thread from the executor's thread pool:

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
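The same pattern in miniature: wrap each task in a Runnable, track it in a running-tasks map, and hand it to a pool. This is a plain Java stand-in for Executor's threadPool and runningTasks, not Spark code:

```java
import java.util.Map;
import java.util.concurrent.*;

// Miniature version of Executor.launchTask: wrap the task description in
// a Runnable, record it in runningTasks, and submit it to a thread pool.
public class LaunchTaskSketch {
    static class TaskRunner implements Runnable {
        final long taskId;
        final Map<Long, TaskRunner> runningTasks;
        TaskRunner(long taskId, Map<Long, TaskRunner> running) {
            this.taskId = taskId;
            this.runningTasks = running;
        }
        @Override public void run() {
            try {
                // ... deserialize and run the task body here ...
            } finally {
                runningTasks.remove(taskId); // done: drop from the tracking map
            }
        }
    }

    final Map<Long, TaskRunner> runningTasks = new ConcurrentHashMap<>();
    final ExecutorService threadPool = Executors.newCachedThreadPool();

    void launchTask(long taskId) {
        TaskRunner tr = new TaskRunner(taskId, runningTasks);
        runningTasks.put(taskId, tr);
        threadPool.execute(tr);
    }

    public static void main(String[] args) throws Exception {
        LaunchTaskSketch executor = new LaunchTaskSketch();
        executor.launchTask(1L);
        executor.threadPool.shutdown();
        executor.threadPool.awaitTermination(2, TimeUnit.SECONDS);
        System.out.println(executor.runningTasks.isEmpty()); // prints true
    }
}
```

Tracking the runner in a map is what lets the real executor serve KillTask: it can look the task up by id and interrupt it.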

References (state machines):

1. https://blog.csdn.net/lfdanding/article/details/51786110

2. http://www.cnblogs.com/shenh062326/p/3586510.html

3. https://www.cnblogs.com/shenh062326/p/3587108.html

4. https://blog.csdn.net/wankunde/article/details/78315438
