SparkEnv 和 RpcEnv 源码浅析

2019-02-05  本文已影响0人  越过山丘xyz

SparkEnv 与 RpcEnv

SparkEnv 保存着 Application 运行时的环境信息,包括 RpcEnv、Serializer、Block Manager 和 ShuffleManager 等,并为 Driver 端和 Executor 端分别提供了不同的创建方式。

RpcEnv 维持着 Spark 节点间的通信,并负责将传递过来的消息转发给监听者(通信端)。

流程概览

SparkEnv 在 Spark-Core 的 org.apache.spark 包下。

SparkEnv

在 SparkContext 对 SparkEnv 进行初始化的时候调用了 SparkContext.createSparkEnv() 方法,在 SparkContext 概览中提到过:

// 这两行代码在 SparkContext 中
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

我们以 SparkContext.createSparkEnv() 作为入口,看看 SparkEnv 到底做了些什么:

private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  // 只有这一行语句
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

SparkEnv.createDriverEnv() 是创建 Driver 端 SparkEnv 的起始点:

private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
 
  // 从 SparkConf 中获取 Driver 的相关信息
  val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
  val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
  val port = conf.get("spark.driver.port").toInt
  val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
    Some(CryptoStreamUtils.createKey(conf))
  } else {
    None
  }
  
  // SparkEnv 为 Driver 端和 Executor 端提供了不同的创建方式
  // 但是最终都会调用这个方法
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,
    bindAddress,
    advertiseAddress,
    port,
    isLocal,
    numCores,
    ioEncryptionKey,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator
  )
}

我们顺便看一下 SparkEnv.createExecutorEnv() 方法:

private[spark] def createExecutorEnv(
    conf: SparkConf,
    executorId: String,
    hostname: String,
    port: Int,
    numCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    isLocal: Boolean): SparkEnv = {
  
  // 直接调用了 create() 方法
  val env = create(
    conf,
    executorId,
    hostname,
    hostname,
    port,
    isLocal,
    numCores,
    ioEncryptionKey
  )
  SparkEnv.set(env)
  env
}

这两种创建方式最终都调用了 SparkEnv.create() 方法,接下来我们看看它的实现细节:

private def create(
    conf: SparkConf,
    executorId: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    isLocal: Boolean,
    numUsableCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  // 是否为 Driver 端
  // 可以看出 Driver 端也是在一个 Executor 中运行的
  val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

  // RPC 环境,注意下这个
  val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
    securityManager, clientMode = !isDriver)

  // 对 conf 参数进行设置
  if (isDriver) {
    conf.set("spark.driver.port", rpcEnv.address.port.toString)
  } else if (rpcEnv.address != null) {
    conf.set("spark.executor.port", rpcEnv.address.port.toString)
  }

  // 默认使用的是 Java 序列化
  // 可以设置 Kryo
  val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")

  // 序列化管理器
  val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

  // 注册或查找 Endpoint
  def registerOrLookupEndpoint(
      name: String, endpointCreator: => RpcEndpoint):
    RpcEndpointRef = {
    if (isDriver) {
      logInfo("Registering " + name)
      rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
      RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
  }

  // 广播器 Manager
  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    
  // Map 输出跟踪器
  val mapOutputTracker = if (isDriver) {
    new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
  } else {
    new MapOutputTrackerWorker(conf)
  }

  // 注册 Map 输出跟踪器通信端
  // 通信端负责监听并处理节点间传递过来的消息
  mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
    new MapOutputTrackerMasterEndpoint(
      rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))


  val shortShuffleMgrNames = Map(
    "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
    "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
  // 默认使用 SortShuffleManager
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass =
    shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

  val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
  // 默认使用通用内存 Manager
  val memoryManager: MemoryManager =
    if (useLegacyMemoryManager) {
      new StaticMemoryManager(conf, numUsableCores)
    } else {
      UnifiedMemoryManager(conf, numUsableCores)
    }

 
  val blockManagerPort = if (isDriver) {
    conf.get(DRIVER_BLOCK_MANAGER_PORT)
  } else {
    conf.get(BLOCK_MANAGER_PORT)
  }

  val blockTransferService =
    new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
      blockManagerPort, numUsableCores)
  
  // 创建 BlockManagerMaster,并且向 RpcEnv 注册通信端(反过来说更好一点)
  // 这个 BlockManagerMasterEndpoint 有点重要,Task 会将运行完结果存放的 Block 块信息发送给它 
  val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
    conf, isDriver)

  // 创建 BlockManager
  val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
    serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
    blockTransferService, securityManager, numUsableCores)

  // 创建 OutputCommitCoordinator
  val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
    new OutputCommitCoordinator(conf, isDriver)
  }
  // 注册 OutputCommitCoordinator 通信端
  val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
    new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
  outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

  // 实例化 SparkEnv
  val envInstance = new SparkEnv(
    executorId,
    rpcEnv,
    serializer,
    closureSerializer,
    serializerManager,
    mapOutputTracker,
    shuffleManager,
    broadcastManager,
    blockManager,
    securityManager,
    metricsSystem,
    memoryManager,
    outputCommitCoordinator,
    conf)

  if (isDriver) {
    val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
    envInstance.driverTmpDir = Some(sparkFilesDir)
  }

  envInstance
}

由此可以看出,SparkEnv 维护着RpcEnv 和 一些 Manager,这些 Manager 会将通信端注册到 RpcEnv 中,以用于监听并处理节点间传递过来的消息。

RpcEnv

RpcEnv 是通信环境,负责 Spark 节点间的通信,并将节点间传递过来的消息转发给注册过来的监听器 (通信端)。

在实例化 SparkEnv 的时候会先创建 RpcEnv, SparkEnv.create() 方法中可以找到:

val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
  securityManager, clientMode = !isDriver)

RpcEnv.create() 的实现细节:

def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    clientMode: Boolean): RpcEnv = {  // clientMode 为是否是 Driver 端
 
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    clientMode)
  // 这个版本的 Spark 使用 Netty 作为通信框架
  new NettyRpcEnvFactory().create(config)
}

在创建 SparkEnv 的时候,各个 Manager 会调用 registerOrLookupEndpoint() 方法进行通信端的注册,接下来我们看看 registerOrLookupEndpoin() 内部的实现细节:

def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint):
  RpcEndpointRef = {

  if (isDriver) {
    // 在这里进行注册
    rpcEnv.setupEndpoint(name, endpointCreator)
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)
  }
      
}

Spark 中使用 Netty 作为通信框架 (RpcEnv 唯一实现类), 我们看看 NettyRpcEnv.setupEndpoint 的内部实现细节:

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  // dispatcher 是 Dispatcher 的实例化对象
  dispatcher.registerRpcEndpoint(name, endpoint)
}

RpcEnv 中有一个调度者( Dispatcher ),负责将消息传递给各个端点,接下来我们需要看看 Dispatcher. registerRpcEndpoint() 的实现细节:

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    // 略略略..
      
    val data = endpoints.get(name)
    // 使用 ConcurrentMap 存储注册过的通信端
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)
  }
  endpointRef
}

我们看下 Dispatcher.postToAll() 方法,了解下 Dispatcher 是如何将 Rpc 消息转发给各个通信端的:

def postToAll(message: InboxMessage): Unit = {
  val iter = endpoints.keySet().iterator()
  // 将消息广播给所有监听者
  // 监听器模式
  while (iter.hasNext) {
    val name = iter.next
      // 大家可以看下内部实现细节
      postMessage(name, message, (e) => { //... }
    )}
}

各个通信端点在收到广播过来的消息后进行匹配,然后对这些消息进行相关的工作。

我这里以 BlockManagerMasterEndpoint 为例 (在 SparkEnv 的创建代码中可以找到),其它的通信端都是类似的。

消息类型分为需要应答和不需要应答两种(同步和异步),需要应答消息会被其 receiveAndReply() 方法接收,如果不应答就是 receive() 方法接收:

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    
  case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) => // ...

  case _updateBlockInfo @
      UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
     // .

  case GetLocations(blockId) => // ...

  case GetLocationsMultipleBlockIds(blockIds) => // ...

  // ...
    
}

后面的文章中会用到 SparkEnv,到时就体现它的作用了。

上一篇 下一篇

猜你喜欢

热点阅读