Spark 任务调度之 Driver 发送 Task 到 Exe

2018-05-13  本文已影响0人  博弈史密斯

Driver 发送 Task 到 Executor

SparkContext初始化完毕后,执行用户编写代码

SparkPi中调用RDD的reduce,reduce中
调用 SparkContext.runJob 方法提交任务,SparkContext.runJob方法调用DAGScheduler.runJob方法

DAGScheduler中,根据rdd的Dependency生成stage,stage分为ShuffleMapStage和ResultStage两种类型,根据stage类型生成对应的task,分别是ShuffleMapTask、ResultTask,最后调用 TaskScheduler 的 submitTasks提交任务,submitTasks 是接口方法,最终实现是在 TaskSchedulerImpl 中实现。
TaskSchedulerImpl 方法中最终调用 backend.reviveOffers(),backend 的子类为 CoarseGrainedSchedulerBackend。其实现了 reviveOffers 方法,最终执行 launchTasks(taskDescs)

查看 launchTasks(taskDescs) 如下:

//从 executorDataMap 中取 executorData,executorData 中保存了 Executor的连接方式 RpcEndpointRef
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

executorDataMap中保存了Executor的连接方式,关于Executor如何注册到executorDataMap中,参考Spark 任务调度之创建Executor

Executor接收Task:
Worker节点的CoarseGrainedExecutorBackend进程接收Driver(其实是 TaskScheduler,DAGScheduler、TaskScheduler 都是通过 SparkContext启动的,用户的jar包也就是 用户写的程序 都是属于 Driver)发送的task,交给Executor对象处理,如下

至此从RDD的action开始,至Executor对象接收任务的流程就结束了。
整理流程大致如下 :


Executor 执行 task 并返回结果

Executor的launchTask方法将收到的信息封装为TaskRunner对象,TaskRunner继承自Runnable,Executor使用线程池threadPool调度TaskRunner.

下来查看TaskRunner中run方法对应的逻辑,我将其分为:反序列化 task、运行 task、发送 result,三部分。

反序列化 task:

如上图注释,反序列化得到Task对象。

运行 task:

调用Task的run方法执行计算,Task是抽象类,其实现类有两个,ShuffleMapTask和ResultTask,分别对应shuffle和非shuffle任务。

Task的run方法调用其runTask方法执行task,我们以Task的子类ResultTask为例(ShuffleMapTask相比ResultTask多了一个步骤,使用ShuffleWriter将结果写到本地),如下:


为了说明上图中的func,我们以RDD的map方法为例,如下

至此,task的计算就完成了,task的run方法返回计算结果。

发送 result

最后调用CoarseGrainedExecutorBackend的statusUpdate方法返回result给Driver。
在 CoarseGrainedSchedulerBackend.scala 中的 class DriverEndpoint 中接收消息并处理。

从Executor接收任务,到发送结果给Driver的流程,如下 :


  1. 上图①所示路径,执行task任务。
  2. 上图②所示路径,将执行结果返回给Driver,后续Driver调用TaskScheduler处理返回结果,不再介绍。
上一篇下一篇

猜你喜欢

热点阅读