SparkContxt重要源码

2019-02-24  本文已影响0人  436048bfc6a1
  1. SparkContext重要源码

1.1 SparkContext 注释

Main entry point for Spark functionality.
spark功能的入口点(也就是说必须要存在SparkContext才能继续向下执行)

1.2 SparkContext的构造函数

class SparkContext(config: SparkConf)
从上可知,如果想要使用SparkContext,那么必须要有SparkConf
如果使用spark-shell,spark-shell程序启动时就创建了一个SparkContext,如下图所示

  1. SparkConf重要源码

2.1 注释

(1) Configuration for a Spark application

    Spark应用程序的配置

(2) Most of the time, you would create a SparkConf object with 
    `new SparkConf()`, which will load values from any `spark.*` 
    Java system properties set in your application as well

    大多数情况下,你将会使用new SparkConf()作为参数来创建SparkContext对象,
    SparkConf 将导入在应用中的java的system properties中任何以spark.开头的配置
    (不论配置是官方提供的,还是自定义的都是以spark.开头的)

2.1.1 自定义spark参数

命令行启动spark-shell
  ./spark-shell --master local[2] --conf name=henry --conf spark.age=18 
任何 --conf之后的key-value对都会加载到sparkConf中

执行spark-shell


上图中第一行,name=henry是non-spark的config, 被spark-shell忽略掉
从spark web ui上也可以清楚看见只生效了spark.age

2.1.2 从源码角度上分析只生效以spark.开头的配置

在SparkSubmitArguments的loadEnvironmentArguments方法中

2.1.2.1 以--master为例

//加载环境变量
private def loadEnvironmentArguments(): Unit{
  master = Option(master)
      .orElse(sparkProperties.get("spark.master"))
      .orElse(env.get("MASTER"))
      .orNull
}
首先去寻找spark.master
(wordcount代码第一行的setMaster底层就是给spark.master赋值)
如果没有找到spark.master的值,就去环境变量里去找MASTER

2.1.2.2 以--executor-memory为例

--executor-memory参数是设置每个executor使用的内存大小,默认是1G
private def loadEnvironmentArguments(): Unit{
    executorMemory = Option(executorMemory)
      .orElse(sparkProperties.get("spark.executor.memory"))
      .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
      .orNull
}
与--master类似,也是先寻找spark.executor.memory

setMaster()源码

  def setMaster(master: String): SparkConf = {
    set("spark.master", master)
  }

2.1.3 如何获得spark的默认配置参数

  lazy val defaultSparkProperties: HashMap[String, String] = {
    val defaultProperties = new HashMap[String, String]()
    //注释1
    Option(propertiesFile).foreach { filename =>
    //注释2
      val properties = Utils.getPropertiesFromFile(filename)
      properties.foreach { case (k, v) =>
        defaultProperties(k) = v
      }
      if (verbose) {
        Utils.redact(properties).foreach { case (k, v) =>
          logInfo(s"Adding default property: $k=$v")
        }
      }
    }
    defaultProperties
  }
mergeDefaultSparkProperties()
//注释3
ignoreNonSparkProperties()

  private def mergeDefaultSparkProperties(): Unit = {
    propertiesFile = Option(propertiesFile).
                     getOrElse(Utils.getDefaultPropertiesFile(env))
    defaultSparkProperties.foreach { case (k, v) =>
      if (!sparkProperties.contains(k)) {
        sparkProperties(k) = v
      }
    }
  }
  def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
    env.get("SPARK_CONF_DIR")
      .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
      .map { t => new File(s"$t${File.separator}spark-defaults.conf")}
      .filter(_.isFile)
      .map(_.getAbsolutePath)
      .orNull
  }
  private def ignoreNonSparkProperties(): Unit = {
    sparkProperties.foreach { case (k, v) =>
      if (!k.startsWith("spark.")) {
        sparkProperties -= k
        logWarning(s"Ignoring non-spark config property: $k=$v")
      }
    }
  }
2.1.3.1 注释1
(1) 作用
```md
明确filename代表什么

(2) 作用代码

Option(propertiesFile).foreach { filename => {}
}

(3) 调用情况

defaultSparkProperties()==>mergeDefaultSparkProperties()==>getDefaultPropertiesFile()

(4) 流程简要说明

首先需要知道filename,也就是propertiesFile来自于哪里
之后在该类中发现, 如果不是人为设置,
   该类里只有在mergeDefaultSparkProperties()对propertiesFile赋值
mergeDefaultSparkProperties()得到getDefaultPropertiesFile()的返回值
   也就是说默认配置文件的路径

(5) 相关方法重要代码说明

getDefaultPropertiesFile
    先从环境中找到SPARK_CONF_DIR
    如果没有配置的话,就去环境变量中找SPARK_HOME
    然后找到其conf文件夹中spark-defaults.conf文件
    getDefaultPropertiesFile返回的就是默认的spark配置文件的路径
    也就是说defaultSparkProperties的filename就是默认的spark配置文件的路径
    

2.1.3.2 注释2
(1) 作用

 根据默认配置文件路径,获得键值对的properties

(2)作用代码

val properties = Utils.getPropertiesFromFile(filename)

(3) 调用情况

defaultSparkProperties() ==> getDefaultPropertiesFile()

(4) 流程简要说明

作用代码调用Utils.getPropertiesFromFile(filename)
properties是一个键值对

(5)相关方法重要代码说明

getPropertiesFromFile()
    拿到文件名后,读取该文件,将其每一行都变成key-value pair
    返回的是一个map

2.1.3.3 注释3
(1) 作用

从properties变量中移除key不是以spark.开头的key

(2) 作用代码

ignoreNonSparkProperties()

(3) 调用情况

defaultSparkProperties() ==> Utils.ignoreNonSparkProperties()

(4) 相关方法重要代码说明

ignoreNonSparkProperties()
  循环sparkProperties取出key不是以spark.开头的
    

2.1.3.4 mergeDefaultSparkProperties()
(1) 作用

将命令行 --conf后添加的自定义spark参数放入sparkProperties中

(2) 调用情况

mergeDefaultSparkProperties ==> defaultSparkProperties()

(3) 相关代码重要说明

遍历defaultSparkProperties返回的defaultProperties(key-value pair),  变成sparkProperties的key-value pair

2.2 spark-defaults.conf

2.2.1 作用

保存spark所有公共属性
将需要配置的属性配置在该文件中
之后就不需要写命令行参数(./saprk-shell --master local[2])
因为系统会自动找到配置在该文件中的参数

2.2.2 为什么要自定义spark参数

当连接数据库时,数据库的url需要灵活改变,此时自定义参数发挥优势

2.3 如何得到自定义的参数的值

sc.getConf.get("spark.henry")

2.4 参数定义的位置与优先级

2.4.1 源码注释

In this case, parameters you set directly on the `SparkConf` 
object take priority over system properties

2.4.2 翻译

配置在SparkConf的优先级比系统配置高

2.4.3 如何理解

首先在命令行输入

2.5 理解上述可以做什么

在文件夹henrydata-etl下有lib、shell、logs、conf文件夹
在conf下文件夹有etl-default.conf文件,里面保存的是需要的key-value
此时直接按照之前代码大意,调用scala工具类就可以解析出来
省去人为解析的麻烦
  1. sparkContext的重要源码解析

3.1 创建spark运行环境

_env = createSparkEnv(_conf, isLocal, listenerBus)
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
  }
private[spark] def createDriverEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus,
      numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    assert(conf.contains(DRIVER_HOST_ADDRESS),
      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
    val port = conf.get("spark.driver.port").toInt
    val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
      Some(CryptoStreamUtils.createKey(conf))
    } else {
      None
    }
    create(
      conf,
      SparkContext.DRIVER_IDENTIFIER,
      bindAddress,
      advertiseAddress,
      Option(port),
      isLocal,
      numCores,
      ioEncryptionKey,
      listenerBus = listenerBus,
      mockOutputCommitCoordinator = mockOutputCommitCoordinator
    )
  }
 //定义driver端口
//key为spark.driver.bindAddress的值为端口值
  private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
    .doc("Address where to bind network listen sockets on the driver.")
    .fallbackConf(DRIVER_HOST_ADDRESS)

3.1.2 create函数重要代码

使用反射创建实例
//使用反射使用指定类名创建实例
def instantiateClass[T](className: String): T = {
      val cls = Utils.classForName(className)
      // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
      // SparkConf, then one taking no arguments
      try {
        cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
          .newInstance(conf, new java.lang.Boolean(isDriver))
          .asInstanceOf[T]
      } catch {
        case _: NoSuchMethodException =>
          try {
            cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
          } catch {
            case _: NoSuchMethodException =>
              cls.getConstructor().newInstance().asInstanceOf[T]
          }
      }
    }

3.2 WEB UI

private[spark] def ui: Option[SparkUI] = _ui
   //默认情况下为true
    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
          startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
def create(
      sc: Option[SparkContext],
      store: AppStatusStore,
      conf: SparkConf,
      securityManager: SecurityManager,
      appName: String,
      basePath: String,
      startTime: Long,
      appSparkVersion: String = org.apache.spark.SPARK_VERSION): SparkUI = {
    //使用new sparkUI的方式创建UI
    new SparkUI(store, sc, conf, securityManager, appName, basePath, startTime, appSparkVersion)
  }
private[spark] class SparkUI private(
    //存储信息
    val store: AppStatusStore,
    //环境变量内的信息
    val sc: Option[SparkContext],
    //配置信息
    val conf: SparkConf,
    securityManager: SecurityManager,
    var appName: String,
    val basePath: String,
    val startTime: Long,
    //版本信息,对应spark web ui的导航栏的版本号
    val appSparkVersion: String
)
private[spark] abstract class WebUI(
    val securityManager: SecurityManager,
    val sslOptions: SSLOptions,
    port: Int,
    conf: SparkConf,
    basePath: String = "",
    name: String = ""
)
extends Logging {
  //
   protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS"))
        .getOrElse(conf.get(DRIVER_HOST_ADDRESS))
   def webUrl: String = s"http://$publicHostName:$boundPort"
}
上一篇下一篇

猜你喜欢

热点阅读