Dubbo——Transporter 层核心实现(下)

2021-04-20  本文已影响0人  小波同学

前言

本文分析 Transporter 层中剩余的核心接口实现,主要涉及 Client 接口、Channel 接口、ChannelHandler 接口,以及相关的关键组件。

Client 继承路线分析

除了 AbstractServer 这一条继承线之外,还有 AbstractClient 这条继承线,它是对客户端的抽象。AbstractClient 中的核心字段有如下几个:

在 AbstractClient 的构造方法中,会解析 URL 初始化 needReconnect 字段和 executor字段,如下示例代码:

public abstract class AbstractClient extends AbstractEndpoint implements Client {

    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        // 调用父类的构造方法
        super(url, handler);
        
        // 解析URL,初始化needReconnect值
        needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

        // 解析URL,初始化executor
        initExecutor(url);

        try {
            // 初始化底层的NIO库的相关组件
            doOpen();
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }

        try {
            // connect.
            // 创建底层连接
            connect();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
            }
        } catch (RemotingException t) {
            if (url.getParameter(Constants.CHECK_KEY, true)) {
                close();
                throw t;
            } else {
                logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
            }
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }
    }
}

与 AbstractServer 类似,AbstractClient 定义了 doOpen()、doClose()、doConnect()和doDisConnect() 四个抽象方法给子类实现。

下面来看基于 Netty 4 实现的 NettyClient,它继承了 AbstractClient 抽象类,实现了上述四个 do*() 抽象方法,我们这里重点关注 doOpen() 方法和 doConnect() 方法。在 NettyClient 的 doOpen() 方法中会通过 Bootstrap 构建客户端,其中会完成连接超时时间、keepalive 等参数的设置,以及 ChannelHandler 的创建和注册,具体实现如下所示:

public class NettyClient extends AbstractClient {

    private static final EventLoopGroup NIO_EVENT_LOOP_GROUP = eventLoopGroup(Constants.DEFAULT_IO_THREADS, "NettyClientWorker");

    private static final String SOCKS_PROXY_HOST = "socksProxyHost";

    private static final String SOCKS_PROXY_PORT = "socksProxyPort";

    private static final String DEFAULT_SOCKS_PROXY_PORT = "1080";
    
    private Bootstrap bootstrap;

    @Override
    protected void doOpen() throws Throwable {
        // 创建NettyClientHandler
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        // 创建Bootstrap
        bootstrap = new Bootstrap();
        bootstrap.group(NIO_EVENT_LOOP_GROUP)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(socketChannelClass());

        // 设置连接超时时间,这里使用到AbstractEndpoint中的connectTimeout字段
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // 心跳请求的时间间隔
                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());

                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));
                }

                // 通过NettyCodecAdapter创建Netty中的编解码器
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                
                // 注册ChannelHandler
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                        .addLast("handler", nettyClientHandler);

                // 如果需要Socks5Proxy,需要添加Socks5ProxyHandler
                String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
                if(socksProxyHost != null) {
                    int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                    ch.pipeline().addFirst(socks5ProxyHandler);
                }
            }
        });
    }
}

NettyClientHandler 的实现方法与 NettyServerHandler 类似,同样是实现了 Netty 中的 ChannelDuplexHandler,其中会将所有方法委托给 NettyClient 关联的 ChannelHandler 对象进行处理。两者在 userEventTriggered() 方法的实现上有所不同,NettyServerHandler 在收到 IdleStateEvent 事件时会断开连接,而 NettyClientHandler 则会发送心跳消息,具体实现如下:

public class NettyClientHandler extends ChannelDuplexHandler {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // send heartbeat when read idle.
        if (evt instanceof IdleStateEvent) {
            try {
                NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
                if (logger.isDebugEnabled()) {
                    logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel);
                }
                Request req = new Request();
                req.setVersion(Version.getProtocolVersion());
                req.setTwoWay(true);
                req.setEvent(HEARTBEAT_EVENT);
                // 发送心跳请求
                channel.send(req);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

Channel 继承线分析

AbstractChannel 也继承了 AbstractPeer 这个抽象类,同时还继承了 Channel 接口。AbstractChannel 实现非常简单,只是在 send() 方法中检测了底层连接的状态,没有实现具体的发送消息的逻辑。

这里我们依然以基于 Netty 4 的实现—— NettyChannel 为例,分析它对 AbstractChannel 的实现。NettyChannel 中的核心字段有如下几个。

另外,在 NettyChannel 中还有一个静态的 Map 集合(CHANNEL_MAP 字段),用来缓存当前 JVM 中 Netty 框架 Channel 与 Dubbo Channel 之间的映射关系。从下图的调用关系中可以看到,NettyChannel 提供了读写 CHANNEL_MAP 集合的方法:

NettyChannel 中还有一个要介绍的是 send() 方法,它会通过底层关联的 Netty 框架 Channel,将数据发送到对端。其中,可以通过第二个参数指定是否等待发送操作结束,具体实现如下:

final class NettyChannel extends AbstractChannel {

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        // whether the channel is closed
        // 调用AbstractChannel的send()方法检测连接是否可用
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            // 依赖Netty框架的Channel发送数据
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                // wait timeout ms
                // 等待发送结束,有超时时间
                timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            // 出现异常会调用removeChannelIfDisconnected()方法,在底层连接断开时,
            // 会清理CHANNEL_MAP缓存
            removeChannelIfDisconnected(channel);
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }
        if (!success) {
            throw new RemotingException(this, "Failed to send message " + PayloadDropper.getRequestWithoutData(message) + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }
    
    static void removeChannelIfDisconnected(Channel ch) {
        if (ch != null && !ch.isActive()) {
            //清理CHANNEL_MAP缓存
            NettyChannel nettyChannel = CHANNEL_MAP.remove(ch);
            if (nettyChannel != null) {
                nettyChannel.markActive(false);
            }
        }
    }
}

ChannelHandler 继承线分析

前面介绍的 AbstractServer、AbstractClient 以及 Channel 实现,都是通过 AbstractPeer 实现了 ChannelHandler 接口,但只是做了一层简单的委托(也可以说成是装饰器),将全部方法委托给了其底层关联的 ChannelHandler 对象。

其中ChannelHandlerDispatcher负责将多个 ChannelHandler 对象聚合成一个 ChannelHandler 对象。

ChannelHandlerAdapter是 ChannelHandler 的一个空实现,TelnetHandlerAdapter 继承了它并实现了 TelnetHandler 接口。

ChannelHandlerDelegate接口是对另一个 ChannelHandler 对象的封装,它的两个实现类 AbstractChannelHandlerDelegate 和 WrappedChannelHandler 中也仅仅是封装了另一个 ChannelHandler 对象。

其中,AbstractChannelHandlerDelegate有三个实现类,都比较简单,我们来逐个讲解。

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 记录最近的读写事件时间戳
        setReadTimestamp(channel);
        
        // 收到心跳请求
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            if (req.isTwoWay()) {
                // 返回心跳响应,注意,携带请求的ID
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(HEARTBEAT_EVENT);
                channel.send(res);
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
            }
            return;
        }
        if (isHeartbeatResponse(message)) {
            // 收到心跳响应
            if (logger.isDebugEnabled()) {
                // 打印日志
                logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
            }
            return;
        }
        handler.received(channel, message);
    }
}

另外,我们可以看到,在 received() 和 send() 方法中,HeartbeatHandler 会将最近一次的读写时间作为附加属性记录到 Channel 中。

通过上述介绍,我们发现 AbstractChannelHandlerDelegate 下的三个实现,其实都是在原有 ChannelHandler 的基础上添加了一些增强功能,这是典型的装饰器模式的应用。

Dispatcher 与 ChannelHandler

ChannelHandlerDelegate 接口的另一条继承线——WrappedChannelHandler,其子类主要是决定了 Dubbo 以何种线程模型处理收到的事件和消息,就是所谓的“消息派发机制”,与前面介绍的 ThreadPool 有紧密的联系。

从上图中我们可以看到,每个 WrappedChannelHandler 实现类的对象都由一个相应前缀的 Dispatcher 实现类创建,下面是 Dispatcher 接口的定义:

@SPI(AllDispatcher.NAME) // 默认扩展名是all
public interface Dispatcher {
    // 通过URL中的参数可以指定扩展名,覆盖默认扩展名
    @Adaptive({"dispatcher", "dispather", "channel.handler"})
    ChannelHandler dispatch(ChannelHandler handler, URL url);
}

AllDispatcher 创建的是 AllChannelHandler 对象,它会将所有网络事件以及消息交给关联的线程池进行处理。AllChannelHandler覆盖了 WrappedChannelHandler 中除了 sent() 方法之外的其他网络事件处理方法,将调用其底层的 ChannelHandler 的逻辑放到关联的线程池中执行。

我们先来看 connect() 方法,其中会将CONNECTED 事件的处理封装成ChannelEventRunnable提交到线程池中执行,具体实现如下:

public class AllChannelHandler extends WrappedChannelHandler {

    @Override
    public void connected(Channel channel) throws RemotingException {
        // 获取公共线程池
        ExecutorService executor = getExecutorService();
        try {
            // 将CONNECTED事件的处理封装成ChannelEventRunnable提交到线程池中执行
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }
}

这里的 getExecutorService() 方法会按照当前端点(Server/Client)的 URL 从 ExecutorRepository 中获取相应的公共线程池。

disconnected()方法处理连接断开事件,caught() 方法处理异常事件,它们也是按照上述方式实现的,这里不再展开赘述。

received() 方法会在当前端点收到数据的时候被调用,具体执行流程是先由 IO 线程(也就是 Netty 中的 EventLoopGroup)从二进制流中解码出请求,然后调用 AllChannelHandler 的 received() 方法,其中会将请求提交给线程池执行,执行完后调用 sent()方法,向对端写回响应结果。received() 方法的具体实现如下:

public class AllChannelHandler extends WrappedChannelHandler {

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 获取线程池
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            // 将消息封装成ChannelEventRunnable任务,提交到线程池中执行
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            // 如果线程池满了,请求会被拒绝,这里会根据请求配置决定是否返回一个说明性的响应
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

getPreferredExecutorService() 方法对响应做了特殊处理:如果请求在发送的时候指定了关联的线程池,在收到对应的响应消息的时候,会优先根据请求的 ID 查找请求关联的线程池处理响应。

public class WrappedChannelHandler implements ChannelHandlerDelegate {

    public ExecutorService getPreferredExecutorService(Object msg) {
        if (msg instanceof Response) {
            Response response = (Response) msg;
            // 获取请求关联的DefaultFuture
            DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
            // a typical scenario is the response returned after timeout, the timeout response may has completed the future
            if (responseFuture == null) {
                return getSharedExecutorService();
            } else {
                // 如果请求关联了线程池,则会获取相关的线程来处理响应
                ExecutorService executor = responseFuture.getExecutor();
                if (executor == null || executor.isShutdown()) {
                    executor = getSharedExecutorService();
                }
                return executor;
            }
        } else {
            // 如果是请求消息,则直接使用公共的线程池处理
            return getSharedExecutorService();
        }
    }
}

这里涉及了 Request 和 Response 的概念,是 Exchange 层的概念,在后面会展开介绍,这里你只需要知道它们是不同的消息类型即可。

注意,AllChannelHandler 并没有覆盖父类的 sent() 方法,也就是说,发送消息是直接在当前线程调用 sent() 方法完成的。

下面我们来看剩余的 WrappedChannelHandler 的实现。ExecutionChannelHandler(由 ExecutionDispatcher 创建)只会将请求消息派发到线程池进行处理,也就是只重写了 received() 方法。对于响应消息以及其他网络事件(例如,连接建立事件、连接断开事件、心跳消息等),ExecutionChannelHandler 会直接在 IO 线程中进行处理。

DirectChannelHandler 实现(由 DirectDispatcher 创建)会在 IO 线程中处理所有的消息和网络事件。

MessageOnlyChannelHandler 实现(由 MessageOnlyDispatcher 创建)会将所有收到的消息提交到线程池处理,其他网络事件则是由 IO 线程直接处理。

ConnectionOrderedChannelHandler 实现(由 ConnectionOrderedDispatcher 创建)会将收到的消息交给线程池进行处理,对于连接建立以及断开事件,会提交到一个独立的线程池并排队进行处理。在 ConnectionOrderedChannelHandler 的构造方法中,会初始化一个线程池,该线程池的队列长度是固定的:

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        // 注意,该线程池只有一个线程,队列的长度也是固定的,
        // 由URL中的connect.queue.capacity参数指定
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }
}

在 ConnectionOrderedChannelHandler 的 connected() 方法和 disconnected() 方法实现中,会将连接建立和断开事件交给上述 connectionExecutor 线程池排队处理。

在上面介绍 WrappedChannelHandler 各个实现的时候,我们会看到其中有针对 ThreadlessExecutor 这种线程池类型的特殊处理,例如,ExecutionChannelHandler.received() 方法中就有如下的分支逻辑:

public class ExecutionChannelHandler extends WrappedChannelHandler {

    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 获取线程池(请求绑定的线程池或是公共线程池)
        ExecutorService executor = getPreferredExecutorService(message);

        if (message instanceof Request) {
            try {
                // 请求消息直接提交给线程池处理
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    sendFeedback(channel, (Request) message, t);
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else if (executor instanceof ThreadlessExecutor) {
            // 针对ThreadlessExecutor这种线程池类型的特殊处理
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } else {
            handler.received(channel, message);
        }
    }
}

ThreadlessExecutor 优化

ThreadlessExecutor 是一种特殊类型的线程池,与其他正常的线程池最主要的区别是:ThreadlessExecutor 内部不管理任何线程。

我们可以调用 ThreadlessExecutor 的execute() 方法,将任务提交给这个线程池,但是这些提交的任务不会被调度到任何线程执行,而是存储在阻塞队列中,只有当其他线程调用 ThreadlessExecutor.waitAndDrain() 方法时才会真正执行。也说就是,执行任务的与调用 waitAndDrain() 方法的是同一个线程。

那为什么会有 ThreadlessExecutor 这个实现呢?这主要是因为在 Dubbo 2.7.5 版本之前,在 WrappedChannelHandler 中会为每个连接启动一个线程池。

老版本中没有 ExecutorRepository 的概念,不会根据 URL 复用同一个线程池,而是通过 SPI 找到 ThreadPool 实现创建新线程池。

ThreadlessExecutor 的核心实现

首先是 ThreadlessExecutor 的核心字段,有如下几个。

ThreadlessExecutor 的核心逻辑在 execute() 方法和 waitAndDrain() 方法。execute() 方法相对简单,它会根据 waiting 状态决定任务提交到哪里,相关示例代码如下:

public class ThreadlessExecutor extends AbstractExecutorService {

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    
    private ExecutorService sharedExecutor;
    
    private volatile boolean waiting = true;
    
    @Override
    public void execute(Runnable runnable) {
        runnable = new RunnableWrapper(runnable);
        synchronized (lock) {
            // 判断业务线程是否还在等待响应结果
            if (!waiting) {
                // 不等待,则直接交给共享线程池处理任务
                sharedExecutor.execute(runnable);
            } else {
                // 业务线程还在等待,则将任务写入队列,然后由业务线程自己执行
                queue.add(runnable);
            }
        }
    }
}

waitAndDrain() 方法中首先会检测 finished 字段值,然后获取阻塞队列中的全部任务并执行,执行完成之后会修改finished和 waiting 字段,标识当前 ThreadlessExecutor 已使用完毕,无业务线程等待。

public class ThreadlessExecutor extends AbstractExecutorService {

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    
    private ExecutorService sharedExecutor;
    
    private volatile boolean waiting = true;
    
    private boolean finished = false;
    
    public void waitAndDrain() throws InterruptedException {
        // 检测当前ThreadlessExecutor状态
        if (finished) {
            return;
        }
        
        // 获取阻塞队列中获取任务
        Runnable runnable = queue.take();

        synchronized (lock) {
            // 修改waiting状态
            waiting = false;
            // 执行任务
            runnable.run();
        }

        // 如果阻塞队列中还有其他任务,也需要一并执行
        runnable = queue.poll();
        while (runnable != null) {
            runnable.run();
            runnable = queue.poll();
        }
        // mark the status of ThreadlessExecutor as finished.
        // 修改finished状态
        finished = true;
    }
}

到此为止,Transporter 层对 ChannelHandler 的实现就介绍完了,其中涉及了多个 ChannelHandler 的装饰器,为了更好地理解,这里回到 NettyServer 中,看看它是如何对上层 ChannelHandler 进行封装的。

在 NettyServer 的构造方法中会调用 ChannelHandlers.wrap() 方法对传入的 ChannelHandler 对象进行修饰:

public class ChannelHandlers {

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}
上一篇下一篇

猜你喜欢

热点阅读