Spark-Core Source Code Deep Dive (3): spark-shell
This article walks through what actually happens when we enter the REPL via spark-shell.
Note: every article in this series uses the Spark 1.6.3 source code as its reference; where Spark 2.1.0 introduces major changes, those will be pointed out as well.
The shell part
Let's look at the exact execution flow when we type spark-shell --master spark://master:7077. Naturally, the first thing to examine is the source of spark-shell.sh; only the relatively important parts are shown here:
## Check whether the SPARK_HOME environment variable is set; if not, set it
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
##...
function main() {
if $cygwin; then
# Workaround for issue involving JLine and Cygwin
# (see http://sourceforge.net/p/jline/bugs/40/).
# If you're using the Mintty terminal emulator in Cygwin, may need to set the
# "Backspace sends ^H" setting in "Keys" section of the Mintty options
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
"${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
fi
}
##...
main "$@"
##...
As we can see, the script ends by calling main with all of the arguments given to the spark-shell command, such as --master. Whatever the operating system (in production it is of course Linux), main ultimately runs spark-submit with --class org.apache.spark.repl.Main and --name "Spark shell", passing through every argument the user gave to spark-shell. Next, let's look at spark-submit:
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
spark-submit is quite concise: it execs spark-class to start SparkSubmit, passing along everything spark-submit received. Now let's look at spark-class. (Note that from here on the flow is really the same as submitting an application with spark-submit directly; the difference is that a direct spark-submit is followed by a user-written class, whereas the spark-submit reached through spark-shell is followed by org.apache.spark.repl.Main. An application submitted with spark-submit exits once it finishes, while the REPL entered through spark-shell keeps waiting for user input.)
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
## Load the Spark environment variables
. "${SPARK_HOME}"/bin/load-spark-env.sh
## Locate the java binary, which will later be used to launch a JVM process
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
## Locate the assembly jar the launcher depends on
# Find assembly jar
SPARK_ASSEMBLY_JAR=
if [ -f "${SPARK_HOME}/RELEASE" ]; then
ASSEMBLY_DIR="${SPARK_HOME}/lib"
else
ASSEMBLY_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION"
fi
GREP_OPTIONS=
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" -a "$SPARK_PREPEND_CLASSES" != "1" ]; then
echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
if [ -d "$ASSEMBLY_DIR" ]; then
ASSEMBLY_JARS="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" || true)"
if [ "$num_jars" -gt "1" ]; then
echo "Found multiple Spark assembly jars in $ASSEMBLY_DIR:" 1>&2
echo "$ASSEMBLY_JARS" 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi
fi
SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"
LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"
# For tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
## Run org.apache.spark.launcher.Main in a helper JVM (java -cp ...) to build the final launch command; the exec below then starts the actual SparkSubmit JVM with that command
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
spark-class is Spark's command-line launcher: it sets up the JVM environment and runs the requested Spark class, which in our case is SparkSubmit. With that we can move into the Spark source code itself.
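To recap the shell portion, for our example invocation the scripts effectively chain up as follows (an illustrative expansion; the real classpath and extra JVM options are omitted):
spark-shell --master spark://master:7077
-> spark-submit --class org.apache.spark.repl.Main --name "Spark shell" --master spark://master:7077
-> spark-class org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" --master spark://master:7077
-> java -cp <assembly jar> org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" --master spark://master:7077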
The Spark source code part
Picking up from the above, let's go straight into the Spark source:
We will not go through the source of org.apache.spark.launcher.Main here; see my other article (a supplementary note on the Main class). In short, it inspects the class it is given, builds the corresponding command, and hands it to exec. Since our focus is Spark itself, let's go directly to the SparkSubmit source:
def main(args: Array[String]): Unit = {
/** Wrap the arguments passed to spark-submit in a SparkSubmitArguments object. Remember what they are?
* For spark-shell they are the arguments that followed spark-shell; for a direct spark-submit they are
* the arguments given at submission time. SparkSubmitArguments holds quite a few fields, so they are not
* listed one by one here; each will be explained when it is used. See the SparkSubmitArguments source
* for the full list.
*/
val appArgs = new SparkSubmitArguments(args)
// If verbose mode is enabled, print the parsed arguments
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
/** The action is what spark-submit has been asked to do: SUBMIT, KILL or REQUEST_STATUS (wrapped in
* SparkSubmitAction). If none is specified, the default is SparkSubmitAction.SUBMIT, so the pattern
* match below ends up calling submit(appArgs).
*/
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
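For completeness, the KILL and REQUEST_STATUS branches correspond to command-line invocations like the following (an illustration only: the submission ID is made up, and these actions apply to drivers submitted in standalone cluster mode, typically through the REST port 6066):
spark-submit --master spark://master:6066 --kill driver-20170221134027-0000
spark-submit --master spark://master:6066 --status driver-20170221134027-0000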
Now let's look at the submit(appArgs) method:
/**
* The job of submit is to submit the application using the arguments passed in.
* It does so in two steps:
* 1. Prepare the launch environment: the classpath, system properties and application arguments
* needed to run the child main class for the given deploy mode and cluster manager.
* 2. Invoke the main method of that child main class using the environment prepared in step 1.
* Here we only consider client mode; cluster mode will be analyzed separately later.
* So for spark-shell the child main class is org.apache.spark.repl.Main, while for a direct
* spark-submit it is the user-written application class (the class containing the main method).
*/
private def submit(args: SparkSubmitArguments): Unit = {
// Prepare the environment; the key result is childMainClass, i.e. the child main class described above
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
// Note: the source defines the local doRunMain() method right here; we pull it out below and analyze it separately
// Decide whether the submission gateway is the legacy Akka one or the REST-based one; either way doRunMain() is eventually called
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
printStream.println("Running Spark using the REST application submission protocol.")
// scalastyle:on println
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
The implementation of doRunMain():
def doRunMain(): Unit = {
if (args.proxyUser != null) {
// Hadoop user/group information for the proxy user
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
}
Both branches of doRunMain() eventually call runMain, so let's step into runMain:
/** Long as this method is, it does essentially one thing: run the main method of the child main class.
To repeat: for an application submitted directly with spark-submit, that is the main method of the
user-specified class; for spark-shell, it is the main method of org.apache.spark.repl.Main.
*/
private def runMain(
childArgs: Seq[String],
childClasspath: Seq[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean): Unit = {
// Print verbose information if requested
// scalastyle:off println
if (verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
printStream.println("\n")
}
// scalastyle:on println
// Set the ClassLoader to be used by the current thread
val loader =
if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
// Add the jar dependencies to the classpath
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
// Set the system properties
for ((key, value) <- sysProps) {
System.setProperty(key, value)
}
var mainClass: Class[_] = null
// Load mainClass (the child main class) via reflection
try {
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
e.printStackTrace(printStream)
if (childMainClass.contains("thriftserver")) {
// scalastyle:off println
printStream.println(s"Failed to load main class $childMainClass.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
e.printStackTrace(printStream)
if (e.getMessage.contains("org/apache/hadoop/hive")) {
// scalastyle:off println
printStream.println(s"Failed to load hive class.")
printStream.println("You need to build Spark with -Phive and -Phive-thriftserver.")
// scalastyle:on println
}
System.exit(CLASS_NOT_FOUND_EXIT_STATUS)
}
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
// Get the main method of mainClass (the child main class)
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
// The main method must be static
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: InvocationTargetException =>
if (e.getCause() != null) findCause(e.getCause()) else e
case e: Throwable =>
e
}
// Finally, invoke the main method
try {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
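To distill what runMain boils down to, here is a minimal, self-contained Scala sketch of invoking a child main class reflectively. It is only an illustration, not Spark's code; the hard-coded class name stands in for whatever childMainClass resolves to:
import java.lang.reflect.Modifier
object InvokeChildMain {
  def main(args: Array[String]): Unit = {
    // For spark-shell this would be "org.apache.spark.repl.Main"; for spark-submit, the user's class.
    val childMainClass = "org.apache.spark.repl.Main"
    val mainClass = Class.forName(childMainClass)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // Like SparkSubmit, refuse anything that is not a static main(String[]).
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }
    // Scala wraps the Array[String] so that main receives it as its single String[] argument.
    mainMethod.invoke(null, args)
  }
}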
At this point, if the user submitted a hand-written Spark application through spark-submit, its main method is now called and the user code runs step by step (SparkContext creation and so on, which we will analyze in later articles). What we need to follow here is the main method of org.apache.spark.repl.Main (remember that this article only covers client mode; cluster mode will be analyzed separately). Here is the thread dump of the main thread of the SparkSubmit process:
java.io.FileInputStream.read0(Native Method)
java.io.FileInputStream.read(FileInputStream.java:207)
scala.tools.jline.TerminalSupport.readCharacter(TerminalSupport.java:152)
scala.tools.jline.UnixTerminal.readVirtualKey(UnixTerminal.java:125)
scala.tools.jline.console.ConsoleReader.readVirtualKey(ConsoleReader.java:933)
scala.tools.jline.console.ConsoleReader.readBinding(ConsoleReader.java:1136)
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1218)
scala.tools.jline.console.ConsoleReader.readLine(ConsoleReader.java:1170)
org.apache.spark.repl.SparkJLineReader.readOneLine(SparkJLineReader.scala:80)
scala.tools.nsc.interpreter.InteractiveReader$class.readLine(InteractiveReader.scala:43)
org.apache.spark.repl.SparkJLineReader.readLine(SparkJLineReader.scala:25)
org.apache.spark.repl.SparkILoop.readOneLine$1(SparkILoop.scala:648)
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
org.apache.spark.repl.Main$.main(Main.scala:31)
org.apache.spark.repl.Main.main(Main.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
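As an aside, a dump like the one above can be captured from a running spark-shell with the standard JDK tools (a sketch; it assumes the JDK's jps and jstack are on the PATH):
jps | grep SparkSubmit     # find the pid of the SparkSubmit JVM
jstack <pid>               # print the stack traces of all threads in that JVM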
The dump is shown at this point to tie everything together: reading it from the bottom up, the path we just analyzed is clearly visible, from SparkSubmit's main method through submit, doRunMain and runMain to the reflective call into org.apache.spark.repl.Main's main method. So next we step into org.apache.spark.repl.Main's main method (shown together with its initialization code):
// Instantiate SparkConf
val conf = new SparkConf()
// Set up the various directories and paths
val tmp = System.getProperty("java.io.tmpdir")
val rootDir = conf.get("spark.repl.classdir", tmp)
val outputDir = Utils.createTempDir(rootDir)
val s = new Settings()
s.processArguments(List("-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
"-classpath", getAddedJars.mkString(File.pathSeparator)), true)
// the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed
val classServerPort = conf.getInt("spark.replClassServer.port", 0)
// Instantiate the HttpServer; note that it is declared lazy
lazy val classServer =
new HttpServer(conf, outputDir, new SecurityManager(conf), classServerPort, "HTTP class server")
var sparkContext: SparkContext = _
var sqlContext: SQLContext = _
// Instantiate SparkILoop, which we will analyze in detail shortly
var interp = new SparkILoop // this is a public var because tests reset it.
// After the initialization above, the main method runs
def main(args: Array[String]) {
// Check for yarn-client mode; the YARN deploy mode will get its own article later
if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
// Start the classServer and store its URI in a spark system property
// (which will be passed to executors so that they can connect to it)
// Start the HTTP class server
classServer.start()
// The key line: put the interpreter into its loop, i.e. the REPL
interp.process(s) // Repl starts and goes in loop of R.E.P.L
classServer.stop()
Option(sparkContext).map(_.stop)
}
Having come this far, let's look again at part of the log printed when entering the REPL through spark-shell:
17/02/21 13:40:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/21 13:40:17 INFO spark.SecurityManager: Changing view acls to: root
17/02/21 13:40:17 INFO spark.SecurityManager: Changing modify acls to: root
17/02/21 13:40:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/02/21 13:40:18 INFO spark.HttpServer: Starting HTTP Server
17/02/21 13:40:18 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/02/21 13:40:18 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:43773
17/02/21 13:40:18 INFO util.Utils: Successfully started service 'HTTP class server' on port 43773.
This chunk of log corresponds exactly to classServer.start(); from now on, whenever we see these lines we will know what is actually happening behind them. Quite satisfying, isn't it?
Next we move into SparkILoop and ILoop. SparkILoop extends ILoop and does not define a process method of its own, so the process method actually invoked is the one in ILoop:
ILoop
// Start the interpreter that will evaluate the commands the user types
// start an interpreter with the given settings
def process(settings: Settings): Boolean = savingContextLoader {
this.settings = settings
// Create the interpreter; internally this instantiates an ILoopInterpreter
createInterpreter()
// sets in to some kind of reader depending on environmental cues
in = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true))
globalFuture = future {
intp.initializeSynchronous()
loopPostInit()
!intp.reporter.hasErrors
}
// This dispatches to the overriding loadFiles of the subclass SparkILoop, which in turn calls back into this loadFiles via super
loadFiles(settings)
printWelcome()
// Loop forever, reading and evaluating the user's commands
try loop() match {
case LineResults.EOF => out print Properties.shellInterruptedString
case _ =>
}
catch AbstractOrMissingHandler()
finally closeInterpreter()
true
}
Let's first see what SparkILoop's loadFiles method does:
override def loadFiles(settings: Settings): Unit = {
initializeSpark()
super.loadFiles(settings)
}
As we can see, it first calls initializeSpark() and only then the parent's loadFiles. The point is to have SparkContext and SQLContext ready before anything else, so that sc, sqlContext and friends are available the moment we reach the REPL prompt. Now we understand why sc and sqlContext can be used directly inside spark-shell (quite a sense of achievement, isn't it?). With that said, let's see what initializeSpark() really looks like:
def initializeSpark() {
intp.beQuietDuring {
processLine("""
@transient val sc = {
val _sc = org.apache.spark.repl.Main.createSparkContext()
println("Spark context available as sc.")
_sc
}
""")
processLine("""
@transient val sqlContext = {
val _sqlContext = org.apache.spark.repl.Main.createSQLContext()
println("SQL context available as sqlContext.")
_sqlContext
}
""")
processLine("import org.apache.spark.SparkContext._")
processLine("import sqlContext.implicits._")
processLine("import sqlContext.sql")
processLine("import org.apache.spark.sql.functions._")
}
}
This makes it very clear: processLine is used to create the SparkContext and the SQLContext and to import some commonly used packages. Once everything is ready, the parent's loadFiles runs, followed by printWelcome(); note that the printWelcome() invoked here is SparkILoop's:
/** Print a welcome message */
override def printWelcome() {
import org.apache.spark.SPARK_VERSION
echo("""Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version %s
/_/
""".format(SPARK_VERSION))
val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
versionString, javaVmName, javaVersion)
echo(welcomeMsg)
echo("Type in expressions to have them evaluated.")
echo("Type :help for more information.")
}
Look familiar? Right, this is exactly the output spark-shell prints:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.6.3
/_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
17/02/21 13:40:24 INFO spark.SparkContext: Running Spark version 1.6.3
17/02/21 13:40:24 INFO spark.SecurityManager: Changing view acls to: root
17/02/21 13:40:24 INFO spark.SecurityManager: Changing modify acls to: root
17/02/21 13:40:24 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/02/21 13:40:25 INFO util.Utils: Successfully started service 'sparkDriver' on port 38463.
17/02/21 13:40:26 INFO slf4j.Slf4jLogger: Slf4jLogger started
17/02/21 13:40:26 INFO Remoting: Starting remoting
17/02/21 13:40:26 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@172.17.0.2:37221]
17/02/21 13:40:26 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 37221.
17/02/21 13:40:26 INFO spark.SparkEnv: Registering MapOutputTracker
17/02/21 13:40:26 INFO spark.SparkEnv: Registering BlockManagerMaster
17/02/21 13:40:26 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-a06685a8-6f1c-4e8f-805c-e232333f8d85
17/02/21 13:40:26 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
17/02/21 13:40:27 INFO spark.SparkEnv: Registering OutputCommitCoordinator
17/02/21 13:40:27 INFO server.Server: jetty-8.y.z-SNAPSHOT
17/02/21 13:40:27 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
17/02/21 13:40:27 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
17/02/21 13:40:27 INFO ui.SparkUI: Started SparkUI at http://172.17.0.2:4040
17/02/21 13:40:27 INFO client.AppClient$ClientEndpoint: Connecting to master spark://master:7077...
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20170221134027-0000
17/02/21 13:40:28 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44615.
17/02/21 13:40:28 INFO netty.NettyBlockTransferService: Server created on 44615
17/02/21 13:40:28 INFO storage.BlockManagerMaster: Trying to register BlockManager
17/02/21 13:40:28 INFO storage.BlockManagerMasterEndpoint: Registering block manager 172.17.0.2:44615 with 511.1 MB RAM, BlockManagerId(driver, 172.17.0.2, 44615)
17/02/21 13:40:28 INFO storage.BlockManagerMaster: Registered BlockManager
17/02/21 13:40:28 INFO client.AppClient$ClientEndpoint: Executor added: app-20170221134027-0000/0 on worker-20170221133811-172.17.0.3-41829 (172.17.0.3:41829) with 2 cores
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170221134027-0000/0 on hostPort 172.17.0.3:41829 with 2 cores, 1024.0 MB RAM
17/02/21 13:40:28 INFO client.AppClient$ClientEndpoint: Executor added: app-20170221134027-0000/1 on worker-20170221133810-172.17.0.4-39901 (172.17.0.4:39901) with 2 cores
17/02/21 13:40:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20170221134027-0000/1 on hostPort 172.17.0.4:39901 with 2 cores, 1024.0 MB RAM
17/02/21 13:40:29 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170221134027-0000/1 is now RUNNING
17/02/21 13:40:29 INFO client.AppClient$ClientEndpoint: Executor updated: app-20170221134027-0000/0 is now RUNNING
17/02/21 13:40:45 INFO scheduler.EventLoggingListener: Logging events to hdfs://master:9000/historyserverforspark/app-20170221134027-0000
17/02/21 13:40:45 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
17/02/21 13:40:45 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
17/02/21 13:40:46 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (worker1:60096) with ID 0
17/02/21 13:40:46 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (worker2:46846) with ID 1
17/02/21 13:40:47 INFO storage.BlockManagerMasterEndpoint: Registering block manager worker1:39275 with 511.1 MB RAM, BlockManagerId(0, worker1, 39275)
17/02/21 13:40:47 INFO storage.BlockManagerMasterEndpoint: Registering block manager worker2:37449 with 511.1 MB RAM, BlockManagerId(1, worker2, 37449)
17/02/21 13:40:50 INFO hive.HiveContext: Initializing execution hive, version 1.2.1
17/02/21 13:40:51 INFO client.ClientWrapper: Inspected Hadoop version: 2.6.0
17/02/21 13:40:51 INFO client.ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
17/02/21 13:40:52 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/02/21 13:40:53 INFO metastore.ObjectStore: ObjectStore, initialize called
17/02/21 13:40:53 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/02/21 13:40:53 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/02/21 13:40:54 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:40:55 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:01 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/02/21 13:41:08 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:08 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:15 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:15 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:17 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/02/21 13:41:17 INFO metastore.ObjectStore: Initialized ObjectStore
17/02/21 13:41:18 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/02/21 13:41:18 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
17/02/21 13:41:19 INFO metastore.HiveMetaStore: Added admin role in metastore
17/02/21 13:41:19 INFO metastore.HiveMetaStore: Added public role in metastore
17/02/21 13:41:19 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty
17/02/21 13:41:20 INFO metastore.HiveMetaStore: 0: get_all_databases
17/02/21 13:41:20 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_all_databases
17/02/21 13:41:20 INFO metastore.HiveMetaStore: 0: get_functions: db=default pat=*
17/02/21 13:41:20 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_functions: db=default pat=*
17/02/21 13:41:20 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:21 INFO session.SessionState: Created local directory: /tmp/939dedb5-f724-461b-a41a-a5fd1fe7324b_resources
17/02/21 13:41:21 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/939dedb5-f724-461b-a41a-a5fd1fe7324b
17/02/21 13:41:21 INFO session.SessionState: Created local directory: /tmp/root/939dedb5-f724-461b-a41a-a5fd1fe7324b
17/02/21 13:41:21 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/939dedb5-f724-461b-a41a-a5fd1fe7324b/_tmp_space.db
17/02/21 13:41:22 INFO hive.HiveContext: default warehouse location is /user/hive/warehouse
17/02/21 13:41:22 INFO hive.HiveContext: Initializing HiveMetastoreConnection version 1.2.1 using Spark classes.
17/02/21 13:41:22 INFO client.ClientWrapper: Inspected Hadoop version: 2.6.0
17/02/21 13:41:22 INFO client.ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
17/02/21 13:41:23 INFO metastore.HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
17/02/21 13:41:23 INFO metastore.ObjectStore: ObjectStore, initialize called
17/02/21 13:41:23 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
17/02/21 13:41:23 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored
17/02/21 13:41:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:24 WARN DataNucleus.Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/02/21 13:41:25 INFO metastore.ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:29 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:30 INFO DataNucleus.Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
17/02/21 13:41:30 INFO metastore.MetaStoreDirectSql: Using direct SQL, underlying DB is DERBY
17/02/21 13:41:30 INFO metastore.ObjectStore: Initialized ObjectStore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: Added admin role in metastore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: Added public role in metastore
17/02/21 13:41:30 INFO metastore.HiveMetaStore: No user is added in admin role, since config is empty
17/02/21 13:41:30 INFO metastore.HiveMetaStore: 0: get_all_databases
17/02/21 13:41:30 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_all_databases
17/02/21 13:41:30 INFO metastore.HiveMetaStore: 0: get_functions: db=default pat=*
17/02/21 13:41:30 INFO HiveMetaStore.audit: ugi=root ip=unknown-ip-addr cmd=get_functions: db=default pat=*
17/02/21 13:41:30 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table.
17/02/21 13:41:30 INFO session.SessionState: Created local directory: /tmp/c9c26571-1229-4786-8a8e-d7b090b07d85_resources
17/02/21 13:41:30 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/c9c26571-1229-4786-8a8e-d7b090b07d85
17/02/21 13:41:30 INFO session.SessionState: Created local directory: /tmp/root/c9c26571-1229-4786-8a8e-d7b090b07d85
17/02/21 13:41:30 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/c9c26571-1229-4786-8a8e-d7b090b07d85/_tmp_space.db
17/02/21 13:41:30 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.
Everything after the Welcome banner is the log produced by initializeSpark() above, so every line of this output now has an owner; the creation of SparkContext and SQLContext will be analyzed in dedicated articles. After that, process simply keeps looping. We will stop here rather than dig deeper and risk getting lost in the source. It is enough to understand that this loop is exactly what REPL stands for: Read the command the user types; Evaluate it through the Spark framework; Print the result; Loop back to the start. After a command is read it is parsed and then executed by the interpreter; interested readers can keep following the source from there.
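For example, as soon as the prompt appears, sc and sqlContext are already bound, so expressions like the following can be evaluated immediately (output omitted; the expressions are just illustrations):
scala> sc.version
scala> sc.parallelize(1 to 100).reduce(_ + _)
scala> sqlContext.range(0, 5).show()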
With that, we have walked through the entire spark-shell flow (spark-submit included). In short: spark-shell → spark-submit → spark-class → SparkSubmit → (via reflection) org.apache.spark.repl.Main → SparkILoop's REPL loop. Note that this description assumes client mode.
This article is original content. You are welcome to repost it; please credit the source and the author. Thanks!