sofabolt

SOFABolt 源码分析15 - 双工通信机制的设计

2018-10-18  本文已影响115人  原水寒
image.png

SOFABolt 提供了双工通信能力,使得不仅客户端可以调用服务端,服务端也可以主动调用客户端(当然,客户端也就需要可以注册 UserProcessor 的功能)。

SOFABolt 四种调用模式:oneway / sync / future / callback
SOFABolt 三种调用链路:addr / url / connection,
注意:在整个调用过程中,调用链路会发生如下转化:addr -> url -> connection

如上图所示,整个调用辅助类包含:

  • BaseRemoting:提供了四种基本调用模式的方法(基于 Connection 的,值得注意的是,Connection 也是三种调用链路最底层的),这四种调用模式也提供了基本的调用模板;
  • RpcRemoting
  • 实现了 RpcProtocol 的初始化
  • 实现了基于 addr 链路的四种基本调用模式模板(内部实际调用基于 url 链路模式)
  • 提供了将请求对象 Object 封装为 RemotingCommand 的方法
  • RpcClientRemoting
  • 实现了基于 url 链路的四种基本调用模式模板(内部实际调用基于 connection 链路模式,根据 url 调用建连接口创建 connection)
  • 提供了建连操作
  • RpcServerRemoting

实现了基于 url 链路的四种基本调用模式模板(内部实际调用基于 connection 链路模式,但是不会根据 url 创建 connection,只会从连接管理器根据 url 获取连接 - 所以要想使用基于 url 链路的功能,必须开启服务端连接管理功能,而基于 addr 链路的方式底层又是转化为 url 链路方式,所以基于 addr 链路的功能,也必须开启服务端连接管理功能

注意:

  • 客户端调用服务端会主动建连,服务端调用客户端不会主动建连
  • 服务端想使用基于 url 链路或者基于 addr 链路的调用功能,必须开启服务端连接管理功能(实际上还需要保存 addr 链路地址,通常通过 UserProcessor.handleRequest 中的 BizContext 来获取 remoteAddr 并存储在 UserProcessor 中)
  • 服务端如果没有开启服务端连接管理功能,只能通过 connection 链路进行调用,此时要保存好连接建立好的时候创建的 connection 对象(通常使用 ConnectionEventType.CONNECT 的连接事件处理器做这件事)

一、使用姿势

1.1、基于 addr 链路模式

服务端

public class MyServer {
    public static void main(String[] args) throws RemotingException, InterruptedException {
        RpcServer server= new RpcServer(8888);
        MyServerUserProcessor serverUserProcessor = new MyServerUserProcessor();
        server.registerUserProcessor(serverUserProcessor);
        // 打开服务端连接管理功能
        server.switches().turnOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH);

        if (server.start()) {
            System.out.println("server start success!");
            // 模拟去其他事情
            Thread.sleep(10000);
            MyRequest request = new MyRequest();
            request.setReq("hi, bolt-client");
            // 向 serverUserProcessor 存储的 RemoteAddr 发起请求
            MyResponse resp = (MyResponse)server.invokeSync(serverUserProcessor.getRemoteAddr(), request, 10000);
            System.out.println(resp.getResp());
        } else {
            System.out.println("server start fail!");
        }
    }
}

=========================== 服务端业务逻辑处理器 ===========================
public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
    // 存储 client 端地址,用于发起远程调用
    private String remoteAddr;

    @Override
    public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
        remoteAddr = bizCtx.getRemoteAddress(); // 此处也可以存储 Connection:bizCtx.getConnection();
        MyResponse response = new MyResponse();
        if (request != null) {
            System.out.println(request);
            response.setResp("from server -> " + request.getReq());
        }

        return response;
    }

    @Override
    public String interest() {
        return MyRequest.class.getName();
    }

    public String getRemoteAddr() {
        return remoteAddr;
    }
}

客户端

public class MyClient {
    private static RpcClient client;
    private static CountDownLatch latch = new CountDownLatch(1);

    public static void start() {
        client = new RpcClient();
        // 注册业务逻辑处理器
        client.registerUserProcessor(new MyClientUserProcessor());
        client.init();
    }

    public static void main(String[] args) throws RemotingException, InterruptedException {
        MyClient.start();
        MyRequest request = new MyRequest();
        request.setReq("hello, bolt-server");
        MyResponse response = (MyResponse) client.invokeSync("127.0.0.1:8888", request, 300 * 1000);
        System.out.println(response);
        latch.await();
    }
}

=========================== 客户端业务逻辑处理器 ===========================
public class MyClientUserProcessor extends SyncUserProcessor<MyRequest> {
    @Override
    public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
        MyResponse response = new MyResponse();
        if (request != null) {
            System.out.println(request);
            response.setResp("from client -> " + request.getReq());
        }
        return response;
    }

    @Override
    public String interest() {
        return MyRequest.class.getName();
    }
}

1.2、基于 connection 链路模式

服务端

public class MyServer {
    public static void main(String[] args) throws RemotingException, InterruptedException {
        RpcServer server= new RpcServer(8888);
        MyServerUserProcessor serverUserProcessor = new MyServerUserProcessor();
        server.registerUserProcessor(serverUserProcessor);
        // 创建并注册 ConnectionEventType.CONNECT 连接事件处理器
        MyCONNECTEventProcessor connectEventProcessor = new MyCONNECTEventProcessor();
        server.addConnectionEventProcessor(ConnectionEventType.CONNECT, connectEventProcessor);
        if (server.start()) {
            System.out.println("server start success!");
            // 模拟去其他事情
            Thread.sleep(10000);
            MyRequest request = new MyRequest();
            request.setReq("hi, bolt-client");
            // 向 connectEventProcessor 存储的 connection 发起请求
            MyResponse resp = (MyResponse)server.invokeSync(connectEventProcessor.getConnection(), request, 10000);
            System.out.println(resp.getResp());
        } else {
            System.out.println("server start fail!");
        }
    }
}

=========================== 连接事件处理器 ===========================
public class MyCONNECTEventProcessor implements ConnectionEventProcessor {
    // 存储连接,用于服务端向客户端发起远程通信
    private Connection connection;

    @Override
    public void onEvent(String remoteAddr, Connection conn) {
        this.connection = conn;
        System.out.println("hello, " + remoteAddr);
    }

    public Connection getConnection() {
        return connection;
    }
}

客户端

同 1.1

二、源码解析

建连

======================================== RpcServer ========================================
    protected void doInit() {
        // 开启服务端连接管理功能
        if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
            this.connectionEventHandler = new RpcConnectionEventHandler(switches());
            // 与客户端一样,创建连接管理器,并设置到 connectionEventHandler 中
            this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
            this.connectionEventHandler.setConnectionManager(this.connectionManager);
            this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
        } else {
            this.connectionEventHandler = new ConnectionEventHandler(switches());
            this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
        }
        initRpcRemoting();
        ...
        this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) {
                ...
                createConnection(channel);
            }

            private void createConnection(SocketChannel channel) {
                Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
                // 如果开启了连接管理功能,则新建 Connection 并加入 连接管理器;
                // 如果没有开启,则直接新建 Connection
                if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                    connectionManager.add(new Connection(channel, url), url.getUniqueKey());
                } else {
                    new Connection(channel, url);
                }
                // 发布建连事件,此时进行连接的保存
                channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
            }
        });
    }

======================================== DefaultConnectionManager ========================================
    public void add(Connection connection, String poolKey) {
        ConnectionPool pool = null;
        // 如果有 ConnectionPool 直接获取,没有就创建一个新的
        // 与客户端不同的是,该 add 方法只创建一个空的 ConnectionPool,不会初始化连接;而客户端会初始化连接
        // 连接是否初始化,取决于 ConnectionPoolCall 构造器参数
        pool = this.getConnectionPoolAndCreateIfAbsent(poolKey, new ConnectionPoolCall());
        // 连接加入连接池
        pool.add(connection);
    }

    private class ConnectionPoolCall implements Callable<ConnectionPool> {
        // 是否初始化连接
        private boolean whetherInitConnection;
        private Url     url;

        // 默认构造器:只创建 pool,不建连(服务端用的这个)
        public ConnectionPoolCall() {
            this.whetherInitConnection = false;
        }

        // 创建 pool + 建连(客户端用的这个)
        public ConnectionPoolCall(Url url) {
            this.whetherInitConnection = true;
            this.url = url;
        }

        @Override
        public ConnectionPool call() throws Exception {
            // 创建 pool
            final ConnectionPool pool = new ConnectionPool(connectionSelectStrategy);
            if (whetherInitConnection) {
                // 建连
                doCreate(this.url, pool, this.getClass().getSimpleName(), 1);
            }
            return pool;
        }
    }

调用

======================================== RpcRemoting ========================================
    public Object invokeSync(String addr, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // addr => url
        Url url = this.addressParser.parse(addr);
        // 以 url 链路方式进行调用
        return this.invokeSync(url, request, invokeContext, timeoutMillis);
    }

======================================== RpcServerRemoting ========================================
    public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // 直接从 connectionManager 获取连接(仅仅是获取,不建连)
        Connection conn = this.connectionManager.get(url.getUniqueKey());
        // 检查连接
        this.connectionManager.check(conn);
        // 以 connection 链路方式进行调用
        return this.invokeSync(conn, request, invokeContext, timeoutMillis);
    }

    public Connection DefaultConnectionManager#get(String poolKey) {
        // 获取连接池,如果没有返回null,否则从连接池获取一个 connection。整个过程只是单纯的获取,不做建连操作
        ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
        return null == pool ? null : pool.get();
    }
======================================== RpcClientRemoting ========================================
    public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // 获取连接,如果没有就建连
        final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
        // 检查连接
        this.connectionManager.check(conn);
        // 以 connection 链路方式进行调用
        return this.invokeSync(conn, request, invokeContext, timeoutMillis);
    }

======================================== RpcRemoting ========================================
    public Object invokeSync(Connection conn, Object request, InvokeContext invokeContext, int timeoutMillis) {
        // 构造请求统一体
        RemotingCommand requestCommand = toRemotingCommand(request, conn, invokeContext, timeoutMillis);
        // 预处理 invokeContext
        preProcessInvokeContext(invokeContext, requestCommand, conn);
        // 发起调用,返回响应统一体
        ResponseCommand responseCommand = (ResponseCommand) super.invokeSync(conn, requestCommand, timeoutMillis);
        // 设置 invokeContext 到响应统一体
        responseCommand.setInvokeContext(invokeContext);
        // 从响应统一体解析出真正的响应消息
        Object responseObject = RpcResponseResolver.resolveResponseObject(responseCommand, RemotingUtil.parseRemoteAddress(conn.getChannel()));
        return responseObject;
    }

======================================== BaseRemoting ========================================
    protected RemotingCommand invokeSync(Connection conn, RemotingCommand request, int timeoutMillis) {
        // 创建 InvokeFuture
        final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
        // 将 InvokeFuture 添加到 Connection 的 InvokeFutureMap 映射中
        conn.addInvokeFuture(future);
        // 发起 netty 请求
        conn.getChannel().writeAndFlush(request);
        // 阻塞等待响应结果
        RemotingCommand response = future.waitResponse(timeoutMillis);
        // 被唤醒后返回响应结果
        return response;
    }
上一篇下一篇

猜你喜欢

热点阅读