Spark任务如何执行?
Spark执行模型
Spark执行模型可以分为三部分:创建逻辑计划,将其翻译为物理计划,在集群上执行task。
可以在http://<driver-node>:4040上查看关于Spark Jobs的信息。对于已经完成的Spark应用,可以在http://<server-url>:18080上查看信息。
下面来浏览一下这三个阶段。
逻辑执行计划
第一阶段,逻辑执行计划被创建。这个计划展示了哪些steps被执行。回顾一下,当对一个Dataset执行一个转换操作,会有一个新的Dataset被创建。这时,新的Dataset会指向其父Dataset,最终形成一个有向无环图(DAG)。
物理执行计划
行动操作会触发逻辑DAG图向物理执行计划的转换。Spark Catalyst query optimizer会为DataFrames创建物理执行计划,如下图所示:
image.png
物理执行计划标识执行计划的资源,例如内存分区和计算任务。
查看逻辑执行计划和物理执行计划
可以调用explain(true)方法查看逻辑和物理执行计划。如下例所示:
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
var file = “maprfs:///data/flights20170102.json”
case class Flight(_id: String, dofW: Long, carrier: String,
origin: String, dest: String, crsdephour: Long, crsdeptime:
Double, depdelay: Double,crsarrtime: Double, arrdelay: Double,
crselapsedtime: Double, dist: Double) extends Serializable
val df = spark.read.format(“json”).option(“inferSchema”, “true”).
load(file).as[Flight]
val df2 = df.filter($”depdelay” > 40)
df2.take(1)
result:
Array[Flight] = Array(Flight(MIA_IAH_2017-01-01_AA_2315,
7,AA,MIA,IAH,20,2045.0,80.0,2238.0,63.0,173.0,964.0))
df2.explain(true)
result:
== Parsed Logical Plan ==
‘Filter (‘depdelay > 40)
+- Relation[_id#8,arrdelay#9,…] json
== Analyzed Logical Plan ==
_id: string, arrdelay: double…
Filter (depdelay#15 > cast(40 as double))
+- Relation[_id#8,arrdelay#9…] json
== Optimized Logical Plan ==
Filter (isnotnull(depdelay#15) && (depdelay#15 > 40.0))
+- Relation[_id#8,arrdelay#9,…] json
== Physical Plan ==
*Project [_id#8, arrdelay#9,…]
+- *Filter (isnotnull(depdelay#15) && (depdelay#15 > 40.0))
+- *FileScan json [_id#8,arrdelay#9,…] Batched: false, Format:
JSON, Location: InMemoryFileIndex[maprfs:///..],
在web页面http://<driver-node>:4040/SQL/上可以看到计划生成的更多细节。
image.png
在以下的代码中,我们看到df3的物理计划由FileScan、Filter、Project、HashAggregate、Exchange以及HashAggregate组成。Exchange是由groupBy转换导致的shuffle。Spark在每次shuffle之前对Exchange的数据进行hash aggregation。在shuffle后会针对之前的子aggragation进行一次hash aggregation。
val df3 = df2.groupBy(“carrier”).count
df3.collect
result:
Array[Row] = Array([UA,2420], [AA,757], [DL,1043], [WN,244])
df3.explain
result:
== Physical Plan ==
*HashAggregate(keys=[carrier#124], functions=[count(1)])
+- Exchange hashpartitioning(carrier#124, 200)
+- *HashAggregate(keys=[carrier#124], functions=[partial_
count(1)])
+- *Project [carrier#124]
+- *Filter (isnotnull(depdelay#129) && (depdelay#129 >
40.0))
+- *FileScan json [carrier#124,depdelay#129]
image.png
image.png
在集群上执行tasks
第三阶段,tasks在集群上被调度执行。scheduler将根据转换操作将DAG图划分为stage。窄依赖转换操作(没有数据移动的转换)将被分组到一个单一的stage中。
image.png
每个stage由基于partitions的task组成,这些任务将并行执行相同计算。
image.png
scheduler将这些stage task提交给task scheduler,task scheduler通过cluster manager启动task。
image.png
以下是关于执行组成的一些总结:
- Task:单台机器上运行的执行单元。
- Stage:基于partitions的一组task,执行并行计算。
- Job:具有一个或多个stages。
- Pipelining:当数据集转换操作时没有数据移动时,将Datasets折叠为单一stage。
- DAG:数据集操作时的逻辑视图。
Tasks的数量取决于partitions:在第一个阶段读取文件时,有2个partitions;shuffle过后,partitions的数量为200.可以通过rdd.partitions.size方法查看Dataset的partition数量。
df3.rdd.partitions.size
result: Int = 200
df2.rdd.partitions.size
result: Int = 2