Spark

Spark源码分析之Master的启动流程

2019-06-13  本文已影响0人  叫我不矜持

准备

本文主要对Master的启动流程源码进行分析。Spark源码版本为2.3.1。

阅读源码首先从启动脚本入手,看看首先加载的是哪个类,我们看一下start-master.sh启动脚本中的具体内容。

脚本代码

可以看到这里加载的类是org.apache.spark.deploy.master.Master,好那我们的源码寻觅之旅就从这开始...

源码分析

打开源码,我们发现Master是伴生关系的一组类,我们直接定位到Master的main函数

//主方法
  def main(argStrings: Array[String]) {
    Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
      exitOnUncaughtException = false))
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    /**
      * 创建RPC 环境和Endpoint (RPC 远程过程调用),在Spark中 Driver, Master ,Worker角色都有各自的Endpoint,相当于各自的通信邮箱。
      *
      */
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

发现该函数除了做了一些配置文件和args参数的准备之外,调用了startRpcEnvAndEndpoint函数,我们跟进去看看

/**
   * Start the Master and return a three tuple of:
   *   (1) The Master RpcEnv
   *   (2) The web UI bound port
   *   (3) The REST server bound port, if any
   */
  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)

    /**
      * 创建RPC(Remote Procedure Call )环境  ,Remote Procedure Call
      * 这里只是创建准备好Rpc的环境,后面会向RpcEnv中注册 角色【Driver,Master,Worker,Executor】
      */
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
   .....
  }

首先上面这段代码通过RpcEnv 构建了一个RPC通信环境,为之后的RPC通信做准备,Spark底层的通信是基于Netty的NIO模型,
这里每个Rpc端点运行时依赖的上下文环境称之为RpcEnv。

接下来我们直接跟踪到create方法中

def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      numUsableCores: Int,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      numUsableCores, clientMode)
    //创建NettyRpc 环境
    new NettyRpcEnvFactory().create(config)
  }

可以看到,这里构建了NettyRpcEnvFactory来创建NettyRpc 环境,NettyRpcEnvFactory的create函数又做了那些事呢?

def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
    // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    /**
      * 创建nettyRPC通信环境。
      * 当new NettyRpcEnv时会做一些初始化:
      *   Dispatcher:这个对象中有存放消息的队列和消息的转发
      *   TransportContext:可以创建了NettyRpcHandler
      */
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    if (!config.clientMode) {
      //启动nettyRPCEnv
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        //以上  startNettyRpcEnv 匿名函数在此处会最终被调用,当匿名函数被调用时,重点方法是483行 nettyEnv.startServer 方法
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }

可以看到上面首先构建了一个序列化的实例对象,然后开始着手构架nettyRPC通信环境。在new NettyRpcEnv()时会做一些初始化的工作,如下所示

 /**
    * dispatcher 这个对象中有消息队列和消息的循环获取转发
    */
  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

 /**
    * TransportContext 中会创建 NettyPpcHandler
    * TransportContext 这个对象中参数类型 RpcHandler  就是这里的 NettyRpcHandler
    */
  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

这里的Dispatcher是一个消息分发器,针对于RPC端点需要发送消息或者从远程RPC接收到的消息,分发至对应的指令收件箱/发件箱。

这里的transportContext 是构建传输的上下文环境,用于创建TransportServer和TransportClientFactory同时,通过TransportChannelHandler来设置Netty的Channel pipelines。

下面我们回到NettyRpcEnvFactory的create方法

val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    if (!config.clientMode) {
      //启动nettyRPCEnv
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        //以上  startNettyRpcEnv 匿名函数在此处会最终被调用,当匿名函数被调用时,重点方法是483行 nettyEnv.startServer 方法
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }

nettyEnv 准备好了之后,会构建一个函数startNettyRpcEnv,然后startServiceOnPort在会调用startNettyRpcEnv,进而调用nettyEnv的startServer函数,来启动Server

//启动Rpc 服务
  def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new AuthServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }

    /**
      * transportContext已经被创建,这里createServer 就会绑定地址和端口,启动Netty Rpc 服务
      */
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

由于直接初始化时transportContext就已经被创建,这里createServer 就会绑定地址和端口,启动Netty Rpc 服务,createServer最在构建TransportServer实例时,会调用init初始化方法,在这里以前了解过Netty的同学就会非常熟悉了.

private void init(String hostToBind, int portToBind) {

    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    EventLoopGroup bossGroup =
      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
    EventLoopGroup workerGroup = bossGroup;

    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, allocator)
      .childOption(ChannelOption.ALLOCATOR, allocator);

    this.metrics = new NettyMemoryMetrics(
      allocator, conf.getModuleName() + "-server", conf);

    if (conf.backLog() > 0) {
      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    }

    if (conf.receiveBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    }

    if (conf.sendBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
    }

    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) {
        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        //初始化网络通信管道
        context.initializePipeline(ch, rpcHandler);
      }
    });

    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    channelFuture.syncUninterruptibly();

    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    logger.debug("Shuffle server started on port: {}", port);
}

init 方法就是去初始化一个绑定的主机和端口,创建nettyRPC通信,通过之前的rpcHandler来初始化网络通信管道,同时会调用createChannelHandler函数,创建处理消息的 channelHandler用于处理客户端请求消息和服务端回应消息

private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler, conf.maxChunksBeingTransferred());
    /**
     *   由以上  responseHandler   client    requestHandler  三个handler构建 TransportChannelHandler
     *   new TransportChannelHandler 这个对象中有 【channelRead() 方法】,用于读取接收到的消息
     */
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

最终Rpc的环境就准备好了,后面会向RpcEnv中注册 角色 Driver,Master,Worker,Executor。我们回到Master的startRpcEnvAndEndpoint函数

 /**
      * 向RpcEnv 中 注册Master
      *
      * rpcEnv.setupEndpoint(name,new Master)
      * 这里new Master 的Master 是一个伴生类,继承了 ThreadSafeRpcEndpoint,归根结底继承到了 Trait 接口  RpcEndpoint
    val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)

这里会向RpcEnv中注册Master,调用了setupEndpoint函数,传入了Master的实例对象,这里的Master 是一个伴生类,继承了 ThreadSafeRpcEndpoint,也是RpcEndpoint的实现类,而什么是RpcEndpoint?

RpcEndpoint:RPC端点 ,Spark针对于每个节点(Client/Master/Worker)都称之一个Rpc端点 ,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送(询问)则调用Dispatcher。

EndPoint中存在

同时Endpoint 还有各自的引用,方便其他Endpoint发送消息,直接引用对方的EndpointRef 即可找到对方的Endpoint,上面源码中的masterEndpoint 就是Master的Endpoint引用 RpcEndpointRef 。

RpcEndpointRef中存在

接下来我们看看NettyRpcEnv中的setupEndpoint函数

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }

上面直接调用了dispatcher实例,注册RpcEndpoint

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      //这里 new EndpointData使用到了endpoint,当new Inbox 时向消息队列中放入OnStart样例类标识
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      //获取刚刚封装的EndPointData
      val data: EndpointData = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)

      /**
        * receivers 这个消息队列中放着应该去哪个Endpoint 中获取Message 处理
        * 这里其实就是进入 Dispatcher 当前这个类中的 MessageLoop 方法。这个方法当new Dispatcher后会一直运行。
        * 将消息放入待处理的消息队列中,消息首先找到对应的Endpoint ,再会获取当前Endpoint的Inbox 中message,使用process 方法处理
        */
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }

上面的new EndpointData使用到了endpoint,EndpointData其实就是对endpoint和endpointRef的封装,同时内部还构建了Inbox

private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    //将endpoint封装到Inbox中
    val inbox = new Inbox(ref, endpoint)
  }

这里的Inbox是其实就是消息收件箱,一个本地端点对应一个收件箱,Dispatcher在每次向Inbox存入消息时,都将对应EndpointData加入内部待Receiver Queue中,另外Dispatcher创建时会启动一个单独线程进行轮询Receiver Queue,进行收件箱消息消费。

其实Inbox实例在构建过程中也会有消息的存入。

private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {
....
@GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  // OnStart should be the first message to process
  //当注册endpoint时都会调用这个异步方法,messags中放入一个OnStart样例类消息对象
  inbox.synchronized {
    messages.add(OnStart)
  }
....

可以看到,构建inbox时会向messages中加入一个OnStart消息,该消息会被inbox类中的process所消费,只不过触发时机还在后面。

回到Dispatcher的registerRpcEndpoint函数

      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      //获取刚刚封装的EndPointData
      val data: EndpointData = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)

      /**
        * receivers 这个消息队列中放着应该去哪个Endpoint 中获取Message 处理
        * 这里其实就是进入 Dispatcher 当前这个类中的 MessageLoop 方法。这个方法当new Dispatcher后会一直运行。
        * 将消息放入待处理的消息队列中,消息首先找到对应的Endpoint ,再会获取当前Endpoint的Inbox 中message,使用process 方法处理
        */
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef

这里发现刚才构建的EndpointData会被放入到endpoints中,endpoints和endpointRefs的类型都是ConcurrentHashMap,对这两个容器进行相应的操作之后,就会调用receivers.offer(data)。

receivers是Dispatcher中的一个消息队列,这个消息队列还有一个线程池来进行消息的消费,整个模式构成一个生产者消费者模式,因此这里将data消息加入到消息队列中,会触发线程消费。

  private val threadpool: ThreadPoolExecutor = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, availableCores))
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)
    }
    pool
  }

/** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            //take 出来消息一直处理
            val data: EndpointData = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            //调用process 方法处理消息
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

可以看到上面的MessageLoop中,通过消息队列receivers取出一条消息,该消息的类型为EndpointData,内部都封装了inbox实例,因此直接调用该实例的process函数去处理消息,我们之前在inbox实例构建过程中,发现会向inbox内部的消息容器中放入一条onStart消息,因此我们看一下process函数是如何处理该消息的。

def process(dispatcher: Dispatcher): Unit = {

case OnStart =>
            //调用Endpoint 的onStart函数
            endpoint.onStart()
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }
}

process中对onStart的处理为,调用endpoint的onStart函数,而endpoint我们还记得是啥吗,这里是我们直接构建的Master实例,因此我们回到Master中去看看onStart()函数的处理过程。

override def onStart(): Unit = {
.....
 val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
}

onStart函数涉及到webUI的启动,applicationMetricsSystem和masterMetricsSystem的启动过程,如果设置了restServer还涉及到启动过程。

同时Master的onStart函数的部分代码如上所示,我么可以看到,这里涉及到如何在HA的环境下选主的过程。内部会根据配置决定采用哪种方式,是ZOOKEEPER还是文件系统的方式来进行。

生产环境下一般采用Zookeeper做HA,Zookeeper会自动化管理 Master的切换;

采用Zookeeper做HA的时候,Zookeeper会负责保存整个Spark集群运行时候的元数据:workers、Drivers、Applications、Executors;

Zookeeper遇到当前Active级别的Master出现故障的时候会从StandbyMaster中选取一台作为Active Master,但是要注意,被选举后到成为真正的ActiveMaster之间需要从Zookeeper中获取集群当前运行状态的元数据信息并进行恢复;

在Master切换的过程中,所有的已经在运行的程序皆正常运行!因为Spark Application在运行前就已经通过ClusterManager获得了计算资源,所以在运行时Job本身的调度和处理和Master是没有任何关系的!

在Master的切换过程中唯一的影响是不能提交新的job:一方面不能提交新的应用程序给集群,因为只有ActiveMaster才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因为Action操作触发新的Job的提交请求;

上一篇下一篇

猜你喜欢

热点阅读