Kyuubi 解锁 Spark SQL on CDH 6
背景
CDH 最后一个免费版 6.3.2 发布一年有余,离线计算核心组件版本停在了 Hadoop 3.0.0,Hive 2.1.1,Spark 2.4.0。随着 Spark 3.0 的重磅发布,在性能方面又迎来了一次飞跃,本文将描述把 Spark 3 集成到 CDH 6.3.1(未开启 Kerberos) 的过程,并使用 Kyuubi 替换 HiveServer2,实现 OLAP、ETL 等场景下从 HiveQL 到 SparkSQL 的无缝迁移,享受 5x-10x 的性能红利。
CDH 缺陷修复
[ORC-125] 修复 Hive 不能读取高版本 ORC 写入的数据
当使用 Hive 读取由 Presto 或者 Spark 等写入的 ORC 文件时,会出现以下错误。
ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 6
该问题在 ORC 上游被修复 [ORC-125] Correct OrcFile.WriterVersion to correctly use FUTURE。
ORC 最早是 Hive 的一个子项目,在 CDH 6 集成的 Hive 2.1 这个版本里,ORC 还没有分离出去,所以这个问题要在 Hive 源码里修复。
我做了一个打包好的修复版本,GitHub 传送门,下载更换 /opt/cloudera/parcels/CDH/lib/hive/lib
路径下的 hive-exec-2.1.1-cdh6.3.1.jar
, hive-orc-2.1.1-cdh6.3.1.jar
即可。(至少需要更换 Hadoop Client、HiveServer2 节点,如果你不知道我在说什么,就把所有节点都换掉)
Spark 3.1
[SPARK-33212] Spark 使用 Hadoop Shaded Client
Hadoop 3.0 提供了 Shaded Client,用于下游项目规避依赖冲突 [HADOOP-11656] Classpath isolation for downstream clients。
Spark 3.2 在 hadoop-3.2
profile 中切换到了 Hadoop Shaded Client
[SPARK-33212] Upgrade to Hadoop 3.2.2 and move to shaded clients for Hadoop 3.x profile。
该更改不是必须的,但个人建议从 Spark 主线将该补丁移植到 branch-3.1 使用,以规避潜在的依赖冲突。
[CDH-71907] Spark HiveShim 适配 CDH Hive
Spark 通过反射和隔离的类加载器来实现对多版本 Hive Metastore 的支持,详情参考
Interacting with Different Versions of Hive Metastore - Spark Documentation。
CDH 6 使用修改过的 Hive 2.1.1 版本,其方法签名与 Apache 版本有所不同,故 Spark HiveShim 反射调用会出现找不到方法签名,需要手动将 CDH-71907 补丁打到 branch-3.1。
Spark External Shuffle Service 协议兼容
Spark shuffle 时,mapper 会将数据写入到本地磁盘而非 HDFS,引入 ESS 后,会将文件信息注册到 ESS 中,将 mapper 与 reducer 解耦。CDH 中,默认会启用 Spark Yarn External Shuffle Service,作为 YARN AUX Service 在所有 Yarn Node 上启动。
Spark 3 修改了 shuffle 通信协议,在与 CDH 2.4 版本的 ESS 交互时,需要设置 spark.shuffle.useOldFetchProtocol=true
,否则可能报如下错误。[SPARK-29435] Spark 3 doesn't work with older shuffle service
IllegalArgumentException: Unexpected message type: <number>.
Spark 版本迁移指南
如果你有现存的基于 CDH Spark 2.4 的 Spark SQL/Job,在将其迁移至 Spark 3 版本前,请参阅完整的官方迁移指南。
- Migration Guide: Spark Core - Spark Documentation
- Migration Guide: SQL, Datasets and DataFrame - Spark Documentation
Spark 部署
官方文档 Running Spark on YARN - Documentation 中提到
To make Spark runtime jars accessible from YARN side, you can specify
spark.yarn.archive
orspark.yarn.jars
. For details please refer to Spark Properties. If neitherspark.yarn.archive
norspark.yarn.jars
is specified, Spark will create a zip file with all jars under$SPARK_HOME/jars
and upload it to the distributed cache.
因此,无需在 CDH 所有节点上部署 Spark 3,只需在 Hadoop Client 节点上部署 Spark 3 即可。
如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。
我基于 Spark 3.1.2 制作了一个适配 CDH 6 的版本, GitHub 传送门 ,下载解压至 /opt
,并软链至 /opt/spark3
。
[hive@cdh-kyuubi]$ ls -l /opt | grep spark
lrwxrwxrwx 1 root root 39 Aug 10 18:46 spark3 -> /opt/spark-3.1.2-cdh6-bin-3.2.2
drwxr-xr-x 13 hive hive 4096 Aug 10 18:46 spark-3.1.2-cdh6-bin-3.2.2
配置 Hadoop、Hive
CDH 会将配置文件自动分发到所有节点 /etc
目录下,建立软链即可。
ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/yarn-site.xml /opt/spark3/conf/
ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/
配置 Spark 环境变量 /opt/spark3/conf/spark-env.sh
#!/usr/bin/env bash
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn:/etc/hive/conf
配置 Spark 默认参数 /opt/spark3/conf/spark-defaults.conf
请参考 Configuration - Spark Documentation 根据集群环境实际情况进行微调
spark.authenticate=false
spark.io.encryption.enabled=false
spark.network.crypto.enabled=false
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory
spark.driver.log.dfsDir=/user/spark/driverLogs
spark.driver.log.persistToDfs.enabled=true
spark.files.overwrite=true
spark.files.useFetchCache=false
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.shuffle.useOldFetchProtocol=true
spark.ui.enabled=true
spark.ui.killEnabled=true
spark.yarn.historyServer.address=http://cdh-master2:18088
spark.yarn.historyServer.allowTracking=true
spark.master=yarn
spark.submit.deployMode=cluster
spark.driver.memory=2G
spark.executor.cores=6
spark.executor.memory=8G
spark.executor.memoryOverhead=2G
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=2G
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.schedulerBacklogTimeout=1
spark.sql.cbo.enabled=true
spark.sql.cbo.starSchemaDetection=true
spark.sql.datetime.java8API.enabled=false
spark.sql.sources.partitionOverwriteMode=dynamic
spark.sql.hive.convertMetastoreParquet=false
spark.sql.hive.convertMetastoreParquet.mergeSchema=false
spark.sql.hive.metastore.version=2.1.1
spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/*
spark.sql.orc.mergeSchema=true
spark.sql.parquet.mergeSchema=true
spark.sql.parquet.writeLegacyFormat=true
spark.sql.adaptive.enabled=true
spark.sql.adaptive.forceApply=false
spark.sql.adaptive.logLevel=info
spark.sql.adaptive.advisoryPartitionSizeInBytes=256m
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.coalescePartitions.initialPartitionNum=1024
spark.sql.adaptive.fetchShuffleBlocksInBatch=true
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=128m
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2
spark.sql.autoBroadcastJoinThreshold=-1
验证 spark-shell 工作正常
[hive@cdh-kyuubi]$ /opt/spark3/bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/08/30 19:53:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/08/30 19:53:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://cdh-master1:4040
Spark context available as 'sc' (master = yarn, app id = application_1615462037335_40099).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.1.2-cdh6
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("show databases").show
+----------------+
| namespace|
+----------------+
| test|
+----------------+
scala>
至此,Spark 3 on CDH 6 已经部署完成,可以像 CDH 自带的 Spark 2.4 一样,正常的使用 spark-submit、spark-shell,但依旧不支持 spark-sql,spark-thriftserver。
相比略显鸡肋的 spark-sql,spark-thriftserver,Kyuubi 是更好的选择,因此我故意移除了这两个功能。
注意:我提供的构建版,没有启用 spark-thriftserver 模块,为正常使用 Kyuubi,请下载 hive-service-rpc-3.1.2.jar
添加到 /opt/spark3/jars
路径。
Kyuubi —— 解锁 Spark SQL 更多场景
简言之,Apache Kyuubi (Incubating) 之于 Spark,类似 HiveServer2 之于 Hive。
Kyuubi 通过将 Spark 暴露一个与 HiveServer2 完全兼容的 Thrift API,可以兼容现有的 Hive 生态,如 beeline,hive-jdbc-driver,HUE,Superset 等。
可以通过以下文档来了解 Kyuubi 的架构,Kyuubi 与 HiveServer2 和 Spark Thrift Server 的异同。
- Welcome to Kyuubi’s documentation
- Kyuubi Architecture — Kyuubi documentation
- Kyuubi v.s. HiveServer2 — Kyuubi documentation
- Kyuubi v.s. Spark Thrift JDBC/ODBC Server (STS) — Kyuubi documentation
Kyuubi 部署
Kyuubi 无需任何修改即可适配 CDH 6,下面给出关键步骤,详情可以参考 Deploy Kyuubi engines on Yarn — Kyuubi documentation。
同样的,如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。
解压部署
下载 kyuubi-1.3.0-incubating-bin.tgz 解压至 /opt
,并创建软链到 /opt/kyuubi
。
[hive@cdh-external opt]$ ls -l /opt | grep kyuubi
lrwxrwxrwx 1 root root 32 Aug 18 17:36 kyuubi -> /opt/kyuubi-1.3.0-incubating-bin
drwxrwxr-x 13 hive hive 4096 Aug 18 17:52 kyuubi-1.3.0-incubating-bin
修改配置
按需修改 /opt/kyuubi/conf/kyuubi-env.sh
#!/usr/bin/env bash
export JAVA_HOME=/usr/java/default
export SPARK_HOME=/opt/spark3
export SPARK_CONF_DIR=${SPARK_HOME}/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
export KYUUBI_PID_DIR=/data/log/service/kyuubi/pid
export KYUUBI_LOG_DIR=/data/log/service/kyuubi/logs
export KYUUBI_WORK_DIR_ROOT=/data/log/service/kyuubi/work
export KYUUBI_MAX_LOG_FILES=10
参考 Kyuubi Configurations System — Kyuubi documentation 按需修改 /opt/kyuubi/conf/kyuubi-defaults.conf
kyuubi.authentication=NONE
kyuubi.engine.share.level=USER
kyuubi.frontend.bind.host=0.0.0.0
kyuubi.frontend.bind.port=10009
kyuubi.ha.zookeeper.quorum=cdh-master1:2181,cdh-master2:2181,cdh-master3:2181
kyuubi.ha.zookeeper.namespace=kyuubi
kyuubi.session.engine.idle.timeout=PT10H
spark.master=yarn
spark.submit.deployMode=cluster
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=20
spark.dynamicAllocation.executorIdleTimeout=60
注意:kyuubi-defaults.conf
中的 spark 配置优先级高于 spark-defaults.conf
性能调优
How To Use Spark Dynamic Resource Allocation (DRA) in Kyuubi — Kyuubi documentation
How To Use Spark Adaptive Query Execution (AQE) in Kyuubi — Kyuubi documentation
启动
使用 /opt/kyuubi/bin/kyuubi
在前台或后台启动 Kyuubi Server。
[hive@cdh-kyuubi]$ /opt/kyuubi/bin/kyuubi --help
Usage: bin/kyuubi command
commands:
start - Run a Kyuubi server as a daemon
run - Run a Kyuubi server in the foreground
stop - Stop the Kyuubi daemon
status - Show status of the Kyuubi daemon
-h | --help - Show this help message
Beeline 连接
使用默认参数连接 Kyuubi
beeline -u jdbc:hive2://cdh-kyuubi:10009 -n bigdata
使用自定义参数连接 Kyuubi
beeline -u "jdbc:hive2://cdh-master2:10009/;?spark.driver.memory=8G#spark.app.name=batch_001;kyuubi.engine.share.level=CONNECTION" -n batch
详细配置参考 Access Kyuubi with Hive JDBC and ODBC Drivers — Kyuubi documentation
HUE 连接
简单说,在 Cloudera Manager 中修改 HUE 配置项 Hue Service Advanced Configuration Snippet 如下,即可在 HUE 中开启 Spark SQL 引擎。
[desktop]
app_blacklist=zookeeper,hbase,impala,search,sqoop,security
use_new_editor=true
[[interpreters]]
[[[sparksql]]]
name=Spark SQL
interface=hiveserver2
[[[hive]]]
name=Hive
interface=hiveserver2
# other interpreters
...
[spark]
sql_server_host=kyuubi
sql_server_port=10009
详细配置参考 Getting Started with Kyuubi and Cloudera Hue — Kyuubi documentation
Kyuubi engine 共享级别与应用场景
Kyuubi Spark engine 即一个 Spark driver,通过控制 engine 的共享策略,可以在隔离性和资源利用率上取得平衡。Kyuubi 提供 3 种 engine 共享级别,分别为 CONNECTION,USER(默认),SERVER。
下面的讨论均假设使用 YARN Cluster 模式启动 Kyuubi Spark engine。
我们首先补充一些时间开销和 Spark 操作执行的信息。
Kyuubi Spark engine 在冷启动时,会由 Kyuubi Server 通过 spark-submit
命令向 YARN 提交一个 Spark App,即 engine,该过程从提交到 Spark driver 启动约需要 5-6s,然后 engine 将自己注册到 Zookeeper,Kyuubi Server 监听 Zookeeper 发现 engine 并建立连接,该过程约 1-2s,如此算来,在 YARN 资源空闲时,整个 engine 冷启动时间约 6-8s;Client 连接到存在的 engine,通常耗时 1s 内。
如果启动了 Spark 的 executor 动态伸缩特性,真正执行 SQL 任务时,如果资源有富余,会动态创建 executor,每个 executor 创建耗时约为 2-3s。
元数据、DDL 等操作,如获取 database 列表,CREATE TABLE
语句等,会在 driver 上执行;计算任务如 JOIN
、COUNT()
等,会由 driver 生成执行计划,在 executor 上执行。
在我们的生产环境中,大概有如下三种使用场景:
- 使用 HUE 进行 ad-hoc 查询
该场景中,会有多个用户进行查询,一般会运行相对较大的查询任务,用户对连接创建时间以及元数据加载时间较为敏感,但对查询结果响应时间有一定的容忍性。
在这种场景中,使用默认的 USER 共享级别,每个用户只使用一个 Spark engine,配合使用 Spark 的动态伸缩特性,动态的创建和销毁 executor,在保证用户之间隔离的基础上,降低启动时间和资源占用。建议的关键配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT1H
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=30
spark.dynamicAllocation.executorIdleTimeout=120
- 使用 Beeline 运行批任务 SQL
该场景会使用统一的 batch 账号提交 SQL 任务,对响应时间敏感度低,但对稳定性要求非常要,并且应尽可能的最大化利用集群资源。
推荐在该场景下将共享级别调整为 CONNECTION,这样每个 SQL 执行将会使用独立的 Spark driver,并且 SQL 执行完毕后 Spark driver 立即退出,保障离线任务互不干扰,并且资源及时释放。建议的关键配置如下:
kyuubi.engine.share.level=CONNECTION
spark.dynamicAllocation.enabled=true
# 根据任务具体资源消耗估算,从 workflow 整体上提升集群资源利用率
spark.dynamicAllocation.minExecutors=5
spark.dynamicAllocation.maxExecutors=30
- 使用 Superset 进行多数据源联邦查询
该场景中,只有一个 service 账号,会定时同时刷新大量的图表,对连接创建时间、查询结果响应时间、并发度都有较高的要求。
该场景中,driver 和 executor 的启动时间都是不容忽视的,因此在 ad-hoc 查询配置的基础上,应延长 driver、executor 的闲置等待时间,并且设定最小的 executor 保活数量,保证时刻有 executor 常驻,能快速响应较小的查询。建议的关键配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT10H
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=6
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.executorIdleTimeout=120
我们可以启动三个 Kyuubi Server(单机或集群)来分别应对如上的三种场景,但也可以仅启动一个 Kyuubi Server(单机或集群)来满足不同的场景。
一种建议的实践方式是 Kyuubi Server 配置使用默认的 USER 共享级别,这样客户端连接时,会使用默认配置;当 ETL 跑批场景时,可以通过 beeline 参数将本次连接共享级别调整为 CONNECTION;类似的,在 Superset 连接中配置独立参数。
结语
现在去跑一些 SQL,体验 Spark SQL 带来的性能飞跃吧!