01 从spark-submit说起
2018-09-10 本文已影响13人
Yu咸的静安
使用spark-submit命令来提交Spark程序
spark-submit:
# Step 1: make sure SPARK_HOME is available.
# When it is unset or empty, source the find-spark-home script located next to this
# script (per the article, it looks up and loads the related environment variables).
[ -n "${SPARK_HOME}" ] || source "$(dirname "$0")"/find-spark-home

# disable randomized hash for string in Python 3.3+
PYTHONHASHSEED=0
export PYTHONHASHSEED

# Step 2: replace this shell with the spark-class script, forwarding every argument
# that was given to spark-submit after the SparkSubmit class name.
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
spark-class
# Resolve SPARK_HOME via the sibling find-spark-home script when it is unset or empty.
[ -n "${SPARK_HOME}" ] || source "$(dirname "$0")"/find-spark-home

# Source the Spark environment loader.
# NOTE(review): presumably this is what defines SPARK_SCALA_VERSION used below - confirm.
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Choose the java executable: JAVA_HOME wins, then any `java` found on PATH.
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
elif [ "$(command -v java)" ]; then
  RUNNER="java"
else
  echo "JAVA_HOME is not set" >&2
  exit 1
fi

# Locate the Spark jars: prefer ${SPARK_HOME}/jars, otherwise fall back to the
# assembly build output directory.
SPARK_JARS_DIR="${SPARK_HOME}/jars"
if [ ! -d "$SPARK_JARS_DIR" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

# A missing jars directory is fatal unless SPARK_TESTING or SPARK_SQL_TESTING is set.
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
fi
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR HADOOP_CONF_DIR
fi
# Run the main method of org.apache.spark.launcher.Main, the class responsible for
# building the spark-submit command; the launcher writes that command to stdout.
# Afterwards the launcher's exit status is appended as one extra NUL-terminated field
# so the caller can read it back from the same stream.
build_command() {
  local launcher_status
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  launcher_status=$?
  # Emit the exit status of the launcher process.
  printf "%d\0" "$launcher_status"
}
# Leave POSIX mode before using process substitution below.
# NOTE(review): presumably required because bash disallows `<( ... )` in POSIX mode - confirm.
set +o posix
# Read the output of build_command: each NUL-delimited field becomes one CMD element.
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <(build_command "$@")
# build_command prints the launcher's exit status as its final field, so the last
# array element is that status rather than part of the command line.
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# Validate the status: if the last field is not a non-negative integer, dump the
# captured output (all but its last line) to stderr and abort.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
# Propagate a non-zero launcher exit status unchanged.
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
exit $LAUNCHER_EXIT_CODE
fi
# Strip the trailing status element and exec the assembled command; per the article,
# this is what ends up invoking the org.apache.spark.deploy.SparkSubmit class.
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
为缩短篇幅只保留了主要代码
org.apache.spark.launcher.Main生成spark submit command流程
初始化SparkSubmitCommandBuilder,调用其buildCommand方法,最后将command输出
/**
 * Entry point of the launcher (abridged excerpt; elided parts are marked with "......").
 *
 * <p>The first argument is taken as the class name to launch and removed from the list.
 * When it is {@code org.apache.spark.deploy.SparkSubmit} a {@code SparkSubmitCommandBuilder}
 * is created from the remaining arguments, otherwise a {@code SparkClassCommandBuilder} is
 * used. The chosen builder's {@code buildCommand} is then called with an empty environment map.
 *
 * @param argsArray the class name followed by the arguments for that class
 * @throws Exception declared by the method signature
 */
public static void main(String[] argsArray) throws Exception {
// Copy into a mutable list so that the leading class name can be removed.
List<String> args = new ArrayList<>(Arrays.asList(argsArray));
String className = args.remove(0);
AbstractCommandBuilder builder;
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
try {
// Instantiate SparkSubmitCommandBuilder
builder = new SparkSubmitCommandBuilder(args);
} catch (IllegalArgumentException e) {......}
} else {
builder = new SparkClassCommandBuilder(className, args);
}
Map<String, String> env = new HashMap<>();
// Call the buildCommand method
List<String> cmd = builder.buildCommand(env);
// Printing of the command is omitted from this excerpt
}
SparkSubmitCommandBuilder的构造函数内部初始化私有内部类OptionParser,调用其parse方法解析参数列表。
/**
 * Constructor (abridged excerpt; elided parts are marked with "......").
 *
 * <p>Creates the private inner {@code OptionParser}, lets it parse the submit arguments and
 * copies the parser's {@code isAppResourceReq} flag onto this builder.
 *
 * <p>NOTE(review): because of the elision the braces in this excerpt do not balance and the
 * local {@code isExample} is never declared here - check against the full source before reuse.
 *
 * @param args the arguments passed on from spark-submit
 */
SparkSubmitCommandBuilder(List<String> args) {
List<String> submitArgs = args;
if (args.size() > 0) {......}
this.isExample = isExample;
// Create the OptionParser and call its parse method
OptionParser parser = new OptionParser();
parser.parse(submitArgs);
this.isAppResourceReq = parser.isAppResourceReq;
}
}
OptionParser继承了SparkSubmitOptionParser;由于parse方法在父类中被声明为final,子类无法重写,因此最终调用的是父类SparkSubmitOptionParser的parse方法。
/**
 * Walks the argument list and dispatches every recognised option to the handler callbacks.
 *
 * <p>The class splits spark-submit options into two kinds:
 * <ol>
 *   <li>options that take a value (for example {@code --class}), kept in the {@code opts}
 *       two-dimensional array;</li>
 *   <li>options that take no value (for example {@code --version}), kept in the
 *       {@code switches} two-dimensional array.</li>
 * </ol>
 * Anything found in neither table goes to {@code handleUnknown}. As soon as a handler
 * returns {@code false} parsing stops; the argument that stopped it is skipped and the
 * remainder is passed to {@code handleExtraArgs}.
 *
 * @param args the command line arguments to parse
 * @throws IllegalArgumentException if a value-taking option is the last argument and
 *         carries no inline {@code =value}
 */
protected final void parse(List<String> args) {
  final Pattern keyEqualsValue = Pattern.compile("(--[^=]+)=(.+)");

  int pos = 0;
  while (pos < args.size()) {
    String token = args.get(pos);
    String inlineValue = null;

    // First check for the "--key=value" form; if it matches, split key and value
    // out of the regex groups.
    Matcher eq = keyEqualsValue.matcher(token);
    if (eq.matches()) {
      token = eq.group(1);
      inlineValue = eq.group(2);
    }

    boolean keepGoing;
    // Is it an option that requires a value?
    String optName = findCliOption(token, opts);
    if (optName != null) {
      if (inlineValue == null) {
        if (pos == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", token));
        }
        // Not in "--key=value" form: the value is the next element of the list.
        inlineValue = args.get(++pos);
      }
      keepGoing = handle(optName, inlineValue);
    } else {
      // Otherwise look it up among the switches, which carry no value (hence null);
      // arguments found in neither table are handed to handleUnknown.
      String switchName = findCliOption(token, switches);
      keepGoing = (switchName != null) ? handle(switchName, null) : handleUnknown(token);
    }

    if (!keepGoing) {
      break;
    }
    pos++;
  }

  // pos < args.size() means the loop stopped early: step past the argument that stopped
  // it so that only the unprocessed remainder reaches handleExtraArgs.
  if (pos < args.size()) {
    pos++;
  }
  handleExtraArgs(args.subList(pos, args.size()));
}
OptionParser 类重写了父类的handle、handleUnknown、handleExtraArgs方法,具体代码比较简单请参考源代码。
接下来将调用buildCommand方法来构造command,SparkSubmitCommandBuilder重写了父类的buildCommand。
/**
 * Chooses which concrete command to build based on the application resource.
 *
 * <p>The PySpark shell and SparkR shell get their dedicated builders, but only when
 * {@code isAppResourceReq} is set; everything else goes through
 * {@code buildSparkSubmitCommand}.
 *
 * @param env environment map handed on to the selected builder method
 * @return the command produced by the selected builder method
 */
@Override
public List<String> buildCommand(Map<String, String> env)
    throws IOException, IllegalArgumentException {
  final boolean pySparkShell = PYSPARK_SHELL.equals(appResource) && isAppResourceReq;
  if (pySparkShell) {
    return buildPySparkShellCommand(env);
  }

  final boolean sparkRShell = SPARKR_SHELL.equals(appResource) && isAppResourceReq;
  if (sparkRShell) {
    return buildSparkRCommand(env);
  }

  return buildSparkSubmitCommand(env);
}
我们来看buildSparkSubmitCommand方法
/**
 * Assembles the java command line that runs {@code org.apache.spark.deploy.SparkSubmit}.
 *
 * @param env environment map; in client mode the driver's extra library path is merged into it
 * @return the java command followed by the SparkSubmit class name and the spark-submit arguments
 * @throws IllegalArgumentException if the driver's extra java options contain "Xmx"
 */
private List<String> buildSparkSubmitCommand(Map<String, String> env)
    throws IOException, IllegalArgumentException {
  // Load the properties file and check whether spark-submit will be running the app's driver
  // or just launching a cluster app. When running the driver, the JVM's argument will be
  // modified to cover the driver's configuration.
  // Per the article: when spark-submit was not given --properties-file, the file
  // ${SPARK_CONF_DIR}/spark-defaults.conf is loaded instead.
  Map<String, String> conf = getEffectiveConfig();
  boolean clientMode = isClientMode(conf);

  // In client deploy mode, make sure every dependency is on the classpath before the
  // driver starts.
  // TODO: when does this get loaded for cluster deploy mode?
  String driverClassPath = null;
  if (clientMode) {
    driverClassPath = conf.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH);
  }

  // Build the java command; per the article this covers locating JAVA_HOME, loading
  // java-opts, the jars, and configuration directories such as HADOOP_CONF_DIR,
  // YARN_CONF_DIR, SPARK_DIST_CLASSPATH and SPARK_CONF_DIR.
  List<String> command = buildJavaCommand(driverClassPath);

  // Take Thrift Server as daemon
  if (isThriftServer(mainClass)) {
    addOptionString(command, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
  }
  addOptionString(command, System.getenv("SPARK_SUBMIT_OPTS"));

  // We don't want the client to specify Xmx. These have to be set by their corresponding
  // memory flag --driver-memory or configuration entry spark.driver.memory
  String driverJavaOpts = conf.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
  boolean setsMaxHeap = !isEmpty(driverJavaOpts) && driverJavaOpts.contains("Xmx");
  if (setsMaxHeap) {
    throw new IllegalArgumentException(String.format(
        "Not allowed to specify max heap(Xmx) memory settings through "
            + "java options (was %s). Use the corresponding --driver-memory or "
            + "spark.driver.memory configuration instead.",
        driverJavaOpts));
  }

  if (clientMode) {
    // Figuring out where the memory value come from is a little tricky due to precedence.
    // Precedence is observed in the following order:
    // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
    // - properties file.
    // - SPARK_DRIVER_MEMORY env variable
    // - SPARK_MEM env variable
    // - default value (1g)
    // Take Thrift Server as daemon
    String thriftServerMemory = null;
    if (isThriftServer(mainClass)) {
      thriftServerMemory = System.getenv("SPARK_DAEMON_MEMORY");
    }
    String heapSize = firstNonEmpty(
        thriftServerMemory,
        conf.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"),
        System.getenv("SPARK_MEM"),
        DEFAULT_MEM);
    command.add("-Xmx" + heapSize);
    addOptionString(command, driverJavaOpts);
    mergeEnvPathList(env, getLibPathEnvName(),
        conf.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
  }

  // This is the class that actually submits the Spark application.
  command.add("org.apache.spark.deploy.SparkSubmit");
  // Append the arguments that were passed in through the spark-submit script.
  command.addAll(buildSparkSubmitArgs());
  return command;
}
getEffectiveConfig、buildJavaCommand和buildSparkSubmitArgs这三个方法的代码比较简单,请参照源代码。
至此,org.apache.spark.launcher.Main这个类的任务就完成了。