微服务架构和实践Dubbo剖析程序员

dubbo剖析:五 网络通信之 -- 请求发送与接收

2018-05-06  本文已影响544人  益文的圈

注:文章中使用的dubbo源码版本为2.5.4

零、文章目录

一、Consumer发送请求

1.1 代码入口

1.2 整体流程

Consumer发送请求流程图

1)代理执行(InvokerInvocationHandler.invoke):

2)集群容错+负载均衡(AbstractClusterInvoker.invoke):

3)Filter链扩展点(ProtocolFilterWrapper + ProtocolListenerWrapper):

4)调用协议层执行(AbstractInvoker.invoke):

5)交换层执行(ExchangeClient.request):

6)网络层执行(Client.send):

二、Provider接收请求并发送响应

2.1 代码入口

2.2 整体流程

接收请求流程图

1)Netty网络事件处理器(NettyHandler):

2)复合消息处理器(MultiMessageHandler):

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }

3)心跳消息处理器(HeartbeatHandler):

4)业务线程转换处理器(AllChannelHandler):

public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

注意点:
a)线程池默认业务线程数为200
b)队列默认采用SynchronousQueue

5)业务解码处理器(DecodeHandler):

6)交换层请求响应处理器(HeaderExchangeHandler):

            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    //case a: 请求响应模型的请求处理
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } 
                    //case b: 单向消息接收的处理
                    else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                //case c: 请求响应模型的响应处理
                handleResponse(channel, (Response) message);
            }

a)请求响应模型的Request消息:调用ExchangeHandlerAdapter.reply()获取执行结果Result -->
将本地执行结果Result封装成RPC响应Response --> 通过channel.send()发送RPC响应;

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        Object msg = req.getData();
        try {
            // 调用```ExchangeHandlerAdapter.reply()```获取执行结果```Result```
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        //将本地执行结果```Result```封装成RPC响应```Response```
        return res;
    }

b)单向请求消息的处理:调用ExchangeHandlerAdapter.received()处理请求消息,如果该消息是Invocation则执行reply()逻辑但不主动发送RPC响应Response

        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

c)请求响应模型的Response消息:调用DefaultFuture.received()处理响应消息。
...注:请求响应模型(Request,Response,DufaultFuture)相关后续专门分析,此处不展开...

7)真正本地实现类方法的执行(ExchangeHandlerAdapter):

三、Consumer接收响应

整体流程与 “Provider接收请求” 一样,唯一的区别是在 交换层请求响应处理器(HeaderExchangeHandler)步骤中会执行 “分支c:请求响应模型的Response消息”,将Response交由DefaultFuture处理。

上一篇 下一篇

猜你喜欢

热点阅读