Spark

Kyuubi 解锁 Spark SQL on CDH 6

2021-08-30  本文已影响0人  517001e7cb6e

背景

CDH 最后一个免费版 6.3.2 发布一年有余,离线计算核心组件版本停在了 Hadoop 3.0.0,Hive 2.1.1,Spark 2.4.0。随着 Spark 3.0 的重磅发布,在性能方面又迎来了一次飞跃,本文将描述把 Spark 3 集成到 CDH 6.3.1(未开启 Kerberos) 的过程,并使用 Kyuubi 替换 HiveServer2,实现 OLAP、ETL 等场景下从 HiveQL 到 SparkSQL 的无缝迁移,享受 5x-10x 的性能红利。

CDH 缺陷修复

[ORC-125] 修复 Hive 不能读取高版本 ORC 写入的数据

当使用 Hive 读取由 Presto 或者 Spark 等写入的 ORC 文件时,会出现以下错误。

ORC split generation failed with exception: java.lang.ArrayIndexOutOfBoundsException: 6

该问题在 ORC 上游被修复 [ORC-125] Correct OrcFile.WriterVersion to correctly use FUTURE

ORC 最早是 Hive 的一个子项目,在 CDH 6 集成的 Hive 2.1 这个版本里,ORC 还没有分离出去,所以这个问题要在 Hive 源码里修复。

我做了一个打包好的修复版本,GitHub 传送门,下载更换 /opt/cloudera/parcels/CDH/lib/hive/lib 路径下的 hive-exec-2.1.1-cdh6.3.1.jar, hive-orc-2.1.1-cdh6.3.1.jar 即可。(至少需要更换 Hadoop Client、HiveServer2 节点,如果你不知道我在说什么,就把所有节点都换掉)

Spark 3.1

[SPARK-33212] Spark 使用 Hadoop Shaded Client

Hadoop 3.0 提供了 Shaded Client,用于下游项目规避依赖冲突 [HADOOP-11656] Classpath isolation for downstream clients

Spark 3.2 在 hadoop-3.2 profile 中切换到了 Hadoop Shaded Client
[SPARK-33212] Upgrade to Hadoop 3.2.2 and move to shaded clients for Hadoop 3.x profile

该更改不是必须的,但个人建议从 Spark 主线将该补丁移植到 branch-3.1 使用,以规避潜在的依赖冲突。

[CDH-71907] Spark HiveShim 适配 CDH Hive

Spark 通过反射和隔离的类加载器来实现对多版本 Hive Metastore 的支持,详情参考
Interacting with Different Versions of Hive Metastore - Spark Documentation

CDH 6 使用修改过的 Hive 2.1.1 版本,其方法签名与 Apache 版本有所不同,故 Spark HiveShim 反射调用会出现找不到方法签名,需要手动将 CDH-71907 补丁打到 branch-3.1。

Spark External Shuffle Service 协议兼容

Spark shuffle 时,mapper 会将数据写入到本地磁盘而非 HDFS,引入 ESS 后,会将文件信息注册到 ESS 中,将 mapper 与 reducer 解耦。CDH 中,默认会启用 Spark Yarn External Shuffle Service,作为 YARN AUX Service 在所有 Yarn Node 上启动。

Spark 3 修改了 shuffle 通信协议,在与 CDH 2.4 版本的 ESS 交互时,需要设置 spark.shuffle.useOldFetchProtocol=true,否则可能报如下错误。[SPARK-29435] Spark 3 doesn't work with older shuffle service

IllegalArgumentException: Unexpected message type: <number>.

Spark 版本迁移指南

如果你有现存的基于 CDH Spark 2.4 的 Spark SQL/Job,在将其迁移至 Spark 3 版本前,请参阅完整的官方迁移指南。

Spark 部署

官方文档 Running Spark on YARN - Documentation 中提到

To make Spark runtime jars accessible from YARN side, you can specify spark.yarn.archive or spark.yarn.jars. For details please refer to Spark Properties. If neither spark.yarn.archive nor spark.yarn.jars is specified, Spark will create a zip file with all jars under $SPARK_HOME/jars and upload it to the distributed cache.

因此,无需在 CDH 所有节点上部署 Spark 3,只需在 Hadoop Client 节点上部署 Spark 3 即可。

如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。

我基于 Spark 3.1.2 制作了一个适配 CDH 6 的版本, GitHub 传送门 ,下载解压至 /opt,并软链至 /opt/spark3

[hive@cdh-kyuubi]$ ls -l /opt | grep spark
lrwxrwxrwx  1 root         root           39 Aug 10 18:46 spark3 -> /opt/spark-3.1.2-cdh6-bin-3.2.2
drwxr-xr-x 13 hive         hive         4096 Aug 10 18:46 spark-3.1.2-cdh6-bin-3.2.2

配置 Hadoop、Hive

CDH 会将配置文件自动分发到所有节点 /etc 目录下,建立软链即可。

ln -s /etc/hadoop/conf/core-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/hdfs-site.xml /opt/spark3/conf/
ln -s /etc/hadoop/conf/yarn-site.xml /opt/spark3/conf/
ln -s /etc/hive/conf/hive-site.xml /opt/spark3/conf/

配置 Spark 环境变量 /opt/spark3/conf/spark-env.sh

#!/usr/bin/env bash

export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf
export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn:/etc/hive/conf

配置 Spark 默认参数 /opt/spark3/conf/spark-defaults.conf

请参考 Configuration - Spark Documentation 根据集群环境实际情况进行微调

spark.authenticate=false
spark.io.encryption.enabled=false
spark.network.crypto.enabled=false

spark.eventLog.enabled=true
spark.eventLog.dir=hdfs://nameservice1/user/spark/applicationHistory
spark.driver.log.dfsDir=/user/spark/driverLogs
spark.driver.log.persistToDfs.enabled=true

spark.files.overwrite=true
spark.files.useFetchCache=false

spark.serializer=org.apache.spark.serializer.KryoSerializer

spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.shuffle.useOldFetchProtocol=true

spark.ui.enabled=true
spark.ui.killEnabled=true
spark.yarn.historyServer.address=http://cdh-master2:18088
spark.yarn.historyServer.allowTracking=true

spark.master=yarn
spark.submit.deployMode=cluster

spark.driver.memory=2G
spark.executor.cores=6
spark.executor.memory=8G
spark.executor.memoryOverhead=2G
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=2G

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.schedulerBacklogTimeout=1

spark.sql.cbo.enabled=true
spark.sql.cbo.starSchemaDetection=true
spark.sql.datetime.java8API.enabled=false
spark.sql.sources.partitionOverwriteMode=dynamic

spark.sql.hive.convertMetastoreParquet=false
spark.sql.hive.convertMetastoreParquet.mergeSchema=false
spark.sql.hive.metastore.version=2.1.1
spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/*

spark.sql.orc.mergeSchema=true
spark.sql.parquet.mergeSchema=true
spark.sql.parquet.writeLegacyFormat=true

spark.sql.adaptive.enabled=true
spark.sql.adaptive.forceApply=false
spark.sql.adaptive.logLevel=info
spark.sql.adaptive.advisoryPartitionSizeInBytes=256m
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.minPartitionNum=1
spark.sql.adaptive.coalescePartitions.initialPartitionNum=1024
spark.sql.adaptive.fetchShuffleBlocksInBatch=true
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=128m
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2
spark.sql.autoBroadcastJoinThreshold=-1

验证 spark-shell 工作正常

[hive@cdh-kyuubi]$ /opt/spark3/bin/spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/08/30 19:53:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/08/30 19:53:46 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Spark context Web UI available at http://cdh-master1:4040
Spark context available as 'sc' (master = yarn, app id = application_1615462037335_40099).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2-cdh6
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("show databases").show
+----------------+
|       namespace|
+----------------+
|            test|
+----------------+
scala>

至此,Spark 3 on CDH 6 已经部署完成,可以像 CDH 自带的 Spark 2.4 一样,正常的使用 spark-submit、spark-shell,但依旧不支持 spark-sql,spark-thriftserver。

相比略显鸡肋的 spark-sql,spark-thriftserver,Kyuubi 是更好的选择,因此我故意移除了这两个功能。

注意:我提供的构建版,没有启用 spark-thriftserver 模块,为正常使用 Kyuubi,请下载 hive-service-rpc-3.1.2.jar 添加到 /opt/spark3/jars 路径。

Kyuubi —— 解锁 Spark SQL 更多场景

简言之,Apache Kyuubi (Incubating) 之于 Spark,类似 HiveServer2 之于 Hive。

Kyuubi 通过将 Spark 暴露一个与 HiveServer2 完全兼容的 Thrift API,可以兼容现有的 Hive 生态,如 beeline,hive-jdbc-driver,HUE,Superset 等。

可以通过以下文档来了解 Kyuubi 的架构,Kyuubi 与 HiveServer2 和 Spark Thrift Server 的异同。

Kyuubi 部署

Kyuubi 无需任何修改即可适配 CDH 6,下面给出关键步骤,详情可以参考 Deploy Kyuubi engines on Yarn — Kyuubi documentation

同样的,如果你对集群权限管理没有十分严格的要求,请使用 hive 用户以避免权限问题。

解压部署

下载 kyuubi-1.3.0-incubating-bin.tgz 解压至 /opt,并创建软链到 /opt/kyuubi

[hive@cdh-external opt]$ ls -l /opt | grep kyuubi
lrwxrwxrwx  1 root         root           32 Aug 18 17:36 kyuubi -> /opt/kyuubi-1.3.0-incubating-bin
drwxrwxr-x 13 hive         hive         4096 Aug 18 17:52 kyuubi-1.3.0-incubating-bin

修改配置

按需修改 /opt/kyuubi/conf/kyuubi-env.sh

#!/usr/bin/env bash

export JAVA_HOME=/usr/java/default
export SPARK_HOME=/opt/spark3
export SPARK_CONF_DIR=${SPARK_HOME}/conf
export HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf

export KYUUBI_PID_DIR=/data/log/service/kyuubi/pid
export KYUUBI_LOG_DIR=/data/log/service/kyuubi/logs
export KYUUBI_WORK_DIR_ROOT=/data/log/service/kyuubi/work
export KYUUBI_MAX_LOG_FILES=10

参考 Kyuubi Configurations System — Kyuubi documentation 按需修改 /opt/kyuubi/conf/kyuubi-defaults.conf

kyuubi.authentication=NONE

kyuubi.engine.share.level=USER

kyuubi.frontend.bind.host=0.0.0.0
kyuubi.frontend.bind.port=10009

kyuubi.ha.zookeeper.quorum=cdh-master1:2181,cdh-master2:2181,cdh-master3:2181
kyuubi.ha.zookeeper.namespace=kyuubi

kyuubi.session.engine.idle.timeout=PT10H

spark.master=yarn
spark.submit.deployMode=cluster

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=20
spark.dynamicAllocation.executorIdleTimeout=60

注意:kyuubi-defaults.conf 中的 spark 配置优先级高于 spark-defaults.conf

性能调优

How To Use Spark Dynamic Resource Allocation (DRA) in Kyuubi — Kyuubi documentation
How To Use Spark Adaptive Query Execution (AQE) in Kyuubi — Kyuubi documentation

启动

使用 /opt/kyuubi/bin/kyuubi 在前台或后台启动 Kyuubi Server。

[hive@cdh-kyuubi]$ /opt/kyuubi/bin/kyuubi --help
Usage: bin/kyuubi command
  commands:
    start        - Run a Kyuubi server as a daemon
    run          - Run a Kyuubi server in the foreground
    stop         - Stop the Kyuubi daemon
    status       - Show status of the Kyuubi daemon
    -h | --help  - Show this help message

Beeline 连接

使用默认参数连接 Kyuubi

beeline -u jdbc:hive2://cdh-kyuubi:10009 -n bigdata

使用自定义参数连接 Kyuubi

beeline -u "jdbc:hive2://cdh-master2:10009/;?spark.driver.memory=8G#spark.app.name=batch_001;kyuubi.engine.share.level=CONNECTION" -n batch

详细配置参考 Access Kyuubi with Hive JDBC and ODBC Drivers — Kyuubi documentation

HUE 连接

简单说,在 Cloudera Manager 中修改 HUE 配置项 Hue Service Advanced Configuration Snippet 如下,即可在 HUE 中开启 Spark SQL 引擎。

[desktop]
 app_blacklist=zookeeper,hbase,impala,search,sqoop,security
 use_new_editor=true
[[interpreters]]
[[[sparksql]]]
  name=Spark SQL
  interface=hiveserver2
[[[hive]]]
  name=Hive
  interface=hiveserver2
# other interpreters
  ...
[spark]
sql_server_host=kyuubi
sql_server_port=10009

详细配置参考 Getting Started with Kyuubi and Cloudera Hue — Kyuubi documentation

Kyuubi engine 共享级别与应用场景

Kyuubi Spark engine 即一个 Spark driver,通过控制 engine 的共享策略,可以在隔离性和资源利用率上取得平衡。Kyuubi 提供 3 种 engine 共享级别,分别为 CONNECTION,USER(默认),SERVER。

下面的讨论均假设使用 YARN Cluster 模式启动 Kyuubi Spark engine。

我们首先补充一些时间开销和 Spark 操作执行的信息。

Kyuubi Spark engine 在冷启动时,会由 Kyuubi Server 通过 spark-submit 命令向 YARN 提交一个 Spark App,即 engine,该过程从提交到 Spark driver 启动约需要 5-6s,然后 engine 将自己注册到 Zookeeper,Kyuubi Server 监听 Zookeeper 发现 engine 并建立连接,该过程约 1-2s,如此算来,在 YARN 资源空闲时,整个 engine 冷启动时间约 6-8s;Client 连接到存在的 engine,通常耗时 1s 内。

如果启动了 Spark 的 executor 动态伸缩特性,真正执行 SQL 任务时,如果资源有富余,会动态创建 executor,每个 executor 创建耗时约为 2-3s。

元数据、DDL 等操作,如获取 database 列表,CREATE TABLE 语句等,会在 driver 上执行;计算任务如 JOINCOUNT() 等,会由 driver 生成执行计划,在 executor 上执行。

在我们的生产环境中,大概有如下三种使用场景:

  1. 使用 HUE 进行 ad-hoc 查询
    该场景中,会有多个用户进行查询,一般会运行相对较大的查询任务,用户对连接创建时间以及元数据加载时间较为敏感,但对查询结果响应时间有一定的容忍性。
    在这种场景中,使用默认的 USER 共享级别,每个用户只使用一个 Spark engine,配合使用 Spark 的动态伸缩特性,动态的创建和销毁 executor,在保证用户之间隔离的基础上,降低启动时间和资源占用。建议的关键配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT1H

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.maxExecutors=30
spark.dynamicAllocation.executorIdleTimeout=120
  1. 使用 Beeline 运行批任务 SQL
    该场景会使用统一的 batch 账号提交 SQL 任务,对响应时间敏感度低,但对稳定性要求非常要,并且应尽可能的最大化利用集群资源。
    推荐在该场景下将共享级别调整为 CONNECTION,这样每个 SQL 执行将会使用独立的 Spark driver,并且 SQL 执行完毕后 Spark driver 立即退出,保障离线任务互不干扰,并且资源及时释放。建议的关键配置如下:
kyuubi.engine.share.level=CONNECTION

spark.dynamicAllocation.enabled=true
# 根据任务具体资源消耗估算,从 workflow 整体上提升集群资源利用率
spark.dynamicAllocation.minExecutors=5
spark.dynamicAllocation.maxExecutors=30
  1. 使用 Superset 进行多数据源联邦查询
    该场景中,只有一个 service 账号,会定时同时刷新大量的图表,对连接创建时间、查询结果响应时间、并发度都有较高的要求。
    该场景中,driver 和 executor 的启动时间都是不容忽视的,因此在 ad-hoc 查询配置的基础上,应延长 driver、executor 的闲置等待时间,并且设定最小的 executor 保活数量,保证时刻有 executor 常驻,能快速响应较小的查询。建议的关键配置如下:
kyuubi.engine.share.level=USER
kyuubi.session.engine.idle.timeout=PT10H

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=6
spark.dynamicAllocation.maxExecutors=10
spark.dynamicAllocation.executorIdleTimeout=120

我们可以启动三个 Kyuubi Server(单机或集群)来分别应对如上的三种场景,但也可以仅启动一个 Kyuubi Server(单机或集群)来满足不同的场景。

一种建议的实践方式是 Kyuubi Server 配置使用默认的 USER 共享级别,这样客户端连接时,会使用默认配置;当 ETL 跑批场景时,可以通过 beeline 参数将本次连接共享级别调整为 CONNECTION;类似的,在 Superset 连接中配置独立参数。

结语

现在去跑一些 SQL,体验 Spark SQL 带来的性能飞跃吧!

上一篇下一篇

猜你喜欢

热点阅读