Spark源码精读分析计划spark

Spark Core源码精读计划 番外篇B-1:重回Spark

2019-11-11  本文已影响0人  LittleMagic

目录

前言

又是很久没有连载,万分抱歉。今天(注:其实也包含昨天)需要盯着双11各个实时任务的运行,目前仍然无事发生,抽空来写几笔吧。

在前面的文章中,我们了解了块管理器BlockManager管理下的读写流程。并且已经知道,BlockManager读取块时,如果块在本地找不到,就会去集群内的远端节点去获取。同理,如果BlockManager写入块时需要复制,那么除了在本地写入之外,也要再写一份到远端节点。BlockManager与远端节点的交互就得依赖块传输服务BlockTransferService。但是BlockTransferService需要依赖之前偷懒没有讲过的RPC底层组件,所以现在得把这个坑填上,计划用3篇文章来填。

由于Spark 2.x的RPC环境是完全基于Netty搞的,所以如果看官对Netty有基本的了解的话,读起来会顺畅一点。

RPC底层概览

在系列的文章#8中,我们讲到了RPC环境——即NettyRpcEnv的构建和属性成员。来复习一下。

代码#B1.1 - o.a.s.rpc.netty.NettyRpcEnv中的部分属性成员

  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  @volatile private var server: TransportServer = _

这些东西就是Spark RPC底层主要的组成部分,之前并没有了解过,下面我们从TransportConf、TransportContext这两样开始探究。

传输配置TransportConf

在Spark源码中并不会显式地创建TransportConf实例,而是通过SparkTransportConf对象代为实现。该对象的源码很短,如下。

代码#B1.2 - o.a.s.network.netty.SparkTransportConf对象

object SparkTransportConf {
  private val MAX_DEFAULT_NETTY_THREADS = 8

  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone

    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
      override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
      override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
        conf.getAll.toMap.asJava.entrySet()
      }
    })
  }

  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}

可见,SparkTransportConf.fromSparkConf()方法负责从SparkConf持有的参数创建TransportConf。TransportConf接受的构造参数有二:

这样,我们就可以通过SparkConf持有、并通过ConfigProvider获取Spark RPC的所有配置参数。换句话说,TransportConf就是SparkConf的一个子集,SparkConf仍然是配置的唯一入口,方便统一管理。

上面代码中需要初始化的参数如下:

TransportConf类的实现就非常简单了,主要由很多get方法组成。以下是部分代码,不再赘述。

代码#B1.3 - o.a.s.network.netty.TransportConf类的部分代码

  public TransportConf(String module, ConfigProvider conf) {
    this.module = module;
    this.conf = conf;
    SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
    SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
    SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
    // ...略...
  }

  public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(Locale.ROOT); }

  public boolean preferDirectBufs() { return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true); }

  public int connectionTimeoutMs() {
    long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
      conf.get("spark.network.timeout", "120s"));
    long defaultTimeoutMs = JavaUtils.timeStringAsSec(
      conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
    return (int) defaultTimeoutMs;
  }

  public int numConnectionsPerPeer() { return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1); }

  public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }

  public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }

  public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }

  public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }

传输上下文TransportContext

既然SparkContext是Spark Core功能的主要入口,那么TransportContext自然就是Spark RPC环境的入口了。它比前面讲过的NettyRpcEnv更加底层,如果没有它,RPC环境也就无从谈起了。

成员属性与构造方法

代码#B1.4 - o.a.s.network.TransportContext类的成员属性与构造方法

  private final TransportConf conf;
  private final RpcHandler rpcHandler;
  private final boolean closeIdleConnections;

  private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
  private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }

  public TransportContext(
      TransportConf conf,
      RpcHandler rpcHandler,
      boolean closeIdleConnections) {
    this.conf = conf;
    this.rpcHandler = rpcHandler;
    this.closeIdleConnections = closeIdleConnections;
  }

以下3个成员属性同时也是TransportContext构造方法的参数:

另外还有两个常量ENCODER和DECODER。前者是消息编码器MessageEncoder的实例,由Netty提供的MessageToMessageEncoder派生而来,RPC服务端使用它来编码向客户端发送的消息;后者是消息解码器MessageDecoder的实例,由Netty提供的MessageToMessageDecoder派生而来,RPC客户端使用它来解码从服务端收到的消息。

下面具体看看TransportContext提供的方法,通过这些方法,我们能见识到更多其他由TransportContext创建的RPC组件。

创建传输客户端工厂TransportClientFactory

代码#B1.5 - o.a.s.network.TransportContext.createClientFactory()方法

  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
    return new TransportClientFactory(this, bootstraps);
  }

  public TransportClientFactory createClientFactory() {
    return createClientFactory(new ArrayList<>());
  }

createClientFactory()方法负责创建传输客户端工厂TransportClientFactory,由TransportClientFactory进而可以创建更多的传输客户端TransportClient。其参数中的TransportClientBootstrap表示传输客户端的初始化逻辑(通常是一些一次性的工作,比如让它们携带SASL认证的token等)。

createClientFactory()方法是直接调用了TransportClientFactory的构造方法,关于它的逻辑,下篇文章详细讲。

创建传输服务端TransportServer

代码#B1.6 - o.a.s.network.TransportContext.createServer()方法

  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, null, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
    return createServer(0, bootstraps);
  }

  public TransportServer createServer() {
    return createServer(0, new ArrayList<>());
  }

createServer()方法有4个重载,可以指定RPC服务端要绑定到的主机地址和端口号,以及初始化逻辑TransportServerBootstrap。TransportServer的构造方法和相关细节会与TransportClientFactory一起讲。

初始化Netty ChannelPipeline与ChannelHandler

先来看initializePipeline()方法。

代码#B1.7 - o.a.s.network.TransportContext.initializePipeline()方法

  public TransportChannelHandler initializePipeline(SocketChannel channel) {
    return initializePipeline(channel, rpcHandler);
  }

  public TransportChannelHandler initializePipeline(
      SocketChannel channel,
      RpcHandler channelRpcHandler) {
    try {
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
      channel.pipeline()
        .addLast("encoder", ENCODER)
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
        .addLast("decoder", DECODER)
        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        .addLast("handler", channelHandler);
      return channelHandler;
    } catch (RuntimeException e) {
      logger.error("Error while initializing Netty pipeline", e);
      throw e;
    }
  }

这段代码看起来可能有些sophisticated,我们先回忆一下Netty的基础知识。

在Netty架构中,Channel(通道)是消息通信的载体,ChannelHandler(通道处理器)则负责Channel中消息通信具体逻辑的实现。而ChannelPipeline(通道管线)将多个ChannelHandler按顺序组织起来,ChannelEvent(消息体)就按照ChannelPipeline规定的顺序流转。下面用一幅简图来表示它们之间的关系。

图#B1.1 - Netty Channel、ChannelPipeline、ChannelHandler之间的关系

在ChannelPipeline内部用双链表来维护ChannelHandler以及它对应的上下文实例ChannelHandlerContext,另外还有特殊的头节点HeadContext和尾节点TailContext。Netty的这种设计可以让用户专注于实现ChannelHandler的逻辑细节,这大概也是Spark开发者们所看重的优点之一吧。

从上文代码中的initializePipeline()方法可以看出,通过链式调用ChannelPipeline.addLast()方法,按顺序添加了以下ChannelHandler:

它们的类图如下所示。

图#B1.2 - Spark RPC ChannelPipeline内的组件类图

其中,实现了ChannelInboundHandler接口的处理器用于处理请求消息,而实现了ChannelOutboundHandler接口的处理器用于处理响应消息,并且它们的顺序是相反的。因此,处理请求的流程是:TransportFrameDecoder→MessageDecoder→IdleStateHandler→TransportChannelHandler,处理响应的流程是:IdleStateHandler→MessageEncoder。

那么TransportChannelHandler是哪里来的呢?它是由createChannelHandler()方法创建的。

代码#B1.8 - o.a.s.network.TransportContext.createChannelHandler()方法

  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler, conf.maxChunksBeingTransferred());
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

由代码可见,一个TransportChannelHandler实际上由三个组件组成:

注意,虽然TransportResponseHandler和TransportRequestHandler的名称里都有“Handler”,但它们不是Netty层面上的东西,仅仅是Spark内置的MessageHandler抽象类的实现而已,它规定了处理请求和响应的一些基本规范,后文会讲解到。

图#B1.3 - MessageHandler类图

总结

本文讲解了RPC环境中的传输配置TransportConf与传输上下文TransportContext的细节,探究了由TransportContext初始化的传输客户端工厂TransportClientFactory、传输服务端TransportServer,最后结合Netty的部分知识讲解了ChannelPipeline与ChannelHandler的初始化逻辑。

民那晚安。

上一篇 下一篇

猜你喜欢

热点阅读