Spark性能优化笔记

2018-09-01  本文已影响0人  卡卡xx

为什么需要调优

在大数据计算领域,Spark已经成为了越来越流行、越来越受欢迎的计算平台之一。然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。
Spark的性能调优由开发调优、资源调优、数据倾斜调优、shuffle调优几个部分组成。开发调优和资源调优是所有Spark作业都需要注意和遵循的一些基本原则,是高性能Spark作业的基础;数据倾斜调优,主要用一套完整的用来解决Spark作业数据倾斜的解决方案;shuffle调优,面向的是对Spark的原理有较深层次掌握的开发者。

性能优化学习

学习Spark开发调优和资源调优比较好的方式是参考美团点评技术团队的技术博客Spark性能调优-基础篇,这里已经写得非常全面了,学习完就可以掌握Spark性能调优的基础部分了。总体可以分为两个方面:

性能优化实践

MovieLens数据集为基础,完成Spark的Map-Side Join和Reduce Side Join例子(过滤出评分高于4.0分的电影,要求显示电影ID 电影名称 电影分数),并比较性能优劣。应该如何调整不同的spark-submit参数获得最佳效果(运行时间),并给出基于目前的运行环境最优参数设置方案。

查看数据

简单查看一下所有表的结构才能完成目标任务。

所有评级都包含在“ratings.dat”文件中,并且位于格式如下:
用户名 MovieID 评级 时间戳
用户信息位于“users.dat”文件中,如下所示
用户名 性别 年龄 职业 邮政编码
电影信息位于文件“movies.dat”中,如下所示
MovieID 标题 流派

所以任务就是把3个表连接起来并按条件过滤,但是不同的连接方式在性能上会出现极大的差距。

Reduce Side Join

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object ReduceJoin {
  def main(args: Array[String]){
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val ratingRDD = sc.textFile(args(0)) //rating.dat表
    val moviesRDD = sc.textFile(args(1)) //movies.dat表
    val startTime = System.currentTimeMillis()//开始时间

    val ratingPair = ratingRDD.map { x =>  //将数据转化为(K,V),K为movieID,V为平均分,直接join
      val temp = x.split("::") //按照原始数据格式拆分为RDD格式
      (temp(1),(temp(2).toFloat,1))
    }.reduceByKey((x,y) => (x._1.toFloat+y._1.toFloat,x._2+y._2)).mapValues(x =>
      x._1/x._2  //通过每部电影总分和评论的人数计算出平均分
    ).filter(x => x._2.toFloat >= 4.0)//过滤出分数高于4分的电影

    val moviePair = moviesRDD.map { x => //将它的类型转化为(K,V),K为movieID,方便join操作
      val temp = x.split("::")
      (temp(0),temp(1))
    }
    //println(ratingPair.count()) //查看rating表是否成功过滤掉4.0以下的电影
    //根据key(movieID)进行连接,并将数据从KV形式格式化为原始格式
    val result = moviePair.join(ratingPair).map(x => (x._1,x._2._1,x._2._2))
    result.saveAsTextFile(args(2))
    val endTime = System.currentTimeMillis()//结束时间
    println("运行时间(秒)"+(endTime-startTime)*0.001)
  }
}

然后编写相应的运行脚本,这里submit的时间随便使用最简单的几个参数,因为目的是对比map-side join和reduce-side join性能上的差异。

#!/bin/bash
hdfs dfs -rm -r /tmp/result
spark-submit --class ReduceJoin --master yarn-cluster /usr/tmp/untitled.jar /tmp/input/ratings.dat /tmp/input/movies.dat /tmp/result

提交任务后就可以去master的8088端口查看spark任务的执行情况了,18088端口查看执行记录和详细过程,看到Reduce-side join任务执行情况如下:


总体执行情况
job中每个阶段信息 部分结果

[图片上传中...(image.png-94aeb0-1535712775641-0)]

这里首先需要明白job,stage,task的概念。简单的讲,我们提交一个作业到spark,spark首先根据提交作业中的action算子将作业分为若干个job
之后对于每个job而言,Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。
每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。
这里只有一个job是因为只有一个action算子(savaAsTextFile),3个stage是因为reduceByKey属于shuffle算子,还有未经协同划分的join也属于shuffle算子,一起将job分成了3个stage,每个stage的2个task是因为RDD数据被存在了两台机器上。通过时间统计可以看到stage1是最消耗时间的,因为它要执行reduceByKey的shuffle操作,会把key相同的数据集中到一个节点,在这个时候数据是整个评论数据集。而后面求平均后过滤再join的时候数据已经变得不是那么多了,所以这里的shuffle相对消耗时间较少(网络,IO少)。

Map-Side Join


简单的讲就是把需要join的数据集中较小的那个数据集进行广播(因为在分布式系统应用中,存储数据都是用RDD对象,每个RDD对象中的数据都被划分为多个分区,每个节点都只持有部分分区,也就是数据集的一部分,而广播就是让每个节点都持有被广播数据的完整信息),然后在每个节点上(map端操作)将自己节点上持有的部分数据和被广播的表进行连接即可。但是需要注意,因为那个小的数据集要被广播,所以要求每个节点的内存必须足够存储被广播的那个数据集,不然就不能进行map-side-join。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object MapJoin {
  def main(args: Array[String]){
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val ratingRDD = sc.textFile(args(0)) //rating.dat表
    val moviesRDD = sc.textFile(args(1)) //movies.dat表
    val startTime = System.currentTimeMillis()//开始时间

    val ratingPair = ratingRDD.map { x =>  //将数据转化为(K,V),K为movieID,V为平均分
      val temp = x.split("::") //按照原始数据格式拆分为RDD格式
      (temp(1),(temp(2).toFloat,1))
    }.reduceByKey((x,y) => (x._1.toFloat+y._1.toFloat,x._2+y._2)).mapValues(x =>
      x._1/x._2  //通过每部电影总分和评论的人数计算出平均分
    ).filter(x => x._2 >= 4.0)//过滤出分数高于4分的电影

    val moviePair = moviesRDD.map { x => //将数据转化为(K,V),K为movieID,V为电影name
      val temp = x.split("::")
      (temp(0),temp(1))
    }.collectAsMap//保存为map 进行广播

    var moviesBroadcast = sc.broadcast(moviePair) //将电影数据集广播,使每个节点都有一份完整的,就不需要shuffle
    var result = ratingPair.map({ x =>
      var movies = moviesBroadcast.value //取出广播变量内容值
      var name = movies.getOrElse(x._1,"No") //取出当前movieID的电影名字
      (x._1,(name,x._2)) //
    })
    result.map(x => (x._1,x._2._1,x._2._2)).saveAsTextFile(args(2))//重新定义输出格式并输出
    val endTime = System.currentTimeMillis()//结束时间
    println("运行时间(秒)"+(endTime-startTime)*0.001)
  }
}
总体执行情况
Job0
Job1 部分结果

这里出现了两个Job的原因是有两个action算子(saveAsTextFile,collectAsMap)。在Job0中,只进行了一个工作collectAsMap,是为了后面广播方便。在Job1中,因为我们避免了耗时的join的Shuffle操作,自然就只有两个stage了。

运行结果分析

Spark参数优化

前面说了,除了可以在开发过程中进行开发调优,还可以灵活的分配资源,使在现有资源上运行达到最优。以Map-side Join为例,使用3台centos6.5虚拟机,内存分别为6 2 2,cup为4核心i5。

要进行资源调优,必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。 Spark作业基本运行原理

详细原理见上图。我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,美团•大众点评使用的是YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

因此Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。

task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。如果CPU core数量比较充足,而且分配到的task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些task线程。

以上就是Spark作业的基本运行原理的说明,大家可以结合上图来理解。理解作业基本原理,是我们进行资源参数调优的基本前提。

num-executors

之前没有手动设置这个参数,可以看到spark启动了2个executor进程。我这里只有3台虚拟机,尝试就设置为3了。发现性能降低了很多,估计是因为我的数据太小了,节点多了就加大了shuffle的消耗,所以设置为1,发现更快了,所以这真的是因为数据太小,数据传输的时间代价大于数据处理的时间代价。平常大数据情况下这个参数应该根据经验设置为50-100。

num-executor分别为1 3 2的时间消耗
num-executor为1
executor-memory
这个参数设置每个executor的内存,下限是要保证每个executor正常运行不会发生oom,没有上限,具体大小根据经验即可。我这里因为节点内存只剩一个G,所以就设置为1G。发现速度又加快了一秒,毕竟内存变大了自然处理速度就提起来了。
executor-cores

由于我的集群是运行在虚拟机上的,所以所有节点共享windows的cpu,即每个节点相当于有4个cpu,所以设置为4,发现报错了,应该是资源不足,设置为2也报错,应该是虚拟机的cpu限制机制吧,所以只能设置为1或者默认了。

driver-memory

这个参数只要保证进行collect算子的时候,所有数据全部集中到Driver进程不会oom就行了,我这里数据相当小就不用设置了。

spark.default.parallelism

首先随便查看一个stage的信息,发现每个executor的task为1(因为文件很小只有一个hadoop block),只有一个线程完全没有并发,效率很低。根据num-executors * executor-cores的2~3倍,我这里就设置为2,和默认相比进步了1s。


spark.storage.memoryFraction
在本次提交的作业中,因为没有出现很多的重复使用,所以没有使用RDD持久化,因此应当设置为很低,把更多的内存用来执行task。再去看了一下执行task时候的GC时间,发现GC消耗还是挺大的,所以把这个参数设置为0.4,GC时间减少了一些,总执行时间也减少了一秒。
spark.shuffle.memoryFraction
因为此实验中有2个shuffle操作,所以应当调大点,避免在进行聚合操作时内存不够写入磁盘导致性能下降。但是这里数据量并不大,shuffle聚合时内存的大小对时间影响并不是很大,主要的时间是计算,所以保持默认0.2即可,给更多的内存给计算。
序列化算法

在Spark的架构中,在网络中传递的或者缓存在内存、硬盘中的对象需要进行序列化操作,序列化的作用主要是利用时间换空间:

上面的六种数据,通过Java序列化(默认的序列化方式)形成一个二进制字节数组,大大减少了数据在内存、硬盘中占用的空间,减少了网络数据传输的开销,并且可以精确的推测内存使用情况,降低GC频率。
但是在序列化和反序列化的过程中,会消耗大量的时间,所以选择一个好的序列化算法很重要。目前Spark使用Kryo比Java默认的序列化快10倍。具体原理可见Kryo参考,这里只需要添加配置使用Kryo即可。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") //使用Kryo序列化库
运行时间变成了17s,再去查看序列化和反序列化的时耗:

未使用Kryo
使用Kryo 可能数据太小了,看不出什么大的效果。

总结

  1. 首先要从根源进行优化,也就是编写程序的时候,比如注意避免创建重复RDD、持久化常使用的RDD等编码方式。
  2. 编码过程中尽量少的出现shuffle操作,用其它操作代替。
  3. 序列化和反序列化使用得非常多,所以使用Kryo比默认快10倍是非常重要的。
  4. 对于资源而言,没有绝对的配置方法,首先要理解每个资源参数的意义和使用经验,再根据自己的集群状态来做调整。
上一篇 下一篇

猜你喜欢

热点阅读