private[spark] def env: SparkEnv = _env
_env = createSparkEnv(_conf, isLocal, listenerBus)
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
new NettyRpcEnvFactory().create(config)
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
def create(config: RpcEnvConfig): RpcEnv = {
... ...
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
try {
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf,
} catch {
... ...
-nettyEnv.startServer(config.bindAddress, actualPort)
def startServer(bindAddress: String, port: Int): Unit = {
... ...
server = transportContext.createServer(bindAddress, port, bootstraps)
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
RpcEndpoint 是用作做接受数据的receive*
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
RpcEndpointRef 是用来做发送的send or ask
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
注意的是RpcEndpoint 还有一个收件箱,
inbox message list概念
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
if (endpoints.containsKey(name)) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
// This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
// active when registering, and endpointRef must be put into endpointRefs before onStart is
// called.
endpointRefs.put(endpoint, endpointRef)
var messageLoop: MessageLoop = null
try {
messageLoop = endpoint match {
case e: IsolatedRpcEndpoint =>
new DedicatedMessageLoop(name, e, this)
case _ =>
sharedLoop.register(name, endpoint)
endpoints.put(name, messageLoop)
} catch {
case NonFatal(e) =>
throw e
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
//发件箱 根据RpcAddress会有多个
private val outboxes = new ConcurrentHashMapRpcAddress, Outbox
-transportContext.createServer(bindAddress, port, bootstraps)
-new TransportServer(this, host, port, rpcHandler, bootstraps)
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.option(ChannelOption.ALLOCATOR, pooledAllocator)
.option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
.childOption(ChannelOption.ALLOCATOR, pooledAllocator);
/** Returns the correct ServerSocketChannel class based on IOMode. */
public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
switch (mode) {
case NIO:
return NioServerSocketChannel.class;
case EPOLL:
//epoll方式模拟aio 异步io
return EpollServerSocketChannel.class;
throw new IllegalArgumentException("Unknown io mode: " + mode);
excutor的 网络通信
-val env = SparkEnv.createExecutorEnv
-val rpcEnv = RpcEnv.create
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
操作流程同driver ...