sofabolt

SOFABolt 源码分析18 - Protocol 私有协议的

2018-10-19  本文已影响96人  原水寒
image.png
  • SOFABolt 提供了两种协议 RpcProtocol 和 RpcProtocolV2,两种协议都有相应的5个重要属性;Bolt 针对这两种协议分别提供了不同的编解码器。
  • ProtocolManager 是一个 Protocol 协议容器
  • RpcServer 服务端启动或者 RpcClient 客户端启动的时候,都会分别创建各自的 远程调用执行类(RpcServerRemoting 或者 RpcClientRemoting),此时会执行其父类 RpcRemoting 的静态块,在静态块中实现类 Protocol 协议的实例化 + 添加到 ProtocolManager 容器中

一、使用姿势

--------------------------String addr------------------------------
String addr = "127.0.0.1:8888?_PROTOCOL=2&_VERSION=2";
String res = (String) client.invokeSync(addr, req, 3000);

--------------------------Url url------------------------------
Url url = new Url(ip, port);
url.setProtocol(RpcProtocolV2.PROTOCOL_VERSION_2);
url.setVersion(RpcProtocolV2.PROTOCOL_VERSION_2);
url.setConnNum(1); // must set, default 0
String res = (String) client.invokeSync(url, req, 3000);

上述两种调用方式是最常用的两种。(还有一种直接通过 Connection 对象进行调用),我们以第一种为例(实际上第一种的底层原理最终会调用到第二种,第二种的底层是调用第三种,即 addr -> url -> connection),推荐尽量使用第一种,因为第一种方式会提供比较多的默认值,例如connNum,在直接使用url方式时需要手动设置,否则connNum为0,则不会创建连接。

客户端发起调用

  1. 在 addr 上添加参数 _PROTOCOL=2&_VERSION=2,之后在调用过程中,会将这两个参数解析出来并设置到数据总线 com.alipay.remoting.Url 的 protocol 属性(protocolCode) + version 属性(protocolVersion)
  2. 在创建连接的时候将 Url 中的 protocolCode 和 protocolVersion 设置到 netty channel的 附加属性中
  3. 根据 netty channel 中的 protocolCode 和 protocolVersion 进行编码

服务端处理请求

  1. 服务端接收到请求后,首先从发送来的 ByteBuf 数据中解码获取 protocolCode 和 protocolVersion,
  2. 之后将 protocolCode 和 protocolVersion 设置到客户端 channel 所对应的服务端的 netty channel 的附加属性中,
  3. 最后根据 netty channel 中的 protocolCode 和 protocolVersion 进行相应的解码操作

二、源码分析

源码分析分为两部分:

第一部分:初始化协议相关组件
第二部分:调用过程中协议的使用

2.1 初始化协议相关组件

2.1.1 两种协议的定义

RpcProtocol 协议定义

请求命令(协议头长度:22 byte)
 0     1     2           4           6           8          10           12          14         16
 +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
 |proto| type| cmdcode   |ver2 |   requestId           |codec|        timeout        |  classLen |
 +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
 |headerLen  | contentLen            |                             ... ...                       |
 +-----------+-----------+-----------+                                                           +
 |               className + header  + content  bytes                                            |
 +                                                                                               +
 |                               ... ...                                                         |
 +-----------------------------------------------------------------------------------------------+
响应命令(协议头长度:20 byte)
 0     1     2     3     4           6           8          10           12          14         16
 +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
 |proto| type| cmdcode   |ver2 |   requestId           |codec|respstatus |  classLen |headerLen  |
 +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
 | contentLen            |                  ... ...                                              |
 +-----------------------+                                                                       +
 |                         className + header  + content  bytes                                  |
 +                                                                                               +
 |                               ... ...                                                         |
 +-----------------------------------------------------------------------------------------------+

RpcProtocolV2 协议定义

请求命令(协议头长度:24 byte)
 0     1     2           4           6           8          10     11     12          14         16
 +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
 |proto| ver1|type | cmdcode   |ver2 |   requestId           |codec|switch|   timeout             |
 +-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
 |classLen   |headerLen  |contentLen             |           ...                                  |
 +-----------+-----------+-----------+-----------+                                                +
 |               className + header  + content  bytes                                             |
 +                                                                                                +
 |                               ... ...                                  | CRC32(optional)       |
 +------------------------------------------------------------------------------------------------+
响应命令(协议头长度:22 byte)
 0     1     2     3     4           6           8          10     11    12          14          16
 +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
 |proto| ver1| type| cmdcode   |ver2 |   requestId           |codec|switch|respstatus |  classLen |
 +-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
 |headerLen  | contentLen            |                      ...                                   |
 +-----------------------------------+                                                            +
 |               className + header  + content  bytes                                             |
 +                                                                                                +
 |                               ... ...                                  | CRC32(optional)       |
 +------------------------------------------------------------------------------------------------+

SOFABolt 针对 RpcProtocol 和 RpcProtocolV2 这两种协议,提供了两组不同的编解码器。

2.1.2 初始化协议组件

----------------------------- RpcRemoting 静态块 --------------------------
static {
    RpcProtocolManager.initProtocols();
}

------------------------- RpcProtocolManager.initProtocols --------------------------
public static void initProtocols() {
    ProtocolManager.registerProtocol(new RpcProtocol(), RpcProtocol.PROTOCOL_CODE);
    ProtocolManager.registerProtocol(new RpcProtocolV2(), RpcProtocolV2.PROTOCOL_CODE);
}

------------------------- ProtocolManager.registerProtocol --------------------------
private static final ConcurrentMap<ProtocolCode, Protocol> protocols = new ConcurrentHashMap<ProtocolCode, Protocol>();

public static Protocol getProtocol(ProtocolCode protocolCode) {
    return protocols.get(protocolCode);
}

public static void registerProtocol(Protocol protocol, byte... protocolCodeBytes) {
    registerProtocol(protocol, ProtocolCode.fromBytes(protocolCodeBytes));
}

public static void registerProtocol(Protocol protocol, ProtocolCode protocolCode) {
    if (null == protocolCode || null == protocol) {
        throw new RuntimeException("Protocol: " + protocol + " and protocol code:"
                                   + protocolCode + " should not be null!");
    }
    Protocol exists = ProtocolManager.protocols.putIfAbsent(protocolCode, protocol);
    if (exists != null) {
        throw new RuntimeException("Protocol for code: " + protocolCode + " already exists!");
    }
}

------------------------- RpcProtocol.构造器 --------------------------
public static final byte PROTOCOL_CODE       = (byte) 1;
private static final int REQUEST_HEADER_LEN  = 22;
private static final int RESPONSE_HEADER_LEN = 20;
private CommandEncoder   encoder;
private CommandDecoder   decoder;
private HeartbeatTrigger heartbeatTrigger;
private CommandHandler   commandHandler;
private CommandFactory   commandFactory;

public RpcProtocol() {
this.encoder = new RpcCommandEncoder();
this.decoder = new RpcCommandDecoder();
this.commandFactory = new RpcCommandFactory();
this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory);
this.commandHandler = new RpcCommandHandler(this.commandFactory);
}

------------------------- RpcProtocolV2.构造器 --------------------------
public static final byte PROTOCOL_CODE       = (byte) 2;
/** version 1, is the same with RpcProtocol */
public static final byte PROTOCOL_VERSION_1  = (byte) 1;
/** version 2, is the protocol version for RpcProtocolV2 */
public static final byte PROTOCOL_VERSION_2  = (byte) 2;

/**
 * in contrast to protocol v1,
 * one more byte is used as protocol version, and another one is userd as protocol switch
 */
private static final int REQUEST_HEADER_LEN  = 22 + 2;
private static final int RESPONSE_HEADER_LEN = 20 + 2;
private CommandEncoder   encoder;
private CommandDecoder   decoder;
private HeartbeatTrigger heartbeatTrigger;
private CommandHandler   commandHandler;
private CommandFactory   commandFactory;

public RpcProtocolV2() {
    this.encoder = new RpcCommandEncoderV2();
    this.decoder = new RpcCommandDecoderV2();
    this.commandFactory = new RpcCommandFactory();
    this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory);
    this.commandHandler = new RpcCommandHandler(this.commandFactory);
}

代码较为简单,扩展性也很强,我们可以基于 Protocol 接口创建协议实现,之后将该实现注册到 ProtocolManager 中,最后在使用的时候根据传递的 protocolCode 就可以选择相应的协议实现了,在编解码的时候也可以根据 protocolVersion 做一些协议内的字段的细小调整。 - 策略模式

2.2 调用过程中协议的使用

String addr = "127.0.0.1:8888?_PROTOCOL=2&_VERSION=2";
String res = (String) client.invokeSync(addr, req, 3000);

以 String addr 形式为例。

------------------------- RpcRemoting.invokeSync --------------------------
public Object invokeSync(String addr, Object request, InvokeContext invokeContext, int timeoutMillis) {
    // 将 addr 解析为 url 数据总线
    Url url = this.addressParser.parse(addr);
    // 转化为 url 形式的调用(url 形式的调用最终会转为 Connection形式的调用)
    return this.invokeSync(url, request, invokeContext, timeoutMillis);
}
------------------------- RpcClientRemoting.invokeSync --------------------------
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis {
    final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
    this.connectionManager.check(conn);
    return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}
------------------------- RpcAddressParser.parse 轮廓 --------------------------
public static ConcurrentHashMap<String, SoftReference<Url>> parsedUrls  = new ConcurrentHashMap<String, SoftReference<Url>>();

public Url parse(String url) {
    // 1、从缓存获取 url
    Url parsedUrl = this.tryGet(url);
    if (null != parsedUrl) {
        return parsedUrl;
    }
    String ip = null;
    String port = null;
    Properties properties = null;
    // 分解获取 ip
    ip = "127.0.0.1"
    // 分解获取 port
    port = 8888
    // 分解获取设置属性
    properties = new Properties();
    properties.put("_PROTOCOL", "2");
    properties.put("_VERSION", "2");
    // 创建 url
    parsedUrl = new Url(url, ip, Integer.parseInt(port), properties);
    // 将 url#properties 中的键值对获取并设置到 url 的独立属性中(如果没有键值对,这里会设置默认值)
    this.initUrlArgs(parsedUrl);
    // 加入缓存
    Url.parsedUrls.put(url, new SoftReference<Url>(parsedUrl));
    return parsedUrl;
}

public void initUrlArgs(Url url) {
    // 从 url#properties 属性中获取 _PROTOCOL 
    String protocolStr = url.getProperty(RpcConfigs.URL_PROTOCOL);
    byte protocol = RpcProtocol.PROTOCOL_CODE;
    if (StringUtils.isNotBlank(protocolStr)) {
        protocol = Byte.parseByte(protocolStr);
    }
    url.setProtocol(protocol);

    // 从 url#properties 属性中获取 _VERSION 
    String versionStr = url.getProperty(RpcConfigs.URL_VERSION);
    byte version = RpcProtocolV2.PROTOCOL_VERSION_1;
    if (StringUtils.isNotBlank(versionStr)) {
        version = Byte.parseByte(versionStr);
    }
    url.setVersion(version);

    // 除了 protocolCode 和 protocolVersion 之外,这里还会做 _CONNECTTIMEOUT 连接超时 + 每个addr的 _CONNECTIONNUM 连接数量 + _CONNECTIONWARMUP 是否需要做连接预热 三个配置
}

------------------------- Url 数据总线 --------------------------
/** origin url */
private String     originUrl;
/** ip, can be number format or hostname format*/
private String     ip;
/** port, should be integer between (0, 65535]*/
private int        port;
/** unique key of this url */
private String     uniqueKey; // 如果在构造器没有传递,默认为 ip:port
/** URL args: timeout value when do connect */
private int        connectTimeout; // 默认为 1000 ms
/** URL args: protocol */
private byte       protocol; // 默认为 RpcProtocol.PROTOCOL_CODE = 1
/** URL args: version */
private byte       version = RpcProtocolV2.PROTOCOL_VERSION_1;
/** URL agrs: connection number */ 
private int        connNum; // 默认为 1
/** URL agrs: whether need warm up connection */
private boolean    connWarmup; // 默认为 false
/** URL agrs: all parsed args of each originUrl */
private Properties properties;

上述介绍了将 String addr 转化为 Url 的代码。接下来,就去创建连接,然后发起调用。

------------------------- AbstractConnectionFactory.createConnection --------------------------
public Connection createConnection(Url url) throws Exception {
    // 创建 netty channel
    Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
    // 包装 channel + 将 url 的属性赋值给 Connection 的属性 + 为 channel 添加 附属属性
    Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()), url.getVersion(), url);
    //触发 ConnectionEventType.CONNECT 事件
    channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
    return conn;
}
------------------------- Connection 核心方法 --------------------------
public Connection(Channel channel, ProtocolCode protocolCode, byte version, Url url) {
    this(channel, url);
    this.protocolCode = protocolCode;
    this.version = version;
    this.init();
}

public Connection(Channel channel, Url url) {
    this(channel);
    this.url = url;
    this.poolKeys.add(url.getUniqueKey());
}

public Connection(Channel channel) {
    this.channel = channel;
    // 将当前的 Connection 对象作为附属属性传递给 channel
    this.channel.attr(CONNECTION).set(this);
}

private void init() {
    this.channel.attr(HEARTBEAT_COUNT).set(new Integer(0));
    // 设置 PROTOCOL 附属属性
    this.channel.attr(PROTOCOL).set(this.protocolCode);
    // 设置 VERSION 附属属性
    this.channel.attr(VERSION).set(this.version);
    this.channel.attr(HEARTBEAT_SWITCH).set(true);
}

这样就将 url 中的属性值设置给了 Connection 及其 channel 的附属属性中。之后发送请求时,编码器再从 channel 中解析出 protocolCode 和 version,进行相应的编码操作。

------------------------- ProtocolCodeBasedEncoder.encode --------------------------
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
    // 从 channel 中获取 protocolCode
    Attribute<ProtocolCode> att = ctx.channel().attr(Connection.PROTOCOL);
    ProtocolCode protocolCode;
    if (att == null || att.get() == null) {
        protocolCode = this.defaultProtocolCode;
    } else {
        protocolCode = att.get();
    }
    // 从 ProtocolManager 协议容器中根据 protocolCode 获取 Protocol 实例
    Protocol protocol = ProtocolManager.getProtocol(protocolCode);
    // 编码过程根据 protocolVersion 的不同,会做出相应字段的调整
    protocol.getEncoder().encode(ctx, msg, out);
}

当服务端接收到消息后,从 ByteBuf 中进行解码。

------------------------- ProtocolCodeBasedDecoder.decode --------------------------
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  {
    in.markReaderIndex();
    // 1. 从 ByteBuf 中解析出 protocolCode
    ProtocolCode protocolCode = decodeProtocolCode(in);
    if (null != protocolCode) {
        // 2. 从 ByteBuf 中解析出 protocolVersion
        byte protocolVersion = decodeProtocolVersion(in);
        if (ctx.channel().attr(Connection.PROTOCOL).get() == null) { // 第一次请求为 null,后续该 channel 就有值了
            // 3. 将解析出来的 protocolCode 和 protocolVersion 设置到客户端 channel 所对应的服务端 channel 上
            ctx.channel().attr(Connection.PROTOCOL).set(protocolCode);
            if (DEFAULT_ILLEGAL_PROTOCOL_VERSION_LENGTH != protocolVersion) {
                ctx.channel().attr(Connection.VERSION).set(protocolVersion);
            }
        }
        // 4. 根据 protocolCode 获取 Protocol,之后进行解码(解码过程根据 protocolVersion 的不同,会做出相应字段的调整)
        Protocol protocol = ProtocolManager.getProtocol(protocolCode);
        if (null != protocol) {
            in.resetReaderIndex();
            protocol.getDecoder().decode(ctx, in, out);
        } 
    }
}

protected ProtocolCode decodeProtocolCode(ByteBuf in) {
    if (in.readableBytes() >= protocolCodeLength) {
        byte[] protocolCodeBytes = new byte[protocolCodeLength];
        in.readBytes(protocolCodeBytes);
        return ProtocolCode.fromBytes(protocolCodeBytes);
    }
    return null;
}

protected byte decodeProtocolVersion(ByteBuf in) {
    if (in.readableBytes() >= DEFAULT_PROTOCOL_VERSION_LENGTH) {
        return in.readByte();
    }
    return DEFAULT_ILLEGAL_PROTOCOL_VERSION_LENGTH;
}

关于编解码细节,见《编解码分析》;
关于连接细节,见 SOFABolt 源码分析12 - Connection 连接管理设计

上一篇下一篇

猜你喜欢

热点阅读