2019-03-16 spark 分区

2019-03-16  本文已影响0人  做一只乐观的小猴子

spark 分区与性能。

RDD 的分区并行运行1个并发的任务,达到CPU的数量或者2-3倍。

比较好的分区是excutors的数量。

可以设置默认的分区数,并行度:sc.defaultParallelism在conf/spark-defaults.conf设置。

RDD的action函数产生的输出数量文件,也是分区的数量决定(上限取决于excutors的可用内存大小)。

sc.textFile(path,num_partitions)对非压缩文件。压缩文件不可以指定分区数量,但是可以repartition 调整分区数量。

map, filter,flatmap 不会保留指定的分区,把函数会运用到每个分区。

分区方式: HashPartitioner 和RangePartition.

HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,因此当某一或某几种类型数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。

RangePartitioner基于抽样的思想来对数据进行分区。图4简单描述了RangePartitioner的数据分区过程。

---------------------

对于初始读入的数据是不具有任何的数据分区方式的。数据分区方式只作用于<Key,Value>形式的数据。因此,当一个Job包含Shuffle操作类型的算子时,如groupByKey,reduceByKey etc,此时就会使用数据分区方式来对数据进行分区,即确定某一个Key对应的键值对数据分配到哪一个Partition中.

在Spark Shuffle阶段中,共分为Shuffle Write阶段和Shuffle Read阶段,其中在Shuffle Write阶段中,Shuffle Map Task对数据进行处理产生中间数据,然后再根据数据分区方式对中间数据进行分区。最终Shffle Read阶段中的Shuffle Read Task会拉取Shuffle Write阶段中产生的并已经分好区的中间数据。

job、stage、task

 Worker Node:物理节点,上面执行executor进程

 Executor:Worker Node为某应用启动的一个进程,执行多个tasks

 Jobs:action 的触发会生成一个job, Job会提交给DAGScheduler,分解成Stage,

 Stage:DAGScheduler 根据shuffle将job划分为不同的stage,同一个stage中包含多个task,这些tasks有相同的 shuffle dependencies。

上一篇下一篇

猜你喜欢

热点阅读