Saprk面试

2020-09-25  本文已影响0人  Rinma

1. 谈谈Spark RDD 的几大特性,并深入讲讲体现在哪?
Spark的RDD有五大特性:

  1. A list of partitions:RDD是由多个分区(partition)组成的集合。
  2. A function for computing each split:对于RDD的计算,其实是RDD的每个分区都会执行这个计算。
  3. A list of dependencies on other RDDs:RDD是一条依赖链,每一个RDD都会记录其父RDD的信息。
  4. Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):分区器作用在K:V结构的RDD中(HashPartition、RangPartition)。
  5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):计算就近原则,Spark会尽量的将计算和存储放在同一个位置中。

2. RDD的弹性主要体现在哪?
RDD又被称为弹性分布式数据集,其弹性体现在:

  1. 自动进行内存和磁盘的切换:主要体现在溢写方面。
  2. 基于Lineage的高效容错:也就是依赖链容错,也叫血缘关系。
  3. task和stage在失败后会进行指定次数的重试机制:task会重试3次后调用stage重试机制,stage重试4次后任务退出。
  4. checkpoint和persist的数据持久化、缓存机制:checkpoint可以将备份保存在HDFS上,主要用于失败重调;persist(cache)将中间数据保存在内存中,主要用于计算加速(后续计算中多次调用该数据集)。

3. 描述下Spark的任务提交流程?

4. 谈谈Spark的宽窄依赖?

5. Spark的job、stage划分,task跟分区的关系?
job由action算法划分,每个action算子就会触发一个新的job。
stage由宽依赖划分,每个宽依赖算子就会将job切分为两个stage。
task是Spark任务的最小执行单位,运行在Executor的线程池中,而partition是数据的最小单位,每个partition对应由一个task执行。所以Spark的并发通常与partition紧密关联。

6. Spark的算子分为哪几类?分别说说你常用的几个算子?
Spark算子通常问题transformation算子和action算子。

7. 说说Spark的shuffle?与MapReduce的shuffle有什么不同?
Shuffle就是讲数据按照一定规则打散并进行重新排序的过程,在Spark中,shuffle的过程由宽依赖算子触发,一般都是groupByKey、reduceByKey等类型的操作算子。这些过程会将原来的数据,按照key为分组重新将有序(分组)的数据分发到对应的节点上去,这个过程就叫shuffle。

8. 了解bypass机制么?详细说说?
在Spark 1.2之后,shuffle管理器由HashShuffleManager更改为SortShuffleManager。而bypass其实就是在shuffle数据量小的时候自动运行的模式,该模式放弃了排序的功能,整体功能等同于优化后的HashShuffleManager。
具体参考第7题。

9. Spark和MR有什么异同?

  1. MR是基于磁盘的分布式计算框架,Spark是基于内存的分布式计算框架。MR每个map和reduce之间,必然会产生shuffle将数据落盘;而Spark优化了此功能,shuffle仅产生在stage间,stage内部数据不落地,shuffle也是优先写在内存中,内存不足才会溢写到磁盘。
  2. Spark具有更高的容错性。Spark通过checkpoint和persist(主要是缓存)进行容错,计算失败后无需重头进行计算;而MR失败后必须重头进行计算。
  3. Spark的框架更完善。Spark具有SparkCore、SparkSQL、SparkStreaming、SparkML、SparkGraph等一站式功能;而MR仅具备基础的MapReduce离线分析引擎。
  4. Spark基于内存进行计算,在执行海量数据计算时,并不是太稳定;而MR在海量数据计算时,会比Spark运行更稳定。
  5. Spark基于内存计算,在同样数据量的情况下,执行效率要远高于MR。

10. 谈谈你平时是怎么应对Spark的数据倾斜问题的?
在shuffle的时候,ShuffleManager会将各个节点上相同key的数据拉在一个shuffle read节点上进行计算,此时如果某个key或者某个特殊值数据量过大,就会发生数据倾斜。数据倾斜只会发生在shuffle过程中。具体表现为:Spark任务执行时间长,具体表现在大多task已完成,只有少部分task需要执行很长时间;Spark发生OOM的问题也可能是数据倾斜导致的。
解决思路:

  1. 使用Hive ETL等工具预处理数据
    使用Hive或者MR等计算框架,提前对数据进行join或者预聚合。
    该方案主要思路就是将数据倾斜的压力转移到Hive、MR等计算框架,从而避免在Spark计算时发生OOM等问题。但是该方案无法实际上解决数据倾斜问题。
  2. 过滤少数导致数据倾斜的key
    大多数时候,数据倾斜都是因为某个key或者特殊值(null)而导致的,此时如果这些数据对业务本身并不会造成影响,那么可以在join或者分组前将其filter过滤掉。如计算数据内存在多少个key,则可以过滤null值,在随后的结果值+1即可。
  3. 提高shuffle并行度
    Spark默认的并行度只有200,有时候数据量很大,但是并行度很低,导致每个线程都需要计算很大的数据量,此时可能会导致任务执行效率低。此时可以在执行聚合类算子时,传递并行度,如reduceByKey(1000),SparkSQL下需要通过参数来设置全局并行度spark.sql.shuffle.partitions=1000
  4. 使用预聚合
    对于reduceByKey或者SparkSQL中的group by等操作有效,为每个key生成随机值,如hive -> hive_1,此时进行聚合,因为每个key都追加了随机值后缀,会将原来数据量大的key打散,在聚合后,将随机值还原,再进行第二阶段的聚合,此时生成的结果为真实结果。
    此方案仅适用于group by等聚合类业务场景。
  5. 使用map join
    如果是大小表join,可以将小表进行广播,将reduce join更改为map join,此方案可以直接消除shuffle。
    如果是双大表,可以将其中一个大表进行过滤,然后使用过滤后的小表再进行map join操作。
    如果双大表都不可以进行过滤,可以将其中key分布均匀的大表进行拆分,拆分后的小表进行map join操作,最后将所有结果union即为最终结果。
    如果在业务场景中,双表会频繁使用join操作,此时可以用分桶表进行优化。
  6. 采样倾斜key进行分拆join
    在方案5的第三条中,如果数据分布均匀的表key较少,但是数据量很大,拆分后也无法形成map join可以采用此方案。
    对数据分布不均匀的表进行采样,确认数据量较大的key,并将这些key(数据集A)和其他key(数据集B)拆分为两个数据集,然后数据集B正常join,数据集A对key打上随机后缀然后再进行join(此时,数据集B也需要指定同样的随机值操作),join结束后,还原key并与数据集A的结果进行union。
  7. 随机前缀和RDD扩容
    如果执行join操作的表都是大表,都存在数据分布均匀,且数据分布不均匀的key很多时,可以采用该方案。
    与方案6类似,但是缺少拆分数据A的过程。

11. 在平时的工作中,你对Spark做了什么优化?

  1. 避免创建重复的RDD,尽量对RDD进行复用。
  2. 对多次使用的RDD进行持久化处理,使用cache或者persist算子,将中间数据缓存到内存中,可以减少重复计算的过程。
  3. 尽量避免使用shuffle类算子。分布式计算中,shuffle是最影响任务性能的关键之一。
  4. 尽量使用map-side预聚合操作。如果无法避免shuffle,在业务场景支撑的情况下,可以使用具有预聚合的算子来替代普通聚合算子,如reduceByKey或者aggregateByKey替代groupByKey。
  5. 使用高性能的算子。如预聚合算子,分区算子(mapPartition),在filter后使用coalesce进行分区收缩等。
  6. 多使用广播变量,实现map join操作。
  7. 如果有自定义数据结果,尽量使用Kryo替代Java默认序列化工具。
  8. 熟悉数据和业务场景,尽力减少数据倾斜的产生。
  9. 对部分参数进行调优。
spark.shuffle.file.buffer
spark.reducer.maxSizeInFlight
...

12. Spark的内存管理机制了解吗?堆外内存了解吗?
Spark内存管理,主要是针对的是Executor的JVM内存。
作为一个JVM进程,Executor的内存管理机制是建立在JVM的堆内存上的,Spark对JVM堆内存进行了更详细的分配,以充分利用JVM堆内存。并引入了堆外内存(Off-Heap)的机制,可以直接在工作节点的内存中开辟空间,进一步优化了内存的利用。

13. 说说Spark的分区机制?
为什么要分区:在分布式运算中,最影响性能的往往是网络间的通信行为(shuffle),在将数据进行分区并将不同的分区传输到指定的计算节点中,由某一个节点独立计算某一块分区的数据,可以减少shuffle的数据量,从而提升任务执行效率。
RDD的分区原则:尽可能使分区数量 == 任务CPU Core数量。
Spark的分区操作由分区器来完成。默认分区器由两个:HashPartition和RangePartition。

14. RDD、DataFrame和Dataset有什么异同?

  1. RDD、DataFrame和Dataset都是Spark的弹性分布式数据集。
  2. 三者都为惰性的,只有在遇到action类算子才会触发执行。
  3. 三者都具有partition的概念。
  4. RDD一般在SparkCore的场景中使用。DF和DS在SparkSQL、StructStreaming、SparkML中使用。
  5. Dataset等同于RDD+scheam。
  6. DataFrame等同于Dataset[Row]。
  7. Dataset是强类型的,因此pyspark只能使用DataFrame。
    8.DataFrame和DataSet可以保存为带列头的csv等特殊格式。
    9.三者可以互相转化。

15. Spark 广播变量在项目中如何运用的?
在Spark中,当传递一个自定义个数据集(如黑名单、白名单),Spark默认会在Driver进行分发,在join等shuffle类算子中,会在每个task都分发一份,这样会造成大量的内存资源浪费和shuffle的产生。
广播变量就是为了应对该问题的。广播变量将指定数据集分发在executor中,而非task中,从而减少了内存占用,并且在join等shuffle类操作中,可以避免节点数据传输而产生的shuffle操作。
但是广播变量是只读的,不能进行修改,而且由于广播变量在每个executor中都会保存一份副本,因此如果该变量过大,会造成OOM的出现。
创建广播变量:

val a = 3
val broadcast = sc.broadcast(a)

获取广播变量:

val c = broadcast.value

16. Spark 累加器在项目中用来做什么?
在Spark程序中,我们通常会对某一项值做监控或者对程序进行调试,这种时候都需要用来累加器(计数器)。
累加器在Driver进行声明并赋初始值,累加器只能在Driver读取最终结果值,只能在Executor中进行更改。
创建累加器

val a = sc.accumulator(0)

获取累加器结果

val b = a.value

17. Repartition和Colease有什么区别?是宽依赖?还是窄依赖?
Colease和Repartition都是用来改变Spark程序分区数量的。
Colease只能缩小分区数,不会产生shuffle操作,是窄依赖。Colease底层调用的Repartition类。

def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = false, logicalPlan)
  }
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends UnaryNode {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  override def output: Seq[Attribute] = child.output
}

Repartition可以缩小和放大分区数,默认会产生shuffle操作,是宽依赖。如果指定进行分区改变,底层调用的是Repartition类;如果根据指定字段进行分区改变,底层调用的是RepartitionByExperssion类。

def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = true, logicalPlan)
  }

@scala.annotation.varargs
  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
  }

@scala.annotation.varargs
  def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
  }
case class RepartitionByExpression(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan,
    numPartitions: Option[Int] = None) extends RedistributeData {
  numPartitions match {
    case Some(n) => require(n > 0, s"Number of partitions ($n) must be positive.")
    case None => // Ok
  }

18.SparkStreaming结合Kafka的两种方式分别是什么?
Receiver和Direct两种模式。

19. SparkStreaming的WAL了解吗?
WAL(Write ahead logs):预写日志。主要用于故障恢复,保证数据的无丢失。
WAL使用文件系统或者数据库作为数据持久化,先将数据写入到持久化的日志文件中去,其后才执行其他逻辑,此时如果程序崩溃,在重启后可以直接读取预写日志进行恢复。
预写日志需要通过参数来开启spark.streaming.receiver.writeAheadLog.enable=true,并且同时在SparkStreaming的环境中设置checkpoint的保存路径。

20.SparkStreaming的反压机制了解吗?详细介绍下?
SparkStreaming的反压机制是1.5版本后退出的新特性,主要用于动态处理数据的摄入速度。
当批处理时间大于批次间隔时,说明数据处理能力已经小于数据的进入速度,这种情况会导致数据的积压,最终可能会引发程序OOM。
手动情况下(一般都是Kafka),可以通过参数spark.streaming.kafka.maxRatePerPartition来手动指定摄入的最大速度。但是这种方法需要提前预知程序的处理能力和数据峰值。
反压机制由SparkStreaming动态来调整数据的摄入速度,通过参数spark.streaming.backpressure.enabled=true来开启反压机制。
其他参数:

spark.streaming.backpressure.enabled=false;  // 开启反压机制。默认为false。
spark.streaming.backpressure.initialRate  // 设定初始化接收值。只适用于Receiver模式。
spark.streaming.kafka.maxRatePerPartition  // 设定每个消费线程最大消费kafka分区的数量。默认全部。
spark.streaming.stopGracefullyOnShutdown  // 设定未处理数据,不会强制killSparkStreaming程序。

21.SparkStreaming如何实现Exactly-Once语义?
容错语义一般有三种:

上一篇下一篇

猜你喜欢

热点阅读