Spark-Core Source Code Deep Dive (3): spark-shell (and spark-submit)

2017-03-02 · sun4lower

This article walks through, step by step, what happens when you enter the REPL via spark-shell.

Note: the articles in this series use the Spark-1.6.3 source code as their reference; where Spark-2.1.0 makes significant changes, this will be pointed out.

The shell script part

Let's look at what actually happens when we type spark-shell --master spark://master:7077. The natural first stop is the spark-shell script itself; only the more important parts are shown here:

##Check whether the SPARK_HOME environment variable is set; if not, derive it from the script's location
if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
##...
function main() {
  if $cygwin; then
    # Workaround for issue involving JLine and Cygwin
    # (see http://sourceforge.net/p/jline/bugs/40/).
    # If you're using the Mintty terminal emulator in Cygwin, may need to set the
    # "Backspace sends ^H" setting in "Keys" section of the Mintty options
    # (see https://github.com/sbt/sbt/issues/562).
    stty -icanon min 1 -echo > /dev/null 2>&1
    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
    stty icanon echo > /dev/null 2>&1
  else
    export SPARK_SUBMIT_OPTS
    "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
  fi
}
##...
main "$@"
##...

As you can see, the script ends by calling main with all of the arguments we passed to spark-shell, such as --master. Whatever the operating system (in production it is of course Linux), main ultimately runs spark-submit with --class org.apache.spark.repl.Main and --name "Spark shell", forwarding every argument the user gave to spark-shell. Next, let's look at spark-submit:

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

spark-submit is quite short: it uses exec to start SparkSubmit via spark-class, again forwarding all of the arguments it received. Now let's look at spark-class. (One point worth spelling out: from here on, the flow is exactly the same as submitting an application with spark-submit directly. The only difference is that a plain spark-submit is followed by a class the user wrote, whereas the spark-submit launched by spark-shell is followed by org.apache.spark.repl.Main; an application submitted with spark-submit exits once it finishes, while the REPL entered through spark-shell keeps waiting for user input. A minimal sketch of such a user-written class is given at the end of this section.)

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

## Load the Spark environment variables
. "${SPARK_HOME}"/bin/load-spark-env.sh

## Locate the java binary, which will later be used to launch a JVM process
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

## Locate the assembly jar and its directory
# Find assembly jar
SPARK_ASSEMBLY_JAR=
if [ -f "${SPARK_HOME}/RELEASE" ]; then
  ASSEMBLY_DIR="${SPARK_HOME}/lib"
else
  ASSEMBLY_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

GREP_OPTIONS=
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then
  echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
  echo "You need to build Spark before running this program." 1>&2
  exit 1
fi
if [ -d "$ASSEMBLY_DIR" ]; then
  ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
  if [ "$num_jars" -gt "1" ]; then
    echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
    echo "$ASSEMBLY_JARS" 1>&2
    echo "Please remove all but one jar." 1>&2
    exit 1
  fi
fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
  ## The process substitution below uses java -cp to start a JVM running org.apache.spark.launcher.Main,
  ## which builds and prints the final command; the exec below then replaces this script with that
  ## command, i.e. the SparkSubmit process
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"

spark-class is the command-line launcher for Spark programs: it sets up the JVM environment and then executes the requested application class, which in our case is SparkSubmit. With that we are ready to step into the Spark source.
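
Before we do, here is the minimal sketch promised above of the kind of user-written application class that takes the place of org.apache.spark.repl.Main when spark-submit is used directly. The object name, app name and job are purely illustrative and not from the Spark repository; all that matters is that the class has a static main method, which is exactly what SparkSubmit will later invoke via reflection:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical user application: the "child main class" of a plain spark-submit.
object MyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyApp")
    val sc = new SparkContext(conf)
    // A trivial job, just so there is something to run.
    val evens = sc.parallelize(1 to 1000).filter(_ % 2 == 0).count()
    println(s"even numbers: $evens")
    sc.stop()
  }
}

Submitting it with something like spark-submit --class MyApp --master spark://master:7077 myapp.jar goes through the very same spark-class/SparkSubmit path we are about to read; the only difference is that the main method invoked at the end is MyApp.main rather than org.apache.spark.repl.Main.main.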

The Spark source part

Picking up where the scripts left off, we go straight into the Spark source:

We will not walk through org.apache.spark.launcher.Main here; see my other article (a supplementary note on the Main class). In short, it examines the class it was given, builds the corresponding command, and hands it back for exec to run. Our focus here is Spark itself, so we go directly to the SparkSubmit source:

def main(args: Array[String]): Unit = {
  /** SparkSubmitArguments wraps the arguments passed to spark-submit; remember what they were?
   *  For spark-shell that means everything following the spark-shell command; for a direct
   *  spark-submit it is whatever was passed at submission time. SparkSubmitArguments has many
   *  fields, so they are not all listed here; each is explained when it is used. See the
   *  SparkSubmitArguments source for the full list.
  */
  val appArgs = new SparkSubmitArguments(args)
  // If verbose output was requested (--verbose / -v), print the parsed arguments
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  
  /** The action is what spark-submit has been asked to do: SUBMIT, KILL or REQUEST_STATUS
   *  (wrapped in SparkSubmitAction). If none is specified it defaults to SparkSubmitAction.SUBMIT,
   *  so the pattern match below will end up calling submit(appArgs).
  */
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
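
To make the wrapping step concrete, here is a rough model of the handful of fields that matter for our spark-shell --master spark://master:7077 invocation. This is my own simplification for illustration only, not Spark's actual SparkSubmitArguments/SparkSubmitAction classes:

// Simplified, illustrative stand-in for SparkSubmitArguments / SparkSubmitAction.
object SubmitArgsSketch {
  sealed trait Action
  case object Submit extends Action
  case object Kill extends Action
  case object RequestStatus extends Action

  // Roughly what the parsed arguments would hold for our spark-shell run.
  case class Args(master: String, deployMode: String, mainClass: String,
                  verbose: Boolean, action: Action)

  def main(args: Array[String]): Unit = {
    val parsed = Args(
      master = "spark://master:7077",            // from --master
      deployMode = "client",                     // spark-shell runs the driver locally
      mainClass = "org.apache.spark.repl.Main",  // injected by the spark-shell script via --class
      verbose = false,                           // would be true with --verbose / -v
      action = Submit)                           // the default when no other action is requested
    parsed.action match {                        // mirrors the match in SparkSubmit.main
      case Submit               => println(s"submit ${parsed.mainClass} to ${parsed.master}")
      case Kill | RequestStatus => println("not relevant for spark-shell")
    }
  }
}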

Now let's look at the submit(appArgs) method:

/**
 * submit's job is to launch the application using the arguments it was given.
 * It does so in two steps:
 * 1. Prepare the launch environment: the classpath, system properties and application
 * arguments needed to run the child main class for the chosen deploy mode and cluster manager.
 * 2. Invoke the main method of that child main class using the environment from step 1. Here
 * we only consider client mode; cluster mode will be analysed separately later.
 * So for spark-shell the child main class is org.apache.spark.repl.Main, while for a direct
 * spark-submit it is the user-written application class (the one containing the main method).
*/
private def submit(args: SparkSubmitArguments): Unit = {
  // Prepare the environment; the key result for us is childMainClass, the child main class described above
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
   // Note: the source defines a nested doRunMain() method here; we pull it out and analyse it separately below
   // Whether the gateway is the Akka one or the REST-based one, both branches end up calling doRunMain()
   // In standalone cluster mode, there are two submission gateways:
   //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
   //   (2) The new REST-based gateway introduced in Spark 1.3
   // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
   // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}

The implementation of doRunMain():

def doRunMain(): Unit = {
  if (args.proxyUser != null) {
    // Hadoop user/group handling: run as the requested proxy user
    val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
      UserGroupInformation.getCurrentUser())
    try {
      proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
        override def run(): Unit = {
          runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
        }
      })
    } catch {
      case e: Exception =>
        // Hadoop's AuthorizationException suppresses the exception's stack trace, which
        // makes the message printed to the output by the JVM not very helpful. Instead,
        // detect exceptions with empty stack traces here, and treat them differently.
        if (e.getStackTrace().length == 0) {
          // scalastyle:off println
          printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          // scalastyle:on println
          exitFn(1)
        } else {
          throw e
        }
    }
  } else {
    runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
  }
}

Both branches of doRunMain() end up calling runMain, so let's step into runMain:

/** Don't be put off by the length of this method: it does exactly one thing, run the main
method of the child main class. To repeat: for an application submitted directly with
spark-submit this is the main method of the user-specified class; for spark-shell it is the
main method of org.apache.spark.repl.Main.
*/
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // Print verbose information if requested
  // scalastyle:off println
  if (verbose) {
    printStream.println(s"Main class:\n$childMainClass")
    printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
    printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
    printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    printStream.println("\n")
  }
  // scalastyle:on println
  
  // Choose the ClassLoader for the current thread
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)
  
  // Add the jar dependencies to the classpath
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  // Apply the system properties
  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }
  var mainClass: Class[_] = null
  // Load mainClass (the child main class) via reflection
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      e.printStackTrace(printStream)
      if (childMainClass.contains("thriftserver")) {
        // scalastyle:off println
        printStream.println(s"Failed to load main class $childMainClass.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      e.printStackTrace(printStream)
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        // scalastyle:off println
        printStream.println(s"Failed to load hive class.")
        printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
        // scalastyle:on println
      }
      System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
  }
  // SPARK-4170
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
    printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  }
  // Look up the main method of mainClass (the child main class)
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  // The main method must be static
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  }
  def findCause(t: Throwable): Throwable = t match {
    case e: UndeclaredThrowableException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: InvocationTargetException =>
      if (e.getCause() != null) findCause(e.getCause()) else e
    case e: Throwable =>
      e
  }
  // Finally, invoke the main method
  try {
    mainMethod.invoke(null, childArgs.toArray)
  } catch {
    case t: Throwable =>
      findCause(t) match {
        case SparkUserAppException(exitCode) =>
          System.exit(exitCode)
        case t: Throwable =>
          throw t
      }
  }
}
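
The reflection at the end of runMain is ordinary JVM reflection, and it is worth seeing in isolation. Below is a small self-contained sketch of the same three steps: load a class by name, look up its static main(Array[String]) method, and invoke it. The Hello object is a hypothetical stand-in for the child main class (the real code uses Utils.classForName and the user's arguments):

// A hypothetical target whose static main we invoke by reflection.
object Hello {
  def main(args: Array[String]): Unit = println("Hello, " + args.mkString(" "))
}

// The loadClass / getMethod("main") / invoke pattern used by runMain.
object ReflectMainSketch {
  def main(args: Array[String]): Unit = {
    val mainClass  = Class.forName("Hello")
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // The receiver is null because main is static; the String array is the single parameter.
    mainMethod.invoke(null, Array("spark-shell"))
  }
}

For spark-shell the name handed to classForName is org.apache.spark.repl.Main, and the invoked main is the one we follow next.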

At this point, if the user submitted their own Spark application via spark-submit, its main method is invoked directly and the user's code (creating a SparkContext and so on) runs from there; we will analyse that in later articles. What we follow now is the main method of org.apache.spark.repl.Main (again, this article only covers client mode; cluster mode will be analysed separately). Here is the thread dump of the main thread of the SparkSubmit process:

java.io.FileInputStream.read0(Native Method)
java.io.FileInputStream.read(FileInputStream.java:207)
scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)
scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)
scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.java:933)
scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)
org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.scala:80)
scala.tools.nsc.interpreter.InteractiveReader$class.readLine(InteractiveReader.scala:43) 
org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25) 
org.apache.spark.repl.SparkILoop.readOneLine$1(SparkILoop.scala:648) 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
org.apache.spark.repl.Main$.main(Main.scala:31)
org.apache.spark.repl.Main.main(Main.scala) 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
java.lang.reflect.Method.invoke(Method.java:498) 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

It is shown here to tie the two halves together: reading it from the bottom up, you can clearly see the path we just analysed, from SparkSubmit's main method through submit, doRunMain and runMain to the reflective call into org.apache.spark.repl.Main's main method. So next we enter org.apache.spark.repl.Main's main method (together with the initialisation around it):

// Instantiate SparkConf
val conf = new SparkConf()
// Set up the various directories and paths
val tmp = System.getProperty("java.io.tmpdir")
val rootDir = conf.get("spark.repl.classdir", tmp)
val outputDir = Utils.createTempDir(rootDir)
val s = new Settings()
s.processArguments(List("-Yrepl-class-based",
  "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
  "-classpath", getAddedJars.mkString(File.pathSeparator)), true)
// the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed
val classServerPort = conf.getInt("spark.replClassServer.port", 0)
// Instantiate the HttpServer; note that it is a lazy val
lazy val classServer =
  new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
var sparkContext: SparkContext = _
var sqlContext: SQLContext = _
// Instantiate SparkILoop, which we analyse in detail below
var interp = new SparkILoop // this is a public var because tests reset it.
// After the initialisation above, main is executed
def main(args: Array[String]) {
  // Check for yarn-client mode; the YARN deploy mode will get its own article later
  if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
  // Start the classServer and store its URI in a spark system property
  // (which will be passed to executors so that they can connect to it)
  // Start the HTTP class server
  classServer.start()
  // The key line: hand control to the interpreter loop, i.e. the REPL
  interp.process(s) // Repl starts and goes in loop of R.E.P.L
  classServer.stop()
  Option(sparkContext).map(_.stop)
}
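
Note the lazy val on classServer. As the source comment says, the SecurityManager must not be created until SPARK_YARN_MODE has (potentially) been set, and laziness gives exactly that: the right-hand side only runs at first access, which here is the classServer.start() call inside main, after the System.setProperty call. A tiny standalone sketch of that evaluation order (my own illustration, with hypothetical names):

// Why a lazy val postpones its side effects until first use.
object LazyInitSketch {
  lazy val server = {
    // Runs only when `server` is first touched, so it sees the property set in main().
    println("creating server, flag = " + System.getProperty("example.flag"))
    "server-instance"
  }

  def main(args: Array[String]): Unit = {
    System.setProperty("example.flag", "true")  // analogous to setting SPARK_YARN_MODE
    println("starting " + server)               // first access: prints "flag = true"
  }
}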

At this point it is worth showing part of the log output printed when entering the REPL via spark-shell:

17/02/21 13:40:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/21 13:40:17 INFO spark.SecurityManager: Changing view acls to: root
17/02/21 13:40:17 INFO spark.SecurityManager: Changing modify acls to: root
17/02/21 13:40:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/02/21 13:40:18 INFO spark.HttpServer: Starting HTTP Server
17/02/21 13:40:18 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/02/21 13:40:18 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:43773
17/02/21 13:40:18 INFO util.Utils: Successfully started service 'HTTP class server' on port 43773.

This chunk of log output corresponds exactly to classServer.start(). From now on, whenever we see these lines we know precisely what happened behind them. Quite satisfying, isn't it?

Now we move on to SparkILoop and ILoop. SparkILoop extends ILoop and does not define its own process method, so the process being called is the one in ILoop:

ILoop

// Start the interpreter that will evaluate the commands the user types
// start an interpreter with the given settings
def process(settings: Settings): Boolean = savingContextLoader {
  this.settings = settings
  // Create the interpreter; internally this instantiates an ILoopInterpreter
  createInterpreter()
  // sets in to some kind of reader depending on environmental cues
  in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
  globalFuture = future {
    intp.initializeSynchronous()
    loopPostInit()
    !intp.reporter.hasErrors
  }
  // This resolves to the subclass SparkILoop's loadFiles, which in turn ends by calling this class's loadFiles
  loadFiles(settings)
  printWelcome()
  // Loop forever, reading and evaluating the user's input
  try loop() match {
    case LineResults.EOF => out print Properties.shellInterruptedString
    case _               =>
  }
  catch AbstractOrMissingHandler()
  finally closeInterpreter()
  true
}
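
One structural detail worth noticing in process: the interpreter's heavy initialisation is started asynchronously (globalFuture) while the rest of the setup (loadFiles, printWelcome) carries on, and only then does the blocking loop() begin. The sketch below reproduces just that shape; it uses plain scala.concurrent and a dummy read-eval loop, and is my own illustration rather than the REPL's internals:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.StdIn

// Rough shape of ILoop.process: background initialisation, then setup, then a blocking loop.
object ProcessShapeSketch {
  def main(args: Array[String]): Unit = {
    val init = Future {
      Thread.sleep(500)                 // stands in for intp.initializeSynchronous()
      println("interpreter initialised")
    }
    println("Welcome!")                 // stands in for printWelcome()
    Await.ready(init, 1.minute)         // make sure initialisation has finished (simplified)
    var line = StdIn.readLine("sketch> ")
    while (line != null && line != ":quit") {   // stands in for loop()
      println(s"evaluated: $line")
      line = StdIn.readLine("sketch> ")
    }
  }
}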

First, let's see what SparkILoop's loadFiles method does:

override def loadFiles(settings: Settings): Unit = {
  initializeSpark()
  super.loadFiles(settings)
}

It first calls initializeSpark() and only then the parent's loadFiles, so that SparkContext and SQLContext are ready before anything else happens. That is exactly why sc and sqlContext can be used the moment we land in the REPL (a rather satisfying discovery). With that said, let's look at initializeSpark() itself:

def initializeSpark() {
  intp.beQuietDuring {
    processLine("""
       @transient val sc = {
         val _sc = org.apache.spark.repl.Main.createSparkContext()
         println("Spark context available as sc.")
         _sc
       }
      """)
    processLine("""
       @transient val sqlContext = {
         val _sqlContext = org.apache.spark.repl.Main.createSQLContext()
         println("SQL context available as sqlContext.")
         _sqlContext
       }
      """)
    processLine("import org.apache.spark.SparkContext._")
    processLine("import sqlContext.implicits._")
    processLine("import sqlContext.sql")
    processLine("import org.apache.spark.sql.functions._")
  }
}

This is self-explanatory: processLine creates the SparkContext and SQLContext and imports a few commonly used packages. Once everything is ready, the parent's loadFiles runs and printWelcome() is called; note that it is SparkILoop's override of printWelcome():

/** Print a welcome message */
override def printWelcome() {
  import org.apache.spark.SPARK_VERSION
  echo("""Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version %s
    /_/
       """.format(SPARK_VERSION))
  val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
    versionString, javaVmName, javaVersion)
  echo(welcomeMsg)
  echo("Type in expressions to have them evaluated.")
  echo("Type :help for more information.")
}

Look familiar? Yes, this is exactly what spark-shell prints:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.3
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
17/02/21 13:40:24 INFO spark.SparkContext: Running Spark version 1.6.3
17/02/21 13:40:24 INFO spark.SecurityManager: Changing view acls to: root
17/02/21 13:40:24 INFO spark.SecurityManager: Changing modify acls to: root
17/02/21 13:40:24 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/02/21 13:40:25 INFO util.Utils: Successfully started service 'sparkDriver' on port 38463.
17/02/21 13:40:26 INFO slf4j.Slf4jLogger: Slf4jLogger started
17/02/21 13:40:26 INFO Remoting: Starting remoting
17/02/21 13:40:26 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@172.17.0.2:37221]
17/02/21 13:40:26 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 37221.
17/02/21 13:40:26 INFO spark.SparkEnv: Registering MapOutputTracker
17/02/21 13:40:26 INFO spark.SparkEnv: Registering BlockManagerMaster
17/02/21 13:40:26 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-a06685a8-6f1c-4e8f-805c-e232333f8d85
17/02/21 13:40:26 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
17/02/21 13:40:27 INFO spark.SparkEnv: Registering OutputCommitCoordinator
17/02/21 13:40:27 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/02/21 13:40:27 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
17/02/21 13:40:27 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
17/02/21 13:40:27 INFO ui.SparkUI: Started SparkUI at http://172.17.0.2:4040
17/02/21 13:40:27 INFO client.AppClient$ClientEndpoint: Connecting to master spark://master:7077...
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20170221134027-0000
17/02/21 13:40:28 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44615.
17/02/21 13:40:28 INFO netty.NettyBlockTransferService: Server created on 44615
17/02/21 13:40:28 INFO storage.BlockManagerMaster: Trying to register BlockManager
17/02/21 13:40:28 INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.17.0.2:44615 with 511.1 MB RAM, BlockManagerId(driver, 172.17.0.2, 44615)
17/02/21 13:40:28 INFO storage.BlockManagerMaster: Registered BlockManager
17/02/21 13:40:28 INFO client.AppClient$ClientEndpoint: Executor added: app-20170221134027-0000/0 on worker-20170221133811-172.17.0.3-41829 (172.17.0.3:41829) with 2 cores
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170221134027-0000/0 on hostPort 172.17.0.3:41829 with 2 cores, 1024.0 MB RAM
17/02/21 13:40:28 INFO client.AppClient$ClientEndpoint: Executor added: app-20170221134027-0000/1 on worker-20170221133810-172.17.0.4-39901 (172.17.0.4:39901) with 2 cores
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170221134027-0000/1 on hostPort 172.17.0.4:39901 with 2 cores, 1024.0 MB RAM
17/02/21 13:40:29 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170221134027-0000/1 is now RUNNING
17/02/21 13:40:29 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170221134027-0000/0 is now RUNNING
17/02/21 13:40:45 INFO scheduler.EventLoggingListener: Logging events to hdfs://master:9000/historyserverforspark/app-20170221134027-0000
17/02/21 13:40:45 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
17/02/21 13:40:45 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
17/02/21 13:40:46 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (worker1:60096) with ID 0
17/02/21 13:40:46 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (worker2:46846) with ID 1
17/02/21 13:40:47 INFO storage.BlockManagerMasterEndpoint: Registering block manager worker1:39275 with 511.1 MB RAM, BlockManagerId(0, worker1, 39275)
17/02/21 13:40:47 INFO storage.BlockManagerMasterEndpoint: Registering block manager worker2:37449 with 511.1 MB RAM, BlockManagerId(1, worker2, 37449)
17/02/21 13:40:50 INFO hive.HiveContext: Initializing execution hive, version 1.2.1
17/02/21 13:40:51 INFO client.ClientWrapper: Inspected Hadoop version: 2.6.0
17/02/21 13:40:51 INFO client.ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
17/02/21 13:40:52 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/02/21 13:40:53 INFO metastore.ObjectStore: ObjectStore, initialize called
17/02/21 13:40:53 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/02/21 13:40:53 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/02/21 13:40:54 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:40:55 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:01 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/02/21 13:41:08 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:08 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:15 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:15 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:17 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/02/21 13:41:17 INFO metastore.ObjectStore: Initialized ObjectStore
17/02/21 13:41:18 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/02/21 13:41:18 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
17/02/21 13:41:19 INFO metastore.HiveMetaStore: Added admin role in metastore
17/02/21 13:41:19 INFO metastore.HiveMetaStore: Added public role in metastore
17/02/21 13:41:19 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty
17/02/21 13:41:20 INFO metastore.HiveMetaStore: 0: get_all_databases
17/02/21 13:41:20 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr  cmd=get_all_databases   
17/02/21 13:41:20 INFO metastore.HiveMetaStore: 0: get_functions: db=default pat=*
17/02/21 13:41:20 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr  cmd=get_functions: db=default pat=* 
17/02/21 13:41:20 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:21 INFO session.SessionState: Created local directory: /tmp/939dedb5-f724-461b-a41a-a5fd1fe7324b_resources
17/02/21 13:41:21 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/939dedb5-f724-461b-a41a-a5fd1fe7324b
17/02/21 13:41:21 INFO session.SessionState: Created local directory: /tmp/root/939dedb5-f724-461b-a41a-a5fd1fe7324b
17/02/21 13:41:21 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/939dedb5-f724-461b-a41a-a5fd1fe7324b/_tmp_space.db
17/02/21 13:41:22 INFO hive.HiveContext: default warehouse location is /user/hive/warehouse
17/02/21 13:41:22 INFO hive.HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
17/02/21 13:41:22 INFO client.ClientWrapper: Inspected Hadoop version: 2.6.0
17/02/21 13:41:22 INFO client.ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
17/02/21 13:41:23 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/02/21 13:41:23 INFO metastore.ObjectStore: ObjectStore, initialize called
17/02/21 13:41:23 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/02/21 13:41:23 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/02/21 13:41:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:25 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:30 INFO DataNucleus.Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
17/02/21 13:41:30 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/02/21 13:41:30 INFO metastore.ObjectStore: Initialized ObjectStore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: Added admin role in metastore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: Added public role in metastore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty
17/02/21 13:41:30 INFO metastore.HiveMetaStore: 0: get_all_databases
17/02/21 13:41:30 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr  cmd=get_all_databases   
17/02/21 13:41:30 INFO metastore.HiveMetaStore: 0: get_functions: db=default pat=*
17/02/21 13:41:30 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr  cmd=get_functions: db=default pat=* 
17/02/21 13:41:30 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:30 INFO session.SessionState: Created local directory: /tmp/c9c26571-1229-4786-8a8e-d7b090b07d85_resources
17/02/21 13:41:30 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/c9c26571-1229-4786-8a8e-d7b090b07d85
17/02/21 13:41:30 INFO session.SessionState: Created local directory: /tmp/root/c9c26571-1229-4786-8a8e-d7b090b07d85
17/02/21 13:41:30 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/c9c26571-1229-4786-8a8e-d7b090b07d85/_tmp_space.db
17/02/21 13:41:30 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

Everything after the Welcome banner is the log output produced by the initializeSpark() call above, so every log line now has an owner. The creation of SparkContext and SQLContext will get dedicated articles later. The last thing process does is keep looping, and we will not dig any deeper here, or we would get lost in the source. It is enough to understand that this loop is what REPL stands for: Read the command the user types; Evaluate it through the Spark framework; Print the result; Loop back and do it again. Each command that is read is parsed and then executed by the interpreter; interested readers can keep following the source from here.
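
As a quick illustration of one turn of that loop (results obviously depend on your input; INFO log lines are omitted), a line typed at the prompt is read, evaluated on the cluster through the sc that initializeSpark() prepared, and its result printed:

scala> sc.parallelize(1 to 100).reduce(_ + _)
res0: Int = 5050

scala> sc.makeRDD(Seq("spark", "shell", "repl")).map(_.length).collect()
res1: Array[Int] = Array(5, 5, 4)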

That completes the whole spark-shell flow (spark-submit included). Summarised as a simple chain: spark-shell → spark-submit → spark-class → java -cp ... org.apache.spark.launcher.Main → exec org.apache.spark.deploy.SparkSubmit → main → submit → doRunMain → runMain → (reflection) org.apache.spark.repl.Main.main → SparkILoop/ILoop.process → REPL loop.

Note: this summary assumes client mode.

References and further reading:

The Spark-1.6.3 source code

The Spark-2.1.0 source code

This article is original work. You are welcome to repost it; please credit the source and the author. Thanks!
