spark

spark通信-源码分析

2021-09-18  本文已影响0人  专职掏大粪

driver的网络通信

-SparkContext.env（类型为 SparkEnv）
// SparkContext: the driver's SparkEnv is exposed as `env` and built by createSparkEnv.
private[spark] def env: SparkEnv = _env
_env = createSparkEnv(_conf, isLocal, listenerBus)

// SparkEnv.create: each SparkEnv owns one RpcEnv. The last argument is clientMode (!isDriver),
// so only the driver's RpcEnv reaches the `if (!config.clientMode)` branch in
// NettyRpcEnvFactory.create that starts a server.
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
  securityManager, numUsableCores, !isDriver)

// RpcEnv.create: delegates to the Netty-based factory with an RpcEnvConfig (`config`).
new NettyRpcEnvFactory().create(config)

// NettyRpcEnvFactory.create: constructs the NettyRpcEnv (the factory class is excerpted below).
val nettyEnv =
  new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
    config.securityManager, config.numUsableCores)


/**
 * Factory that builds a NettyRpcEnv from an RpcEnvConfig.
 *
 * Unless `config.clientMode` is set, it also starts the env's server through
 * Utils.startServiceOnPort, passing `startNettyRpcEnv` as the start function.
 * Parts of the original method are elided in this excerpt (`... ...`).
 */
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

  def create(config: RpcEnvConfig): RpcEnv = {
  // Elided here: setup of `sparkConf` and `javaSerializerInstance` used below.
  ... ...
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    // In client mode no server is started; the env is returned as-is.
    if (!config.clientMode) {
      // Start function handed to startServiceOnPort: binds the server to `actualPort` and
      // returns the env together with the port it actually bound to.
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        // Start the service on the configured port.
        // NOTE(review): startServiceOnPort presumably retries other ports on bind failure —
        // confirm in Utils.
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        // Elided here: the exception handling.
        ... ...
      }
    }
    nettyEnv
  }
}

-nettyEnv.startServer(config.bindAddress, actualPort)

/**
 * Creates the transport server bound to `bindAddress:port` and registers the
 * RpcEndpointVerifier endpoint with the dispatcher.
 * The method prologue is elided in this excerpt (`...  ...`), including the setup of `bootstraps`.
 */
def startServer(bindAddress: String, port: Int): Unit = {
   ...  ...
    server = transportContext.createServer(bindAddress, port, bootstraps)
   // Register the communication endpoint: an RpcEndpointVerifier under RpcEndpointVerifier.NAME.
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

RpcEndpoint 用于接收数据，对应 receive* 系列方法（receive / receiveAndReply）：

/**
 * Default one-way message handler: matches every message and throws a SparkException
 * stating that this endpoint does not implement `receive`.
 * NOTE(review): presumably overridden by concrete endpoints that accept one-way messages.
 */
def receive: PartialFunction[Any, Unit] = {
  case _ =>
    val notImplemented = new SparkException(self + " does not implement 'receive'")
    throw notImplemented
}

/**
 * Default request/reply handler: matches every message and answers it with a failure
 * (a SparkException) through `context.sendFailure`, instead of throwing.
 *
 * @param context the call context used to send the failure back to the caller
 */
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case _ =>
    val noReply = new SparkException(self + " won't reply anything")
    context.sendFailure(noReply)
}

RpcEndpointRef 用于发送消息，对应 send 或 ask 方法；RpcEndpoint 通过 self 获取指向自身的 RpcEndpointRef：

 /**
  * The RpcEndpointRef pointing at this endpoint, obtained via `rpcEnv.endpointRef(this)`.
  * Throws IllegalArgumentException (from `require`) if `rpcEnv` is still null, i.e. not yet
  * initialized.
  */
 final def self: RpcEndpointRef = {
     require(rpcEnv != null, "rpcEnv has not been initialized")
     rpcEnv.endpointRef(this)
   }

需要注意的是，RpcEndpoint 还有一个收件箱（Inbox）的概念，即待处理的消息列表（message list）。


/**
 * Registers `endpoint` under `name` and returns the NettyRpcEndpointRef that addresses it.
 *
 * The ref is stored in `endpointRefs` before the endpoint is attached to a MessageLoop. If
 * attaching fails with a non-fatal error, that entry is removed again and the error is rethrown.
 *
 * @param name     endpoint name; must not already be registered
 * @param endpoint the endpoint to register
 * @return the ref built from `nettyEnv.address` and `name`
 * @throws IllegalStateException    if the RpcEnv has been stopped
 * @throws IllegalArgumentException if an endpoint called `name` already exists
 */
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.containsKey(name)) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }

    // This must be done before assigning the RpcEndpoint to a MessageLoop: the MessageLoop marks
    // the Inbox active when registering, and endpointRef must be in endpointRefs before onStart
    // is called.
    endpointRefs.put(endpoint, endpointRef)

    // Pick the loop that will process this endpoint's inbox: an IsolatedRpcEndpoint gets its own
    // DedicatedMessageLoop; every other endpoint is registered on the shared loop.
    try {
      val messageLoop: MessageLoop = endpoint match {
        case e: IsolatedRpcEndpoint =>
          new DedicatedMessageLoop(name, e, this)
        case _ =>
          sharedLoop.register(name, endpoint)
          sharedLoop
      }
      endpoints.put(name, messageLoop)
    } catch {
      case NonFatal(e) =>
        // Roll back the endpointRefs entry so a failed registration leaves no stale ref.
        endpointRefs.remove(endpoint)
        throw e
    }
  }
  endpointRef
}

private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
// Outboxes: there can be many, at most one per remote RpcAddress (the map key).
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

-transportContext.createServer(bindAddress, port, bootstraps)
-new TransportServer(this, host, port, rpcHandler, bootstraps)
//使用 ip:port 启动服务
-TransportServer.init

// Netty ServerBootstrap for the transport server (excerpt from TransportServer.init).
bootstrap = new ServerBootstrap()
     // Parent (boss) event-loop group accepts connections; child (worker) group serves them.
     .group(bossGroup, workerGroup)
     // NIO or native-epoll server channel class, chosen from the configured IOMode.
     .channel(NettyUtils.getServerChannelClass(ioMode))
     // Buffer allocator for the listening server channel itself...
     .option(ChannelOption.ALLOCATOR, pooledAllocator)
     // SO_REUSEADDR on every OS except Windows. NOTE(review): presumably because Windows
     // SO_REUSEADDR lets a socket bind a port already in use — confirm.
     .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
     // ...and the same allocator for every accepted (child) channel.
     .childOption(ChannelOption.ALLOCATOR, pooledAllocator);

//NettyUtils
 /**
  * Maps an {@link IOMode} to the Netty server channel class used by the transport server.
  *
  * <p>NIO maps to {@code NioServerSocketChannel}. EPOLL maps to {@code EpollServerSocketChannel},
  * Netty's native epoll transport (Linux only). Epoll is readiness-based I/O multiplexing, not
  * true asynchronous I/O (AIO).
  *
  * @param mode the configured I/O mode
  * @return the server channel class for {@code mode}
  * @throws IllegalArgumentException if {@code mode} is neither NIO nor EPOLL
  */
 public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
   final Class<? extends ServerChannel> channelClass;
   switch (mode) {
     case NIO:
       channelClass = NioServerSocketChannel.class;
       break;
     case EPOLL:
       channelClass = EpollServerSocketChannel.class;
       break;
     default:
       throw new IllegalArgumentException("Unknown io mode: " + mode);
   }
   return channelClass;
 }

executor的网络通信

CoarseGrainedExecutorBackend[object]
-val env = SparkEnv.createExecutorEnv
-SparkEnv.create
-val rpcEnv = RpcEnv.create

 // Register the backend produced by backendCreateFn as the endpoint named "Executor"
 // on this executor's RpcEnv (env.rpcEnv).
 env.rpcEnv.setupEndpoint("Executor",
        backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))

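操作流程与 driver 基本相同。区别在于：executor 调用 SparkEnv.create 时 isDriver = false，传给 RpcEnv.create 的 clientMode 参数为 !isDriver = true，因此 NettyRpcEnvFactory.create 中的 `if (!config.clientMode)` 分支会被跳过。也就是说，executor 端的 RpcEnv 不会调用 startServer 启动 TransportServer，但仍会通过 setupEndpoint 注册名为 "Executor" 的终端。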

上一篇下一篇

猜你喜欢

热点阅读