Spark needsUnsafeRowConversion
2020-12-29
鸿乃江边鸟
Spark version: 3.0.1
Spark 3.0.1 has a bug that surfaces as the following exception:
None.get
java.util.NoSuchElementException: None.get
scala.None$.get(Option.scala:529)
scala.None$.get(Option.scala:527)
org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion$lzycompute(DataSourceScanExec.scala:178)
org.apache.spark.sql.execution.FileSourceScanExec.needsUnsafeRowConversion(DataSourceScanExec.scala:176)
org.apache.spark.sql.execution.FileSourceScanExec.doExecute(DataSourceScanExec.scala:463)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:133)
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:47)
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:96)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:3200)
org.apache.spark.sql.Dataset.rdd(Dataset.scala:3198)
Following the stack trace into the source of FileSourceScanExec leads to this expression:
SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled
The source of SparkSession.getActiveSession, on which .get is called, is as follows:
/**
 * Returns the active SparkSession for the current thread, returned by the builder.
 *
 * @note Return None, when calling this function on executors
 *
 * @since 2.2.0
 */
def getActiveSession: Option[SparkSession] = {
  if (TaskContext.get != null) {
    // Return None when running on executors.
    None
  } else {
    Option(activeThreadSession.get)
  }
}
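As the comment says, getActiveSession returns None when it is called on an executor; for the reasoning behind that, see spark-pr-21436. The else branch also yields None whenever activeThreadSession holds no session for the current thread, because Option(null) is None. In either case the subsequent .get throws the NoSuchElementException shown above.
This problem has already been reported and fixed in pr-29667, so it is enough to take the commit ID (37a660866342f2d64ad2990a5596e67cfdf044c0) and cherry-pick it.
Analysis of the cause:
The cause is that two different threads in the same JVM are used one after the other: the DataFrame is created in one thread and then executed in another, as the unit test shows: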
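To make the failure mode concrete, here is a minimal sketch in plain Scala, without Spark. It is not the code from pr-29667; FakeConf, activeConf, unsafeRead and safeRead are hypothetical names invented for illustration. It only shows why calling .get on None throws, and what a defensive lookup with a fallback value could look like.

```scala
object OptionGetDemo {
  // Hypothetical stand-in for a session's configuration (not a Spark class).
  final case class FakeConf(vectorizedReaderEnabled: Boolean)

  // Mimics a lookup that yields None when no session is active in the caller's thread.
  def activeConf(hasActiveSession: Boolean): Option[FakeConf] =
    if (hasActiveSession) Some(FakeConf(vectorizedReaderEnabled = true)) else None

  // Same shape as the failing expression: throws NoSuchElementException on None.
  def unsafeRead(hasActiveSession: Boolean): Boolean =
    activeConf(hasActiveSession).get.vectorizedReaderEnabled

  // Defensive variant: falls back to a caller-supplied default instead of throwing.
  def safeRead(hasActiveSession: Boolean, default: Boolean): Boolean =
    activeConf(hasActiveSession).map(_.vectorizedReaderEnabled).getOrElse(default)

  def main(args: Array[String]): Unit = {
    println(safeRead(hasActiveSession = false, default = false))
    println(scala.util.Try(unsafeRead(hasActiveSession = false)))
  }
}
```

Whether a default value is the right fallback depends on the caller; the sketch only illustrates the difference between Option.get and a total lookup.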
test("SPARK-32813: Table scan should work in different thread") {
  val executor1 = Executors.newSingleThreadExecutor()
  val executor2 = Executors.newSingleThreadExecutor()
  var session: SparkSession = null
  SparkSession.cleanupAnyExistingSession()
  withTempDir { tempDir =>
    try {
      val tablePath = tempDir.toString + "/table"
      val df = ThreadUtils.awaitResult(Future {
        session = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
        session.createDataFrame(
          session.sparkContext.parallelize(Row(Array(1, 2, 3)) :: Nil),
          StructType(Seq(
            StructField("a", ArrayType(IntegerType, containsNull = false), nullable = false))))
          .write.parquet(tablePath)
        session.read.parquet(tablePath)
      }(ExecutionContext.fromExecutorService(executor1)), 1.minute)
      ThreadUtils.awaitResult(Future {
        assert(df.rdd.collect()(0) === Row(Seq(1, 2, 3)))
      }(ExecutionContext.fromExecutorService(executor2)), 1.minute)
    } finally {
      executor1.shutdown()
      executor2.shutdown()
      session.stop()
    }
  }
}
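The cross-thread pattern that this test exercises can be reproduced without Spark. The sketch below assumes that activeThreadSession behaves like a per-thread slot, as its name and the Option(activeThreadSession.get) call suggest. ThreadLocalDemo, active and lookup are hypothetical names, and a String stands in for the session. A value set in the first executor thread is not visible in the second one, so the lookup there returns None.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object ThreadLocalDemo {
  // Hypothetical stand-in for the per-thread "active session" slot.
  private val active = new InheritableThreadLocal[String]

  // Same shape as Option(activeThreadSession.get): None when nothing is set in this thread.
  def lookup(): Option[String] = Option(active.get)

  def run(): (Option[String], Option[String]) = {
    val executor1 = Executors.newSingleThreadExecutor()
    val executor2 = Executors.newSingleThreadExecutor()
    try {
      // Thread 1 sets the value and can read it back.
      val seenIn1 = executor1.submit(new Callable[Option[String]] {
        override def call(): Option[String] = {
          active.set("session-1")
          lookup()
        }
      }).get(1, TimeUnit.MINUTES)
      // Thread 2 was not spawned by thread 1, so it inherits nothing and sees no value.
      val seenIn2 = executor2.submit(new Callable[Option[String]] {
        override def call(): Option[String] = lookup()
      }).get(1, TimeUnit.MINUTES)
      (seenIn1, seenIn2)
    } finally {
      executor1.shutdown()
      executor2.shutdown()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Calling .get on the second result would fail in the same way as the stack trace at the top of this post.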