Spark Shuffle FetchFailedException

2020-06-17  JaxMa

One day I ran into a SQL job suffering from data skew. The first idea that came to mind was to increase the number of partitions and see whether the data would land more evenly after hashing, so I raised spark.sql.shuffle.partitions from the original 500 to 2700.
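For reference, a minimal sketch of that change, assuming a session-level setting (the app name is made up; the real job was plain SQL):

import org.apache.spark.sql.SparkSession

// Bump the number of shuffle partitions from 500 to 2700.
val spark = SparkSession.builder()
  .appName("skewed-join-demo")                  // hypothetical app name
  .config("spark.sql.shuffle.partitions", 2700) // previously 500
  .getOrCreate()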
Instead the job failed, with the following error:

FetchFailed(BlockManagerId(516, nfjd-hadoop02-node352.jpushoa.com, 7337, None), shuffleId=3, mapId=59, reduceId=917, message=
org.apache.spark.shuffle.FetchFailedException: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:554)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:485)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:64)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.sort_addToSorter_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:83)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedBufferedToRowWithNullFreeJoinKey(SortMergeJoinExec.scala:941)
    at org.apache.spark.sql.execution.joins.SortMergeJoinScanner.<init>(SortMergeJoinExec.scala:811)
    at org.apache.spark.sql.execution.joins.SortMergeJoinExec$$anonfun$doExecute$1.apply(SortMergeJoinExec.scala:326)
Caused by: io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 2147483648, max: 2147483648)
    at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:725)
    at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:680)
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:758)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:734)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:227)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342)
    ... 1 more

So the out-of-memory happens while fetching data in the shuffle stage: the fetcher tries to allocate a 16777216-byte (16 MB) chunk of Netty direct memory, but the 2147483648-byte (2 GB) direct-memory cap is already exhausted.
At first this error left me baffled. As I understood it, increasing the shuffle partitions should make each task's share of the data smaller (and the Spark web UI showed exactly that), so why would less data per task cause fetch failures in the Shuffle Read stage?
That question led to the following dig through the underlying mechanics:

How Spark Shuffle Read works

Spark Shuffle Read happens in the reduce-task stage: the executor running the reduce task first asks the driver for the shuffle blocks' metadata (including their sizes and which executors hold them), and then goes to fetch them. There is one key mechanism here:
when a block to be fetched is large, it is streamed to local disk rather than loaded into memory in one go, which guards against OOM. But when the block is below a certain threshold, the whole block is read into memory for sorting and other operations. So what is that threshold?
It is spark.maxRemoteBlockSizeFetchToMem, whose default is Int.MaxValue - 512, i.e. roughly 2 GB. The source:


// When the BlockManager is initialized, spark.maxRemoteBlockSizeFetchToMem is read
// from the configuration into a member variable:
// private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)

/**
 * Get block from remote block managers as serialized bytes.
 */
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
  logDebug(s"Getting remote block $blockId")
  require(blockId != null, "BlockId is null")
  var runningFailureCount = 0
  var totalFailureCount = 0

  // Call out to the driver's BlockManagerMaster to get the block's
  // locations and status (including its size).
  val locationsAndStatus = master.getLocationsAndStatus(blockId)

  val blockSize = locationsAndStatus.map { b =>
    b.status.diskSize.max(b.status.memSize)
  }.getOrElse(0L)
  val blockLocations = locationsAndStatus.map(_.locations).getOrElse(Seq.empty)

  // Compare the reported block size against spark.maxRemoteBlockSizeFetchToMem to
  // decide whether the block must be streamed (spilled) to disk or can be read
  // straight into memory.
  val tempFileManager = if (blockSize > maxRemoteBlockToMem) {
    remoteBlockTempFileManager
  } else {
    null
  }

Seen that way, the threshold simply looked too large: if several of my blocks are 1 GB+ each, fetching a handful of them into memory at once is certain to blow up. So the next step was to set spark.maxRemoteBlockSizeFetchToMem to 10m, forcing everything to be streamed to disk, which should make the error go away. Well... no such luck. The job failed again with exactly the same error, and I was completely at a loss (stuck on this for a whole day).
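For the record, a sketch of that attempted workaround; the setting has to be in place before executors launch, e.g. via spark-submit --conf spark.maxRemoteBlockSizeFetchToMem=10m (the SparkConf form below is illustrative):

import org.apache.spark.SparkConf

// Force remote shuffle blocks larger than 10 MB to be streamed to local disk
// instead of being fetched into memory.
val conf = new SparkConf()
  .set("spark.maxRemoteBlockSizeFetchToMem", "10m")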

I kept thinking. This was genuinely puzzling: why does it still fail when the reads are streamed? After much more digging I narrowed the problem down to the block size, since that is the only thing that could differ. So I went to the code that writes shuffle data and records the block sizes, SortShuffleWriter.scala, which is what the sort-merge join in this execution plan uses:

override def write(records: Iterator[Product2[K, V]]): Unit = {
    sorter = if (dep.mapSideCombine) {
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    }
    sorter.insertAll(records)

    // Don't bother including the time to open the merged output file in the shuffle write time,
    // because it just opens a single file, so is typically too fast to measure accurately
    // (see SPARK-3570).
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    val tmp = Utils.tempFileWith(output)
    try {
      val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
      val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
      shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)

      "这里需要去创建一个mapStatus 去返回当前map完的数据输出地址,以及大小等信息"
      mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
      }
    }
  }

Jumping into MapStatus.apply from the write above:

  private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
    .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
    .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }

Here, the number of shuffle partitions is checked against SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS (the config spark.shuffle.minNumPartitionsToHighlyCompress — this is it! a threshold of 2000) to decide whether to use HighlyCompressedMapStatus or CompressedMapStatus.
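So what does HighlyCompressedMapStatus do differently? An abridged sketch of its size reporting, based on the Spark 2.4-era source (details vary across versions):

// Abridged from org.apache.spark.scheduler.HighlyCompressedMapStatus.
// Instead of one exact size per reduce partition, it keeps only an average,
// plus exact entries for "huge" blocks.
private[spark] class HighlyCompressedMapStatus private (
    private[this] var loc: BlockManagerId,
    private[this] var numNonEmptyBlocks: Int,
    private[this] var emptyBlocks: RoaringBitmap,
    private[this] var avgSize: Long,
    private var hugeBlockSizes: scala.collection.Map[Int, Byte])
  extends MapStatus {

  override def getSizeForBlock(reduceId: Int): Long = {
    if (emptyBlocks.contains(reduceId)) {
      0
    } else {
      hugeBlockSizes.get(reduceId) match {
        // Only blocks above spark.shuffle.accurateBlockThreshold
        // (100 MB by default) get an individually tracked size...
        case Some(size) => MapStatus.decompressSize(size)
        // ...every other block is reported as the *average* size.
        case None => avgSize
      }
    }
  }
}

Note how this feeds the blockSize used by the fetch-to-memory decision shown earlier.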

Mystery solved: what I had never anticipated was that crossing 2000 shuffle partitions would trigger a change like this.

Conclusion:

Going from 500 to 2700 shuffle partitions crossed the spark.shuffle.minNumPartitionsToHighlyCompress threshold (2000), so each map task's output sizes stopped being reported exactly by CompressedMapStatus and started being reported by HighlyCompressedMapStatus, which keeps only an average block size for most blocks. With skewed data, the average badly underestimates the large blocks, so the reduce side believes they are small and fetches them into Netty direct memory instead of streaming them to disk. That is also why lowering spark.maxRemoteBlockSizeFetchToMem to 10m changed nothing: the decision is made against the reported (average) size, which was already below the threshold.
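Based on that, a few mitigations worth trying (an illustrative sketch; spark.shuffle.minNumPartitionsToHighlyCompress is an internal config in Spark 2.4, so verify these names against your version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Stay at or below the 2000-partition threshold so exact sizes are kept:
  .set("spark.sql.shuffle.partitions", "2000")
  // ...or raise the threshold itself:
  // .set("spark.shuffle.minNumPartitionsToHighlyCompress", "3000")
  // ...or lower the accurate-size threshold so skewed blocks are reported exactly:
  // .set("spark.shuffle.accurateBlockThreshold", "10m")

And of course, the real fix for the original problem is still to deal with the data skew itself.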
