Spark

Spark RPC 通信机制

2018-07-21  本文已影响0人  wangdy12

相关概念

主要涉及RpcEnv,RpcEndpoint,RpcEndpointRef,其中RpcEnv是通信的基础,每个通信节点上都需要实现该类,其内部实现了消息的传输处理机制,RpcEndpoint表示一个可以接收RPC消息的对象,远程节点通过RpcEndpointRef向相应的RpcEndpoint发送消息

RpcEnv

RpcEnv 抽象类表示一个 RPC Environment,管理着整个RpcEndpoint的生命周期,目前唯一的实现类是NettyRpcEnv,具体功能是

RpcEnv会在所有的通信节点上创建,例如master,worker,driver,executor都会创建一个RpcEnv

Driver端上的RpcEnv在SparkEnv初始化时创建:

    val systemName = if (isDriver) driverSystemName else executorSystemName
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
      securityManager, numUsableCores, !isDriver)

RpcEnv.create内部通过工厂方法创建RpcEnv,具体是通过实现RpcEnvFactory接口的NettyRpcEnvFactory工厂类,创建RpcEnv的具体实现类NettyRpcEnv

  //指明该RpcEnv的名称,监听地址和端口
  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      numUsableCores: Int,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      numUsableCores, clientMode)
    new NettyRpcEnvFactory().create(config)
  }

RpcEndpoint

RpcEndPoint 代表具体的通信节点,例如Master、Worker、CoarseGrainedSchedulerBackend中的DriverEndpoint、CoarseGrainedExecutorBackend等,都实现了该接口,在具体的函数中定义了消息传递来时的处理逻辑,整个生命周期是constructor -> onStart -> receive* -> onStop,即调用构造函数,然后向RpcEnv注册,内部调用onStart,之后如果收到消息,RpcEnv会调用receive*方法,结束时调用onStop方法

private[spark] trait RpcEndpoint {
  // 当前RpcEndpoint注册的RpcEnv
  val rpcEnv: RpcEnv
  // 获取该RpcEndpoint对应的RpcEndpointRef
  final def self: RpcEndpointRef

  // 处理RpcEndpointRef.send发送的消息
  def receive: PartialFunction[Any, Unit]
  // 处理RpcEndpointRef.ask发送的消息,通过RpcCallContext返回消息或异常
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]

  //一系列的回调函数
  def onError(cause: Throwable): Unit
  def onConnected(remoteAddress: RpcAddress): Unit
  def onDisconnected(remoteAddress: RpcAddress): Unit
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit
  def onStart(): Unit
  def onStop(): Unit

  // 停止RpcEndpoint
  final def stop(): Unit 
}

它的子类是ThreadSafeRpcEndpoint,Spark中实现的Endpoint大多是继承这个类,应该线程安全的处理消息,即RpcEnv中的Dispatcher在处理该Endpoint对应的Inbox内的消息时,只能单线程处理消息,不能进行多线程同时处理多个消息

RpcEndpointRef

RpcEndPointRef 是对远程RpcEndpoint的一个引用,内部记录了RpcEndpoint的位置信息

private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {
  // 最大重连次数(3),重新尝试的等待事件(3s),默认的超时事件(120s)
  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

  // 对应RpcEndpoint的地址,名称
  def address: RpcAddress
  def name: String

  // 发送一个消息
  def send(message: Any): Unit
  // 发送消息到相应的`RpcEndpoint.receiveAndReply`,异步
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
  // 发送消息到相应的`RpcEndpoint.receiveAndReply`,阻塞等待回复的结果
  def askSync[T: ClassTag](message: Any): T
  ....
}

地址表示


NettyRpcEnv实现

NettyRpcEnvFactory

NettyRpcEnv

内部涉及的部分字段和函数如下:

private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    //JavaSerializerInstance可以在多线性情况下运行
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager,
    numUsableCores: Int) extends RpcEnv(conf) with Logging {

  private[netty] val transportConf : TransportConf 传输上下文的配置信息,其中默认的netty线程数目为8
  private val dispatcher: Dispatcher 消息分发器,负责将RPC消息发送到对应的endpoint
  private val streamManager : NettyStreamManager 用于文件传输
  private val transportContext : TransportContext 传输的核心,用来创建TransportServer和TransportClientFactory
  private val clientFactory : TransportClientFactory 用来创建TransportClient
  @volatile private var fileDownloadFactory: TransportClientFactory 用来创建用于file下载的TransportClient,使得与主RPC传输分离
  val timeoutScheduler : ScheduledThreadPoolExecutor 线程池,超时控制相关
  private[netty] val clientConnectionExecutor:ThreadPoolExecutor 客户端连接线程池,线程池默认最大线程数目64
  @volatile private var server: TransportServer
  // 向远程RpcAddress发送消息时,将消息放到相应的Outbox中即可
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

  // 在特定地址端口上启动服务 即创建一个TransportServer
  def startServer(bindAddress: String, port: Int): Unit
  //该RpcEnv监听的地址 地址+端口
  def address: RpcAddress
  //注册RpcEndpoint,返回RpcEndpointRef
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
  //检索出对应的RpcEndpointRef
  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef
  // 获取RpcEndpoint对应的RpcEndpointRef
  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
  // 等待RpcEnv退出
  def awaitTermination(): Unit
}

Dispatcher

进行消息的异步处理,内部有一个线程池,每个线程执行MessageLoop任务,不停将放置在阻塞队列中receivers中的EndpointData消息取出分发到相应的endpoint,如果为PoisonPill消息,关闭线程池

其内部记录了该节点上所有的RpcEndpoint

  private val endpoints: ConcurrentMap[String, EndpointData]
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef]
  // 存储EndpointData数据的阻塞队列
  private val receivers = new LinkedBlockingQueue[EndpointData]
  // 创建一个线程名前缀名称为dispatcher-event-loop的线程池,默认线程数目是JVM可获取的核数与2的最大值,用来处理消息InboxMessage
  private val threadpool: ThreadPoolExecutor

  // 注册RpcEndpoint 将相关信息添加到endpoints和endpointRefs,receivers集合中
  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint)
  // 是否存在该endpoint
  def verify(name: String): Boolean

  // 内部的线程池,如果没有指定线程数目,使用核数作为线程数目
  private val threadpool: ThreadPoolExecutor = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
      math.max(2, availableCores))
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      pool.execute(new MessageLoop)
    }
    pool
  }

EndpointData

每个endpoint都有一个对应的EndpointDataEndpointData内部包含了RpcEndpointNettyRpcEndpointRef信息,与一个Inbox,收信箱Inbox内部有一个InboxMessage链表,发送到该endpoint的消息,就是添加到该链表,同时将整个EndpointData添加Dispatcher到阻塞队列receivers中,由Dispatcher线程异步处理

InboxMessageInbox内的消息,所有的RPC消息都继承自InboxMessage

注册RpcEndpoint

NettyRpcEnv内注册RpcEndpoint

  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }

Dispatcher.registerRpcEndpoint调用:

RpcCallContext

当发送的消息类型是RpcMessage时,需要回复消息,需要在其中封装NettyRpcCallContext,用来向客户端发送消息

private[spark] trait RpcCallContext {
  // 回复消息给发送方
  def reply(response: Any): Unit
  // 回复失败给发送方
  def sendFailure(e: Throwable): Unit
  // 获取发送方地址
  def senderAddress: RpcAddress
}

NettyRpcCallContext为实现RpcCallContext接口的抽象类,有两个具体的实现类


TransportContext

传输上下文TransportContext,内部包含传输配置信息TransportConf,以及对收到的RPC消息进行处理的RpcHandler,用来创建TransportServer和TransportClientFactory,底层依赖Netty实现

Netty中的相关概念:
每个Channel都有一个 ChannelPipeline,在Channel创建时会被自动创建

  • ChannelPipeline:内部有一个由ChannelHandlerContext组成的双向链表,每个ChannelHandlerContext对应一个ChannelHandler
  • ChannelHandler:处理I/O事件或拦截I/O操作,并将其转发到ChannelPipeline中的下一个ChannelHandler,子接口ChannelOutboundHandlerChannelInboundHandler分别用于处理发送和接收的I/O

TransportConf

传输上下文的配置信息,使用SparkTransportConf.fromSparkConf方法来构造

内部实际使用的是一份克隆的SparkConf存储配置属性,默认分配给网络传输的IO线程数是系统可用处理器的数量,但线程数目最多为8,最终确定的线程数将被用于设置客户端传输线程数(spark.$module.io.clientThreads)和服务端传输线程数(spark.$module.io.serverThreads),此外还将spark.rpc.io.numConnectionsPerPeer属性设置为1

内部包含大量io相关的配置属性,及其默认值,例如IO模式,缓存大小,线程数目等,属性名称为"spark." + 模块名称 + "." + 后缀,其中模块名称为rpc,在NettyRpcEnv中创建TransportConf时指定

TransportClientFactory

TransportContext.createClientFactory方法创建,是用来创建TransportClient的工厂类,内部包含一个连接池ConcurrentHashMap<SocketAddress, ClientPool> connectionPool,进行缓存,方便重复使用

连接池中每个SocketAddress对应一个客户端池ClientPool,其内有一个TransportClient数组,数组大大小由spark.rpc.io.numConnectionsPerPeer指定,即本节点和远程节点建立的连接数目,默认为1

TransportClientFactory构造函数中包含内部Netty客户端相关的配置,具体类型取决于ioMode:NIO或者EPOLL(Java NIO 在Linux下默认使用的就是epoll)

每个TransportClient和一个远程地址通信,由TransportClientFactory创建,流程如下

image

Channel中处理 I/O 事件的ChanelHandler核心是TransportChannelHandler,此外还有编解码相关和空闲状态检查相关的handler

TransportClientBootstrap

TransportClientFactory创建Client成功连接到远程服务端以后,先执行引导程序,主要用来进行初始信息交互,例如SaslClientBootstrap进行SASL认证,完成后才会返回该新建的TransportClient

TransportClient

TransportClient内部包含一个通道Channel,以及一个TransportResponseHandler,此类用于向服务器发出请求,而TransportResponseHandler负责处理来自服务器的响应,是线程安全的,可以从多个线程调用

client用来发送五种RequestMessage:ChunkFetchRequest、OneWayMessage、RpcRequest、StreamRequest、UploadStream

TransportServerBootstrap

当客户端和服务器建立连接后,在服务端对应的管道上运行的引导程序

TransportServer

RPC框架的服务端,只要RpcEnvConfig.clientMode不为ture,都会启动服务

调用TransportServer.startServer启动服务,通过TransportContext.createServer创建服务端,然后内部会向NettyRpcEnvdispatcher中注册本身的RpcEndpoint,名称为endpoint-verifier,类型为RpcEndpointVerifier

它的作用是,当远程节点需要创建该RpcEnv上的Endpoint的一个引用时(setupEndpointRef方法),因为每个RpcEnv上都有RpcEndpointVerifier,所以远端可以直接创建一个RpcEndpointVerifier对应的ref,通过它发送CheckExistence(name: String)消息,查询该dispatcher内部的endpoints缓存中是否存在的名称为name的endpoint,从而确定是否可以创建该RpcEndpointRef

TransportServer内部包含的变量如下:

其创建过程就是标准的netty服务端创建方式,对于已经链接进来的client Chanel,ChanelHandler的配置和客户端配置类似

RpcHandler

TransportClient.sendRpc发送的RPC消息进行处理,内部通过Dispatcher将收到的RPC分发到对应的Endpoint

// RpcHandler部分定义
public abstract class RpcHandler {
  // 接收一个RPC消息,具体逻辑执行由子类实现,处理完成后通过RpcResponseCallback回调,如果不需要回调返回消息的,传入参数为OneWayRpcCallback,只打印日志
  public abstract void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);

  // 获取StreamManager
  public abstract StreamManager getStreamManager();
  // Channel处于活跃状态时调用
  public void channelActive(TransportClient client) { }
  // 非活跃状态时调用
  public void channelInactive(TransportClient client) { }
  // 产生异常时调用
  public void exceptionCaught(Throwable cause, TransportClient client) { }
  ...
}

NettyStreamManager用于提供NettyRpcEnv的文件流服务,可以将文件,目录和jar包注册到其中,然后根据请求,将相应文件的信息封装为FileSegmentManagedBuffer,可以用来处理StreamRequest类型的消息

管道初始化

管道初始化过程中都使用了TransportContext.initializePipeline创建的TransportChannelHandler

TransportChannelHandler是单个传输层的通道handler,用于将请求委派给TransportRequestHandler并响应TransportResponseHandler。在传输层中创建的所有通道都是双向的。当客户端使用RequestMessage发送给Netty通道(由服务器的RequestHandler处理)时,服务器将生成ResponseMessage(由客户端的ResponseHandler处理)。但是,服务器也会在同一个Channel上获取句柄,因此它可能会向客户端发送RequestMessages。这意味着客户端还需要一个RequestHandler,而Server需要一个ResponseHandler,用于客户端对服务器请求的响应进行响应。

此类还处理来自io.netty.handler.timeout.IdleStateHandler的超时信息。如果存在未完成的提取fetch或RPC请求但是在“requestTimeoutMs”时间内通道上没有的流量,我们认为连接超时。注意这是双工通道;如果客户端不断发送但是没有响应,为简单起见不认为是超时

TransportChannelHandler内部使用MessageHandler处理Message,其中MessageHandler有两种类型,分别用来处理客户端请求/处理服务端的响应,Message共有10种类型

TransportRequestHandler

处理客户端的五种请求信息RequestMessage,内部包含RpcHandler处理RPC信息,TransportClient用来和请求方通信

SaslServerBootstrap类型的引导首先会对服务器端的RPCHandler进行代理,与客户端进行认证交互,认证成功后,将加解密的Handler添加到通道的Pipeline中,后续的消息交给被代理的RPCHandler进行代理

TransportResponseHandler

处理服务端对请求的响应,内部会记录需要回复的请求的ID,以及对应的callback函数,一共由六种ResponseMessage,根据消息类型和ID,执行响应的回调

其他ChannelHandler

length(long类型,消息长度,8字节)|message type(消息类型,1个字节)|message meta(消息元数据,例如RpcRequest消息的元数据长度为12,包括requestId和消息体长度bodysize)|message body(消息体,具体的信息,例如ChunkFetchSuccess中的块数据)

Outbox

可以理解为发出消息的盒子,每个地址对应个盒子

NettyRpcEnv中outboxes : ConcurrentHashMap[RpcAddress, Outbox]字段,每个远程RpcAddress对应一个Outbox,OutBox其内部包含一个OutboxMessage的链表,所有向远端发送的消息都要封装为OutboxMessage

调用Outbox.send方法发送消息时,将消息添加到OutboxMessage链表中,如果远程连接还未建立,会先通过NettyRpcEnv中的clientConnectionExecutor线程池执行建立连接的任务,即创建特定RpcAddress上的TransportClient,然后发送消息

OutboxMessage有两个子类,OneWayOutboxMessageRpcOutboxMessage,表明不会有回复和存在回复两种消息类型,分别对应调用RpcEndpoint的receivereceiveAndReply方法,当TransportClient发送消息时,如果Message是RpcOutboxMessage,先会创建一个UUID,底层TransportResponseHandler维护一个发送消息ID与其Callback的HashMap,当Netty收到完整的远程RpcResponse时候,做反序列化,回调相应的Callback,进而执行Spark中的业务逻辑,即Promise/Future的响应

Rpc底层通信框架

实际流程分析

当通过RpcEndpointRef发送需要回复的消息时:

如果远程地址与当前NettyRpcEnv相同:

如果需要连接远程地址时:

上述步骤都是异步执行的,当将消息放置到相应位置后,就会返回,然后:

在Spark 1.6之前,底层使用的Akka进行PRC,它是基于Actor的RPC通信系统,但是无法适用大的package/stream的数据传输,所以还有Netty通信框架,所以将两套通信框架合并统一使用netty,并且akka使用时版本必须保证一致,否则会出现很多问题。但是RpcEnv参照了Akka的思路,内部原理基本一致,都是按照MailBox的设计思路来实现的

image

参考

其他Spark源码分析,记录在GitBook

上一篇下一篇

猜你喜欢

热点阅读