
01 Starting from spark-submit

2018-09-10  Yu咸的静安

Spark applications are submitted with the spark-submit command.

spark-submit:

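# 1-- Check whether the SPARK_HOME environment variable is set
# If not, source the find-spark-home script from the same directory as this script to locate SPARK_HOME and set the related environment variables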
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# 2-- Invoke the spark-class script, forwarding every argument that was passed to spark-submit
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
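The two steps of spark-submit can be sketched in Java as follows. This is a hypothetical model, not Spark code: the class and method names are invented, and the fallback (the parent of the script's bin directory) is an assumption about find-spark-home's usual behavior for a regular Spark distribution; any other lookup the script may perform is ignored.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical Java rendering of the two steps in bin/spark-submit.
class SparkSubmitScriptSketch {

  // Step 1: use SPARK_HOME if it is set; otherwise (assumed fallback) use the
  // parent of the directory that contains the script, e.g. /opt/spark for
  // /opt/spark/bin/spark-submit.
  static Path resolveSparkHome(String envSparkHome, Path scriptPath) {
    if (envSparkHome != null && !envSparkHome.isEmpty()) {
      return Paths.get(envSparkHome);
    }
    return scriptPath.toAbsolutePath().getParent().getParent();
  }

  // Step 2: build the argv that the script execs: spark-class, the SparkSubmit
  // class name, then every original argument unchanged (the "$@").
  static List<String> buildExecArgs(Path sparkHome, List<String> userArgs) {
    List<String> argv = new ArrayList<>();
    argv.add(sparkHome.resolve("bin").resolve("spark-class").toString());
    argv.add("org.apache.spark.deploy.SparkSubmit");
    argv.addAll(userArgs);
    return argv;
  }

  public static void main(String[] args) {
    // /opt/spark is a hypothetical install location used only for illustration.
    Path home = resolveSparkHome(System.getenv("SPARK_HOME"),
        Paths.get("/opt/spark/bin/spark-submit"));
    System.out.println(buildExecArgs(home, Arrays.asList(args)));
  }
}
```

Because the arguments are forwarded untouched, all real option parsing happens later, in the Java launcher.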

spark-class

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

. "${SPARK_HOME}"/bin/load-spark-env.sh

if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# Run the main method of org.apache.spark.launcher.Main, which builds the spark-submit command
# and writes it to standard output, each argument terminated by a NUL character
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  # Append the launcher's exit status, also NUL-terminated
  printf "%d\0" $?
}

set +o posix
# Read the NUL-separated output of build_command into the CMD array
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

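# Check that the last element is a numeric exit code; if it is not, the launcher failed unexpectedly, so print its output and exit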
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi
# Drop the trailing exit code and exec the assembled command, which runs the org.apache.spark.deploy.SparkSubmit class
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

Only the essential code is kept here to save space.
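The handshake between launcher.Main and spark-class can be modelled as below: the launcher's output is a list of NUL-terminated arguments, build_command appends the exit status as one more NUL-terminated field, and spark-class checks that field before exec'ing the rest. This is a hypothetical Java sketch of the bash logic; the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the read loop and exit-code checks in spark-class.
class LauncherOutputSketch {

  // Splits NUL-terminated fields, like `while IFS= read -d '' -r ARG`.
  // Trailing data without a final NUL is dropped, as in the bash loop.
  static List<String> splitNul(String output) {
    List<String> fields = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < output.length(); i++) {
      if (output.charAt(i) == '\0') {
        fields.add(output.substring(start, i));
        start = i + 1;
      }
    }
    return fields;
  }

  // The last field must be the launcher's exit status: a non-numeric value
  // means the launcher failed in an unexpected way, a non-zero value means it
  // reported an error. Otherwise the remaining fields are the command to exec.
  static List<String> commandOrFail(String output) {
    List<String> fields = splitNul(output);
    if (fields.isEmpty()) {
      throw new IllegalStateException("launcher produced no output");
    }
    String last = fields.get(fields.size() - 1);
    if (!last.matches("[0-9]+")) {
      throw new IllegalStateException("unexpected launcher output: " + fields);
    }
    int exitCode = Integer.parseInt(last);
    if (exitCode != 0) {
      throw new IllegalStateException("launcher exited with code " + exitCode);
    }
    return new ArrayList<>(fields.subList(0, fields.size() - 1));
  }

  public static void main(String[] args) {
    String output = String.join("\0", Arrays.asList(
        "java", "-cp", "/opt/spark/jars/*", "org.apache.spark.deploy.SparkSubmit", "--version", "0")) + "\0";
    System.out.println(commandOrFail(output));
  }
}
```

NUL works as the separator because it cannot appear inside a command-line argument, so arguments containing spaces or quotes pass through intact.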

How org.apache.spark.launcher.Main generates the spark-submit command

Main initializes a SparkSubmitCommandBuilder, calls its buildCommand method, and finally prints the resulting command.

public static void main(String[] argsArray) throws Exception {
    List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    String className = args.remove(0);
    AbstractCommandBuilder builder;
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        // Instantiate SparkSubmitCommandBuilder
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {......}
    } else {
      builder = new SparkClassCommandBuilder(className, args);
    }
    Map<String, String> env = new HashMap<>();
    // Call buildCommand
    List<String> cmd = builder.buildCommand(env);
    // Print the command (code omitted here)
  }
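The printing step is omitted above. A minimal sketch of the emitting side, assuming the format that the spark-class loop reads (each argument followed by NUL). The `env K=V ...` prefix for child environment variables is an assumption about how the `env` map is passed on non-Windows systems; the class and method names are hypothetical, and builder names are returned as strings rather than constructing real builder objects.

```java
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of Main's dispatch and output format.
class LauncherPrintSketch {

  // Mirrors the branch in Main: SparkSubmit gets the submit-specific builder,
  // any other class gets the generic class builder.
  static String chooseBuilder(String className) {
    return "org.apache.spark.deploy.SparkSubmit".equals(className)
        ? "SparkSubmitCommandBuilder"
        : "SparkClassCommandBuilder";
  }

  // Assumption: child environment variables are passed by prefixing the
  // command with `env K=V ...`; an empty map leaves the command unchanged.
  static List<String> withEnvPrefix(List<String> cmd, Map<String, String> env) {
    if (env.isEmpty()) {
      return cmd;
    }
    List<String> out = new ArrayList<>();
    out.add("env");
    for (Map.Entry<String, String> e : env.entrySet()) {
      out.add(e.getKey() + "=" + e.getValue());
    }
    out.addAll(cmd);
    return out;
  }

  // Writes every argument followed by a NUL character.
  static void printNulSeparated(List<String> cmd, PrintStream out) {
    for (String arg : cmd) {
      out.print(arg);
      out.print('\0');
    }
    out.flush();
  }

  public static void main(String[] args) {
    Map<String, String> env = new LinkedHashMap<>();
    env.put("LD_LIBRARY_PATH", "/opt/native");  // hypothetical value
    List<String> cmd = Arrays.asList("java", "-Xmx1g", "org.apache.spark.deploy.SparkSubmit", "--version");
    printNulSeparated(withEnvPrefix(cmd, env), System.out);
  }
}
```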

The SparkSubmitCommandBuilder constructor creates an instance of the private inner class OptionParser and calls its parse method to parse the argument list.

SparkSubmitCommandBuilder(List<String> args) {
    List<String> submitArgs = args;
    if (args.size() > 0) {
      ......
      this.isExample = isExample;
      // Create an OptionParser and call its parse method
      OptionParser parser = new OptionParser();
      parser.parse(submitArgs);
      this.isAppResourceReq = parser.isAppResourceReq;
    }
  }

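OptionParser extends SparkSubmitOptionParser and does not override its parse method (which is declared final), so the call ends up in the parse method of the parent class.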

protected final void parse(List<String> args) {
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

    int idx = 0;
    for (idx = 0; idx < args.size(); idx++) {
      String arg = args.get(idx);
      String value = null;
      // First check whether the argument has the --key=value form; if so, extract key and value from the regex groups
      Matcher m = eqSeparatedOpt.matcher(arg);
      if (m.matches()) {
        arg = m.group(1);
        value = m.group(2);
      }

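      /**
       * The class splits spark-submit options into two kinds:
       * 1. options that take a value, such as --class, stored in the two-dimensional array opts
       * 2. switches that take no value, such as --version, stored in the two-dimensional array switches
       */
      // Look the argument up in opts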
      String name = findCliOption(arg, opts);
      if (name != null) {
        if (value == null) {
          if (idx == args.size() - 1) {
            throw new IllegalArgumentException(
                String.format("Missing argument for option '%s'.", arg));
          }
          // The argument was not in key=value form, so take the value from the next element
          idx++;
          value = args.get(idx);
        }
        // Handle an option found in opts
        if (!handle(name, value)) {
          break;
        }
        continue;
      }

      // Look the argument up in switches; switches take no value, so null is passed
      name = findCliOption(arg, switches);
      if (name != null) {
        if (!handle(name, null)) {
          break;
        }
        continue;
      }
      
      // The argument is in neither opts nor switches, so pass it to handleUnknown
      if (!handleUnknown(arg)) {
        break;
      }
    }

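    // If the loop stopped early (idx < args.size()), the argument at idx was already passed to a callback;
    // skip it and hand the remaining arguments to handleExtraArgs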
    if (idx < args.size()) {
      idx++;
    }
    handleExtraArgs(args.subList(idx, args.size()));
  }
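The parse loop above can be reduced to a small, self-contained model. This is a simplified sketch, not SparkSubmitOptionParser itself: OPTS and SWITCHES are tiny illustrative subsets, options are matched by exact name (no aliases), and the handling of unknown arguments (the first non-option becomes the application resource, an unknown `-x` option is an error) is an assumption standing in for OptionParser's handleUnknown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified model of the parse() loop.
class MiniSubmitParser {
  // Illustrative subsets only, not Spark's full option tables.
  static final List<String> OPTS = Arrays.asList("--class", "--master", "--conf");
  static final List<String> SWITCHES = Arrays.asList("--verbose", "--version");
  static final Pattern EQ_SEPARATED = Pattern.compile("(--[^=]+)=(.+)");

  final List<String> parsed = new ArrayList<>();  // "name=value" or "name"
  String appResource;
  List<String> extraArgs = new ArrayList<>();

  void parse(List<String> args) {
    int idx;
    for (idx = 0; idx < args.size(); idx++) {
      String arg = args.get(idx);
      String value = null;
      Matcher m = EQ_SEPARATED.matcher(arg);
      if (m.matches()) {  // --key=value form
        arg = m.group(1);
        value = m.group(2);
      }
      if (OPTS.contains(arg)) {
        if (value == null) {  // --key value form: consume the next element
          if (idx == args.size() - 1) {
            throw new IllegalArgumentException("Missing argument for option '" + arg + "'.");
          }
          value = args.get(++idx);
        }
        parsed.add(arg + "=" + value);
        continue;
      }
      if (SWITCHES.contains(arg)) {  // any "=value" on a switch is ignored
        parsed.add(arg);
        continue;
      }
      // Assumed unknown-argument handling: stop at the first non-option.
      if (!arg.startsWith("-")) {
        appResource = arg;
        break;
      }
      throw new IllegalArgumentException("Unrecognized option: " + arg);
    }
    // As in parse(): skip the element that stopped the loop, keep the rest.
    if (idx < args.size()) {
      idx++;
    }
    extraArgs = new ArrayList<>(args.subList(idx, args.size()));
  }

  public static void main(String[] args) {
    MiniSubmitParser p = new MiniSubmitParser();
    p.parse(Arrays.asList("--master=yarn", "--class", "com.example.App", "app.jar", "in"));
    System.out.println(p.parsed + " " + p.appResource + " " + p.extraArgs);
  }
}
```

For `--master=yarn --class com.example.App --verbose app.jar in --out`, the sketch records the three options, takes `app.jar` as the application resource, and leaves `in --out` as extra arguments, which shows why options placed after the application jar are not seen by spark-submit.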

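OptionParser overrides the parent class's handle, handleUnknown and handleExtraArgs methods; the code is straightforward, so please refer to the source.
Next, buildCommand is called to construct the command. SparkSubmitCommandBuilder overrides the buildCommand method of its parent class, AbstractCommandBuilder: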

 @Override
  public List<String> buildCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) {
      return buildPySparkShellCommand(env);
    } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) {
      return buildSparkRCommand(env);
    } else {
      return buildSparkSubmitCommand(env);
    }
  }

Let's look at the buildSparkSubmitCommand method:

private List<String> buildSparkSubmitCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    // Load the properties file and check whether spark-submit will be running the app's driver
    // or just launching a cluster app. When running the driver, the JVM's argument will be
    // modified to cover the driver's configuration.

    // Load the properties file; if --properties-file was not passed to spark-submit,
    // ${SPARK_CONF_DIR}/spark-defaults.conf is loaded instead
    Map<String, String> config = getEffectiveConfig();
    boolean isClientMode = isClientMode(config);

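    // In client deploy mode, make sure the dependencies in DRIVER_EXTRA_CLASSPATH are on the classpath before the driver starts
    // TODO: when are they loaded in cluster deploy mode??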
    String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

    // Build the java command: locate JAVA_HOME, load java-opts, and put the jars plus HADOOP_CONF_DIR, YARN_CONF_DIR,
    // SPARK_DIST_CLASSPATH, SPARK_CONF_DIR and similar configuration entries on the classpath
    List<String> cmd = buildJavaCommand(extraClassPath);
    // Take Thrift Server as daemon
    if (isThriftServer(mainClass)) {
      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
    }
    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));

    // We don't want the client to specify Xmx. These have to be set by their corresponding
    // memory flag --driver-memory or configuration entry spark.driver.memory
    String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
    if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
      String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
                   "java options (was %s). Use the corresponding --driver-memory or " +
                   "spark.driver.memory configuration instead.", driverExtraJavaOptions);
      throw new IllegalArgumentException(msg);
    }

    if (isClientMode) {
      // Figuring out where the memory value come from is a little tricky due to precedence.
      // Precedence is observed in the following order:
      // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
      // - properties file.
      // - SPARK_DRIVER_MEMORY env variable
      // - SPARK_MEM env variable
      // - default value (1g)
      // Take Thrift Server as daemon
      String tsMemory =
        isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
      String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
      cmd.add("-Xmx" + memory);
      addOptionString(cmd, driverExtraJavaOptions);
      mergeEnvPathList(env, getLibPathEnvName(),
        config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
    }
    // This is the class that actually submits the Spark application
    cmd.add("org.apache.spark.deploy.SparkSubmit");
    // Append the arguments that were passed to the spark-submit script
    cmd.addAll(buildSparkSubmitArgs());
    return cmd;
  }
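The two driver-side rules in buildSparkSubmitCommand, the ban on Xmx in the driver's extra Java options and the client-mode heap-size precedence, can be isolated as below. A hypothetical sketch: the keys `spark.driver.memory` and `spark.driver.extraJavaOptions` are the values of SparkLauncher.DRIVER_MEMORY and SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, while the class and helper names are illustrative. In the real method the resulting `-Xmx` flag is only added in client mode.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of two rules applied in buildSparkSubmitCommand.
class DriverMemorySketch {
  static final String DEFAULT_MEM = "1g";  // default value listed in the source comment

  // Returns the first candidate that is neither null nor empty, or null if none.
  static String firstNonEmpty(String... candidates) {
    for (String c : candidates) {
      if (c != null && !c.isEmpty()) {
        return c;
      }
    }
    return null;
  }

  // Heap size must come from --driver-memory / spark.driver.memory, not Java options.
  static void checkNoXmx(String driverExtraJavaOptions) {
    if (driverExtraJavaOptions != null && driverExtraJavaOptions.contains("Xmx")) {
      throw new IllegalArgumentException(
          "Use --driver-memory or spark.driver.memory instead of Xmx: " + driverExtraJavaOptions);
    }
  }

  // Precedence, highest first: SPARK_DAEMON_MEMORY (Thrift Server only),
  // spark.driver.memory (also set by --driver-memory), SPARK_DRIVER_MEMORY,
  // SPARK_MEM, then the default.
  static String driverXmx(boolean isThriftServer, Map<String, String> config, Map<String, String> env) {
    String tsMemory = isThriftServer ? env.get("SPARK_DAEMON_MEMORY") : null;
    String memory = firstNonEmpty(tsMemory, config.get("spark.driver.memory"),
        env.get("SPARK_DRIVER_MEMORY"), env.get("SPARK_MEM"), DEFAULT_MEM);
    return "-Xmx" + memory;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("spark.driver.memory", "4g");
    checkNoXmx(config.get("spark.driver.extraJavaOptions"));
    System.out.println(driverXmx(false, config, System.getenv()));
  }
}
```

Keeping the heap size out of the free-form Java options gives the launcher a single place from which to derive `-Xmx`, so the precedence chain above cannot be silently overridden.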

getEffectiveConfig, buildJavaCommand and buildSparkSubmitArgs are fairly simple; please refer to the source code.
At this point, the org.apache.spark.launcher.Main class has finished its job.
