Spark Core源码精读计划#8:SparkEnv中RPC环
目录
前言
在之前的文章中,我们由SparkContext的初始化提到了事件总线LiveListenerBus与执行环境SparkEnv。在讲解SparkEnv的过程中,RPC环境RpcEnv又是首先被初始化的重要组件。做个不怎么恰当的比较,SparkEnv之于SparkContext,正如RpcEnv之于SparkEnv。
由于RPC环境负责着Spark体系内几乎所有内部及外部通信,内容很多,所以一篇文章必然讲不完。本文还是从基础开始看起。
RPC端点及其引用
RpcEnv抽象类是Spark RPC环境的通用表示,它其中定义的setupEndpoint()方法用来向RPC环境注册一个RPC端点(RpcEndpoint),并返回其引用(RpcEndpointRef)。如果客户端要对一个RpcEndpoint发送消息,那么必须首先获得其对应的RpcEndpointRef。它们之间的关系可以用如下简图表示。
图#8.1 - RPC环境与RPC端点
既然RpcEndpoint和RpcEndpointRef是RPC环境中的基础组件,我们先来研究它们的源码。
RpcEndpoint
RpcEndpoint是一个特征,其代码如下。
代码#8.1 - o.a.s.rpc.RpcEndpoint特征
private[spark] trait RpcEndpoint {
val rpcEnv: RpcEnv
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
rpcEnv.endpointRef(this)
}
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
def onError(cause: Throwable): Unit = {
throw cause
}
def onConnected(remoteAddress: RpcAddress): Unit = { }
def onDisconnected(remoteAddress: RpcAddress): Unit = { }
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { }
def onStart(): Unit = { }
def onStop(): Unit = { }
final def stop(): Unit = {
val _self = self
if (_self != null) {
rpcEnv.stop(_self)
}
}
}
其中定义了如下方法,这些相当于是RPC端点在RPC环境中的“行为准则”。
- self():取得当前RpcEndpoint对应的RpcEndpointRef。
- receive()/receiveAndReply():接收其他RpcEndpointRef传来的消息并进行处理,receiveAndReply()方法还会发送回复。
- onError():消息处理出现异常时调用的方法。
- onConnected()/onDisconnected():当前RPC端点建立连接或断开连接时调用的方法。
- onNetworkError():RPC端点的连接出现网络错误时调用的方法。
- onStart()/onStop():RPC端点初始化与关闭时调用的方法。
- stop():停止当前RpcEndpoint。
RpcEndpoint继承体系
RpcEndpoint的主要继承体系如下图所示。
#图8.2 - RpcEndpoint的主要继承体系图中可以看到不少之前出现过的RPC端点,如文章#2中的HeartbeatReceiver,文章#7中的MapOutputTrackerMasterEndpoint、BlockManagerMasterEndpoint等。在今后涉及到它们时,会专门进行讲解。
另外,图中的ThreadSafeRpcEndpoint是直接继承自RpcEndpoint的特征。顾名思义,它要求RPC端点对消息的处理必须是线程安全的,用文档中的话说,线程安全RPC端点处理消息必须满足happens-before原则。
RpcEndpointRef
RpcEndpointRef是一个抽象类,其代码如下。
代码#8.2 - o.a.s.rpc.RpcEndpointRef抽象类
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {
private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
def address: RpcAddress
def name: String
def send(message: Any): Unit
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
val future = ask[T](message, timeout)
timeout.awaitResult(future)
}
}
这个抽象类的开头有三个属性,都是通过RpcUtils工具类从Spark配置项中取出来的,如下。
- maxRetries:最大重连次数,对应配置项为spark.rpc.numRetries,默认值3次。
- retryWaitMs:每次重连之前等待的时长,对应配置项为spark.rpc.retry.wait,默认值3秒。
- defaultAskTimeout:对RPC端点进行ask()操作(下面会讲到)的默认超时时长,对应配置项为spark.rpc.askTimeout与spark.network.timeout(前者优先级高于后者),默认值120秒。
值得注意的是,maxRetries与retryWaitMs两个属性在当前的2.3.3版本中都没有用到,而在之前的版本中还是有用到的,证明Spark官方取消了RPC重试机制,也就是统一为消息传递语义中的at most once语义了。当然,我们也可以自己实现带有重试机制的RPC端点引用。
address和name方法分别返回RPC端点引用对应的地址和名称,不必多讲。下面几个方法的含义如下。
- send()方法:异步发送一条单向的消息,并且“发送即忘记”(fire-and-forget),不需要回复。
- ask()方法:异步发送一条消息,并在规定的超时时间内等待RPC端点的回复。RPC端点会调用receiveAndReply()方法来处理。
- askSync()方法:是ask()方法的同步实现。由于它是阻塞操作,有可能会消耗大量时间,因此必须慎用。
RpcEndpointRef只有一个子类,即NettyRpcEndpointRef。它对send()和ask()两个方法的实现如下。
代码#8.3 - o.a.s.rpc.netty.NettyRpcEndPointRef.send()与ask()方法
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
}
override def send(message: Any): Unit = {
require(message != null, "Message is null")
nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
可见是依赖于NettyRpcEnv类的,下面来看一下它是如何创建出来的。
NettyRpcEnv概况
创建NettyRpcEnv
在文章#7的代码#7.4~#7.5中,通过工厂类NettyRpcEnvFactory的create()方法创建出了NettyRpcEnv,它是目前Spark官方提供的RPC环境的唯一实现。该方法的代码如下。
代码#8.4 - o.a.s.rpc.netty.NettyRpcEnvFactory.create()方法
def create(config: RpcEnvConfig): RpcEnv = {
val sparkConf = config.conf
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}
可见,这个方法先创建了JavaSerializer序列化器,用于RPC传输中的序列化。然后通过NettyRpcEnv的构造方法创建NettyRpcEnv,这其中也会涉及到一些RPC基础组件的初始化,后面会讲解到。最后定义偏函数startNettyRpcEnv,并调用通用工具类Utils中的startServiceOnPort()方法来启动NettyRpcEnv。
NettyRpcEnv中的属性成员
我们暂时先不看NettyRpcEnv类的细节,而是先来看看它内部包含了哪些组件。
代码#8.5 - o.a.s.rpc.netty.NettyRpcEnv中的属性成员
private[netty] val transportConf = SparkTransportConf.fromSparkConf(
conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
"rpc",
conf.getInt("spark.rpc.io.threads", 0))
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val streamManager = new NettyStreamManager(this)
private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))
private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
@volatile private var fileDownloadFactory: TransportClientFactory = _
val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")
private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
"netty-rpc-connection",
conf.getInt("spark.rpc.connect.threads", 64))
@volatile private var server: TransportServer = _
private val stopped = new AtomicBoolean(false)
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
- TransportConf:传输配置,作用在RPC环境中类似于SparkConf,负责管理与RPC相关的各种配置。
- Dispatcher:调度器,或者叫分发器,用于将消息路由到正确的RPC端点。
- NettyStreamManager:流式管理器,用于处理RPC环境中的文件,如自定义的配置文件或者JAR包。
- TransportContext:传输上下文,作用在RPC环境中类似于SparkContext,负责管理RPC的服务端(TransportServer)与客户端(TransportClient),与它们之间的Netty传输管道。
- TransportClientFactory:创建RPC客户端TransportClient的工厂。
- TransportServer:RPC环境中的服务端,负责提供基础且高效的流式服务。
TransportConf和TransportContext提供底层的基于Netty的RPC机制,TransportClient和TransportServer则是RPC端点的最低级别抽象。
总结
本文讲解了RPC环境的基本组成部分RpcEndpoint、RpcEndpointRef的细节实现,并初步了解了NettyRpcEnv的创建过程,以及它内部包含的主要组件。虽然TransportConf和TransportContext更为基础,但为了避免嵌套太深出不来,下一篇文章暂时不准备讲它们,而主要来研究NettyRpcEnv内的调度器Dispatcher,它是整个RPC环境高效运转的基础。