Spark源码精读分析计划

Spark Core源码精读计划#8:SparkEnv中RPC环

2019-04-06  本文已影响37人  LittleMagic

目录

前言

在之前的文章中,我们由SparkContext的初始化提到了事件总线LiveListenerBus与执行环境SparkEnv。在讲解SparkEnv的过程中,RPC环境RpcEnv又是首先被初始化的重要组件。做个不怎么恰当的比较,SparkEnv之于SparkContext,正如RpcEnv之于SparkEnv。

由于RPC环境负责着Spark体系内几乎所有内部及外部通信,内容很多,所以一篇文章必然讲不完。本文还是从基础开始看起。

RPC端点及其引用

RpcEnv抽象类是Spark RPC环境的通用表示,它其中定义的setupEndpoint()方法用来向RPC环境注册一个RPC端点(RpcEndpoint),并返回其引用(RpcEndpointRef)。如果客户端要对一个RpcEndpoint发送消息,那么必须首先获得其对应的RpcEndpointRef。它们之间的关系可以用如下简图表示。


图#8.1 - RPC环境与RPC端点

既然RpcEndpoint和RpcEndpointRef是RPC环境中的基础组件,我们先来研究它们的源码。

RpcEndpoint

RpcEndpoint是一个特征,其代码如下。

代码#8.1 - o.a.s.rpc.RpcEndpoint特征

private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv

  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  def onError(cause: Throwable): Unit = {
    throw cause
  }

  def onConnected(remoteAddress: RpcAddress): Unit = { }

  def onDisconnected(remoteAddress: RpcAddress): Unit = { }

  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { }

  def onStart(): Unit = { }

  def onStop(): Unit = { }

  final def stop(): Unit = {
    val _self = self
    if (_self != null) {
      rpcEnv.stop(_self)
    }
  }
}

其中定义了如下方法,这些相当于是RPC端点在RPC环境中的“行为准则”。

RpcEndpoint继承体系

RpcEndpoint的主要继承体系如下图所示。

#图8.2 - RpcEndpoint的主要继承体系

图中可以看到不少之前出现过的RPC端点,如文章#2中的HeartbeatReceiver,文章#7中的MapOutputTrackerMasterEndpoint、BlockManagerMasterEndpoint等。在今后涉及到它们时,会专门进行讲解。

另外,图中的ThreadSafeRpcEndpoint是直接继承自RpcEndpoint的特征。顾名思义,它要求RPC端点对消息的处理必须是线程安全的,用文档中的话说,线程安全RPC端点处理消息必须满足happens-before原则。

RpcEndpointRef

RpcEndpointRef是一个抽象类,其代码如下。

代码#8.2 - o.a.s.rpc.RpcEndpointRef抽象类

private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {
  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

  def address: RpcAddress

  def name: String

  def send(message: Any): Unit

  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
  }
}

这个抽象类的开头有三个属性,都是通过RpcUtils工具类从Spark配置项中取出来的,如下。

值得注意的是,maxRetries与retryWaitMs两个属性在当前的2.3.3版本中都没有用到,而在之前的版本中还是有用到的,证明Spark官方取消了RPC重试机制,也就是统一为消息传递语义中的at most once语义了。当然,我们也可以自己实现带有重试机制的RPC端点引用。

address和name方法分别返回RPC端点引用对应的地址和名称,不必多讲。下面几个方法的含义如下。

RpcEndpointRef只有一个子类,即NettyRpcEndpointRef。它对send()和ask()两个方法的实现如下。

代码#8.3 - o.a.s.rpc.netty.NettyRpcEndPointRef.send()与ask()方法

  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
  }

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
  }

可见是依赖于NettyRpcEnv类的,下面来看一下它是如何创建出来的。

NettyRpcEnv概况

创建NettyRpcEnv

在文章#7的代码#7.4~#7.5中,通过工厂类NettyRpcEnvFactory的create()方法创建出了NettyRpcEnv,它是目前Spark官方提供的RPC环境的唯一实现。该方法的代码如下。

代码#8.4 - o.a.s.rpc.netty.NettyRpcEnvFactory.create()方法

  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }

可见,这个方法先创建了JavaSerializer序列化器,用于RPC传输中的序列化。然后通过NettyRpcEnv的构造方法创建NettyRpcEnv,这其中也会涉及到一些RPC基础组件的初始化,后面会讲解到。最后定义偏函数startNettyRpcEnv,并调用通用工具类Utils中的startServiceOnPort()方法来启动NettyRpcEnv。

NettyRpcEnv中的属性成员

我们暂时先不看NettyRpcEnv类的细节,而是先来看看它内部包含了哪些组件。

代码#8.5 - o.a.s.rpc.netty.NettyRpcEnv中的属性成员

  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  @volatile private var fileDownloadFactory: TransportClientFactory = _

  val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")

  private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
    "netty-rpc-connection",
    conf.getInt("spark.rpc.connect.threads", 64))

  @volatile private var server: TransportServer = _

  private val stopped = new AtomicBoolean(false)

  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

TransportConf和TransportContext提供底层的基于Netty的RPC机制,TransportClient和TransportServer则是RPC端点的最低级别抽象。

总结

本文讲解了RPC环境的基本组成部分RpcEndpoint、RpcEndpointRef的细节实现,并初步了解了NettyRpcEnv的创建过程,以及它内部包含的主要组件。虽然TransportConf和TransportContext更为基础,但为了避免嵌套太深出不来,下一篇文章暂时不准备讲它们,而主要来研究NettyRpcEnv内的调度器Dispatcher,它是整个RPC环境高效运转的基础。

上一篇下一篇

猜你喜欢

热点阅读