Spark 自定义partition引发的shuffle问题

2019-08-05  本文已影响0人  天之見證

自定义partition之后, spark shuffle 过程中出现了错误,具体信息如下:

ERROR org.apache.spark.internal.Logging$class: User class threw exception: org.apache.spark.SparkException: 
Job aborted due to stage failure: Task 6 in stage 40.0 failed 4 times, most recent failure:
 Lost task 6.3 in stage 40.0 (TID 13372, host, executor 40): 
java.lang.ArrayIndexOutOfBoundsException: 714
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:214)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:405)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:209)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

相同的代码在另一个数据集上跑的时候报错了,通过dag定位到这段代码中用到了自定义partition,具体定义如下:

new Partitioner {
  override def numPartitions: Int = splitsNum / 100

  override def getPartition(key: Any): Int = {
    key.asInstanceOf[String] / 100
  }
}

通过源码定位到

if (currentPartition != -1) {
    spillInfo.partitionLengths[currentPartition] = committedSegment.length();
    spills.add(spillInfo);
}

spillInfo.partitionLengths 是个array 因为 currentPartition 的值超过了才会发生这个问题

那么spillInfo.partitionLengths 这个的大小是由什么决定的呢?

查看源码可以看出spillInfo初始化的时候就定义了该数组的大小

final class SpillInfo {
  final long[] partitionLengths;
  final File file;
  final TempShuffleBlockId blockId;

  SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
    this.partitionLengths = new long[numPartitions];
    this.file = file;
    this.blockId = blockId;
  }
}

而该对象的初始化则为:

final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

其中numPartitions 就是我们自定义partition中的值

再仔细看了spark UI中的dag图,有个shuffle阶段的分区数就是为 714,并且由于代码原因key的值 <splitsNum,这样就可以定位到问题应该是partition部分有问题,那具体是什么问题呢? 整除原因

splitsNum numPartitions key partitionId
71400 714 71399 713
71401 714 71400 714

从上面可以看出第二种情况的时候partitionId超过了numPartitions, 此时就发生了数组越界的问题

上一篇 下一篇

猜你喜欢

热点阅读