Spark on YARN
启动SparkOnYARN
确认HADOOP_CONF_DIR或者YARN_CONF_DIR指向的目录包含Hadoop集群的配置文件。Spark依赖这些配置文件的内容写HDFS或者向YARN申请资源。这些配置需要复制到Spark集群内所有的机器上,来保证整个集群使用的是同一套配置。如果配置中的Java系统变量或者环境变量不是YARN支持的,他们应该被设置到Spark应用的配置中(driver、executor和Application Master)
使用YARN启动Spark应用有两种模式(cluster和client)。在cluster模式下,Spark应用的Driver端根据YARN的调度运行在Application Master进程中,在启动Spark应用之后,客户端就可以直接退出了。在client模式下,Driver端运行在客户端进程里,Application Master只用来向YARN申请资源。
和standalone以及mesos模式不一样,在YARN模式下,ResourceManager的地址是从Hadoop配置文件中读到的,所以 --master 参数要设置值为 "yarn",以集群模式启动Spark应用需要这样写
./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
例如
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10
上面这行命令启动了一个YARN的client程序,这个程序会启动一个默认的Application Master。 SparkPi这个类会作为Application Master进程的一个线程执行。客户端可以定期的从Application Master拉取一些统计信息,然后把这些信息显示在控制台上。当Spark应用执行完毕的时候这个client会自动退出。
以client模式启动一个Spark应用和前面差不多,只是把deploy-mode参数的cluster替换成client,下面这个行命令是使用client模式启动spark-shell
./bin/spark-shell --master yarn --deploy-mode client
添加其他JAR
cluster模式下,因为Driver端运行在其他机器上,所以代码里写SparkContext.addJar是没用的(client模式才可以这样写)。如果cluster模式下我们还有这个需求,我们要把想添加的jar文件用--jars参数拼在启动命令里。
./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--jars my-other-jar.jar,my-other-other-jar.jar \
my-main-jar.jar \
app_arg1 app_arg2
准备
想要使用Spark on YARN需要有Spark独立的构建结果。官网上提供了下载地址,如果想要自己去构建一把也可以,相关信息在这里
配置
Spark on YARN模式的配置项大多数和另外两种一样。关于on YARN模式的特殊配置可以看这里
调试Spark应用
在YARN的属于里,executor和application master都运行在YARN的容器内。在Spark应用执行完毕后YARN有两种方式来管理容器内的日志。如果日志聚集功能打开(使用 yarn.log-aggregation-enable 配置项),容器日志会被复制到HDFS,并从所在的机器上删除。这些日志可以使用yarn logs命令在集群内的任意一台节点上查看
yarn logs -applicationId <app ID>
这条命令会打印这个应用(其实是Spark应用,但YARN是一个独立的资源调度与任务监控工具,只要符合它的规范,任何应用都可以使用YARN来做资源调度和任务监控,所以这里用了"应用"两个字而不是"Spark应用")在所有容器实例内的所有日志。我们也可以使用HDFS的API来直接查看日志。这些日志的存储路径使用yarn.nodemanager.remote-app-log-dir 和 yarn.nodemanager.remote-app-log-dir-suffix来配置,一个配置目录,一个配置文件后缀。这些日志还可以在Spark的WEB UI中的Executors 页签中查看。我们需要确保Spark History Server和MapReduce History Server都运行正常,并且早yarn-site.xml中配置了yarn.log.server.url属性。Spark History Server的UI会把查看日志的动作重定向到MapReduce History Server中来显示聚合后的日志。
日志聚合没打开时,日志会保存在每台执行的机器上,所在的目录是YARN_APP_LOGS_DIR,这个一般被配置到/tmp/logs目录或者$HADOOP_HOME/log/uiserlogs目录(具体位置取决于Hadoop版本和安装时的配置)。我们可以到具体的机器上去查看指定目录下的日志,子目录被按照applicationid和containerid组织。日志同样可以在Spark WEB UI中的Executors页签中查看,但此时不会重定向到MapReduce History Server了。
想查看每个容器启动时的环境变量,可以增加 yarn.nodemanager.delete.debug-delay-sec属性的值(比如设置为36000)然后到运行容器的机器上查看 yarn.nodemanager.local-dirs 目录中的内容。这个目录中包含了启动容器的脚本,jar包和所有的环境变量。这个骚操作在调试classpath和环境变量等问题时非常有用。(注意,启用此选项需要管理员权限才能进行群集设置并重新启动所有节点管理器。因此,这不适用于托管群集)。
要为application master和 executor使用自定义的log4j配置,可以使用下面的选项
- 用spark-submit提交任务时,使用 --files 来上传一个自定义的log4j.properties
- 配置Spark集群的时候,把 spark.driver.extraJavaOptions(这个是给driver的)或者spark.executor.extraJavaOptions(这个是给executor的)配置上 -Dlog4j.configuration=<log4j.properties文件的绝对路径>。注意,这样做需要配置的时候把 file:// 这个协议写上,这个文件也需要放到集群中所有节点上
- 修改spark机器上配置文件目录内的log4j.properties(看样子是不需要重启了),但要注意这个配置比前面两个优先级要低,互相冲突的时候这个文件内的配置肯定不生效
需要注意的是:第一种方式的配置会让Application Master和Executor使用相同的配置文件,当ApplicationMaster和某个Executor在同一台机器上的时候,可能会有一些问题,因为他们在同时操作同一个日志文件。
如果我们需要设置一个适当的位置来让YARN放置、聚合日志,在log4j.properties文件中使用spark.yarn.app.container.log.dir 来配置文件输出位置,比如:
log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log
如果是Streaming应用,需要配置 RollingFileAppender并且设置本地存储路径,来避免日志文件太大磁盘空间不足。
为Application Master和executor指定自定义的metrics.properties ,这个文件在spark配置目录中。对文件的修改会自动被提交给集群,所以我们不需要每次启动Spark应用手动指定 --files 参数。
下面是Spark的属性
名称 | 默认值 | 含义 |
---|---|---|
spark.yarn.am.memory | 512m | client模式下指定Application Master总的内存大小,cluster模式下使用spark.driver.memory。使用小写字母后缀来指定单位:k,m,g,t |
spark.yarn.am.cores | 1 | client模式下Application Master使用的cpu数量。cluster模式下用spark.driver.cores |
spark.yarn.am.waitTime | 100s | cluster模式下application master等待SparkContext被初始化的时间。client模式下application master等待driver连接的时间 |
spark.yarn.submit.file.replication | HDFS的副本数 | Spark应用提交的文件在HDFS中的副本数,一般这些文件是:程序的jar和其他的分布式存储的文件或归档文件 |
spark.yarn.stagingDir | 当前用户家目录 | 提交应用程序时使用的暂存目录。 |
spark.yarn.preserve.staging.files | false | 当Spark应用结束的时候,缓存的jar和文件是否删除。false为删除,设置为true是不删除 |
spark.yarn.scheduler.heartbeat.interval-ms | 3000 | application master到YARN resourcemanager的心跳间隔,单位是毫秒。这个值的上线时YARN配置的值的一半,也就是 yarn.am.liveness-monitor.expiry-interval-ms的值的一半 |
spark.yarn.scheduler.initial-allocation.interval | 200ms | 当ResourceManager没响应ApplicationMaster请求的时候,重复请求的间隔。这个值不应该比spark.yarn.scheduler.heartbeat.interval-ms大。如果ResourceManager一直没响应,ApplicationMaster会再次请求,直到得到反馈,或者总时间超过spark.yarn.scheduler.heartbeat.interval-ms值 |
spark.yarn.historyServer.address | 无 | Spark History Server地址。 |
spark.yarn.dist.archives | 无 | 会被复制到每个Executor所在机器上的归档 |
文件,逗号分隔 | ||
spark.yarn.dist.files | 无 | 会被复制到每个Executor所在机器上的文件,逗号分隔 |
spark.yarn.dist.jars | 无 | 会被复制到每个Executor所在机器上的jar文件,逗号分隔 |
spark.executor.instances | 2 | 静态资源exector个数分配默认值。(Spark是静态资源分配,因为我们需要在执行前确认资源,不论命令行还是默认配置都是预先分配)当spark.dynamicAllocation.enabled为true时,是动态分配资源,这种场景Streaming的情况更多,因为需要的资源和业务峰值相关。 |
spark.yarn.executor.memoryOverhead | executorMemory * 0.10,最少 384 | 每个executor分配的最少非堆内存,单位m。 |
spark.yarn.driver.memoryOverhead | driverMemory * 0.10, 最少384 | Driver端的非堆内存,单位m |
spark.yarn.am.memoryOverhead | AM memory * 0.10, 最少384 | client模式下Application Master的非堆内存 |
spark.yarn.am.port | 随机 | Application Master的端口。client模式下,这个端口用于Driver端于ApplicationMaster的通信。cluster模式下用于接收调度程序的终止命令 |
spark.yarn.queue | default | 提交应用程序的YARN队列名称 |
spark.yarn.jars | 无 | 需要分发到YARN容器中的Spark代码的jar包,多个用逗号分隔。Spark on YARN中Spark默认使用本地的Spark的jar,但是也可以把Spark的jar放到HDFS中全局可读,这样Spark应用程序使用这些jar的时候不需要每次都用 --jars 提交然后分发。 |
spark.yarn.archive | 无 | 和spark.yarn.jars同样原理 |
spark.yarn.access.namenodes | 无 | Spark应用程序可以访问的安全的HDFS的namenode节点列表,多个用逗号分隔spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032, webhdfs://nn3.com:50070。关于权限和K8S相关信息我还不了解,以后再回来修改 |
spark.yarn.appMasterEnv.[EnvironmentVariableName] | 无 | 把环境变量添加到Application Master所在的进程中。在cluster模式下,这个环境变量可以被Driver端读取到, client模式下这个变量只有启动Driver的程序可以读取到 |
spark.yarn.containerLauncherMaxThreads | 25 | Application Master中用于启动执行程序容器的最大线程数 |
spark.yarn.am.extraJavaOptions | 无 | Application Master启动的时候的JVM参数配置 |
spark.yarn.am.extraLibraryPath | 无 | 在client模式下,启动Application Master的时候指定的jar加载路径 |
spark.yarn.maxAppAttempts | yarn-site.xml中配置的yarn.resourcemanager.am.max-attempts 值 | 向YARN提交申请的时候最大的尝试次数,这个数字不应该超过YARN中配置的值 |
spark.yarn.am.attemptFailuresValidityInterval | 无 | Spark监控ApplicationMaster的失败次数,如果这个值配置了,当ApplicationMaster运行持续时间超过这个值的时候,失败次数会被清零。这个属性不配置的时候不会生效,并且要求Hadoop版本在2.6以上 |
spark.yarn.executor.failuresValidityInterval | 无 | 定义了Executor执行是否失败的检查间隔,超过这个检查间隔的错误会被忽略 |
spark.yarn.submit.waitAppCompletion | true | yarn cluster模式下,这个属性控制客户端是否需要同步等待任务执行完毕。如果设置为true,客户端进程会一直活着,并报告Spark应用的当前状态。如果设置为false,则客户端提交完毕后就立即退出 |
spark.yarn.am.nodeLabelExpression | 无 | 声明ApplicationMaster的节点标签表达式,目的是限制哪些节点可以作为ApplicationMaster节点,但要求YARN版本在2.6以上 |
spark.yarn.executor.nodeLabelExpression | 无 | 声明Executor的节点标签表达式,目的是限制哪些节点可以作为Executor节点,但要求YARN版本在2.6以上 |
to be continued
重要提示
- 请求是否被及时响应依赖于当前使用的scheduler,以及这个scheduler的配置方式
- 在cluster模式下,driver和executor使用的本地工作目录是YARN中配置的yarn.nodemanager.local-dirs目录下,用户指定的spark.local.dir是不起作用的。在client模式下,只有executor和cluster模式下一样,而Driver会使用spark.local.dir中的配置作为本地工作目录。这是因为client模式下Driver不是在YARN cluster里执行的。
- 使用--files参数提交文件到集群可以指定别名,例如 --files localtest.txt#appSees.txt 这是会把localtest.txt提交到HDFS里,但是想要访问这个文件需要使用appSees.txt这个名字。
- cluster模式下,可以用--jars来实现SparkContext.addJar的功能。
在安全集群中运行
TODO
给YARN配置外部Shuffle服务
这部分因为我不了解YARN,所以不是很明白。以后了解了再回来补充
使用Spark Shuffle Service,需要在YARN集群里做如下操作
- 使用YARN profile构建Spark。如果现在在使用预编译的发布版则跳过此步
- 找到spark-<version>-yarn-shuffle.jar。如果是自己构建这个包应该在 $SPARK_HOME/common/network-yarn/target/scala-<version>目录下。
如果使用的是发布版,这个包应该在yarn目录下 - 在集群的每个NodeManager上把这个包添加到环境变量里
- 集群的每个NodeManager上的yarn-site.xml中,把spark_shuffle设置为 yarn.nodemanager.aux-services的值。然后设置org.apache.spark.network.yarn.YarnShuffleService替代yarn.nodemanager.aux-services.spark_shuffle.class
- 给NodeManager增加堆内存,在etc/hadoop/yarn-env.sh中设置YARN_HEAPSIZE。这个设置避免Shuffle过程中的内存回收
- 重启YARN集群中的所有NodeManager
当Spark Shuffle Service在YARN集群跑起来后,可以使用这个配置项
spark.yarn.shuffle.stopOnFailure ,默认是false。当SparkShuffleService初始化失败的时候是否关闭NodeManager。这个配置是避免当机器上的SparkShuffleService不可用的时候,任务被分配到了这台机器上。