Dubbo网络模型(居然肝了两周才写完,吐血推荐)

2021-04-19  本文已影响0人  Java技术进阶

本篇为dubbo高性能剖析的下篇,侧重于剖析dubbo在网络层的源码实现及设计。(上篇传送门dubbo的线程模型

概述

Dubbo中Consumer端和Provider端由于角色和职责不尽相同,本篇将分开介绍Consumer、Provider。希望借助示意图再加上源码的剖析,能让大家对Dubbo中的网络请求模型有个比较清晰的了解和认识。

Consumer端

Consumer端由于支持的场景非常多,直接给出代码分析大家可能会看的云里雾里。所以这里先给出示意图,从整体上来了解请求模型有哪些,并通过简单的示例对各个模型有个简单的认识。然后再开始深入剖析源码实现。

我们先来了解下consumer端请求模型有哪些及其特点:

交互示意图如下所示:

net_model_01.png

为了进一步区分异步请求阻塞get()与回调的区别,我们通过代码示例进一步说明

接口契约如下:

public interface DemoService {

    String sayHello(String name);

    default CompletableFuture<String> sayHelloAsync(String name) {
        return CompletableFuture.completedFuture(sayHello(name));
    }

}

服务提供方实现:

public class DemoServiceImpl implements DemoService {
    private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);

    @Override
    public String sayHello(String name) {
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
        return "Hello " + name + ";
    }

}

调用方代码:

DemoService demoService = ReferenceConfigCache.getCache().get(reference);

CompletableFuture<String> future = demoService.sayHelloAsync("get");
System.err.println("threadName:" + Thread.currentThread().getName() + " get 开始");
future.get();
System.err.println("threadName:" + Thread.currentThread().getName() + " get 结束");


System.err.println("threadName:" + Thread.currentThread().getName() + " callback 开始");
demoService.sayHelloAsync("callback").whenComplete((r, e) -> {
  System.err.println("threadName:" + Thread.currentThread().getName() + " callback 结束");
});

System.err.println("threadName:" + Thread.currentThread().getName() + " 执行完毕");

代码中在关键的节点打印了当前线程名,依次调用了Future.getCompletableFuture.whenComplete,控制台输出结果为:

threadName:main get 开始
threadName:main get 结束
threadName:main callback 开始
threadName:main 执行完毕
    
threadName:DubboClientHandler-10.0.107.214:20880-thread-1 callback 结束

通过执行结果可以发现:通过Future.get()'会阻塞当前线程等待结果,而CompletableFuture`的回调则使用单独的线程不会阻塞当前线程的执行。

Consumer关键源码分析

上面概述了Consumer中存在的几种网络模型,本小节我们侧重于源码分析,剖析上面几种请求模型实现原理。

protected Result doInvoke(final Invocation invocation) throws Throwable {
    // 省略部分代码
    try {
        // 如果是单向请求
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        if (isOneway) {
            // 是否确认异步发送
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            // 如果是同步使用ThreadlessExecutor,其他场景使用Dubbo创建的共享线程池
            ExecutorService executor = getCallbackExecutor(getUrl(), inv);
            // 创建CompletableFuture,并通过netty work线程池将消息异步发送出去
            CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(inv, timeout, executor).thenApply(o -> (AppResponse) o);
            // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            // 设置执行异步回调任务的线程池为executor
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

可以看到整体上的代码逻辑有两个分支:处理单向请求和非单向请求,那么我们下面就依次展开两种场景具体分析。

单向请求

1. boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
2. currentClient.send(inv, isSent);
3. return AsyncRpcResult.newDefaultAsyncResult(invocation);

单向请求代码很简单只有三行就完成了。第1行代码出现的isSent参数表示线程是否等待确认异步发送。因为消息最终还是通过Netty NIO异步发送出去的,如果设置为true意味着要一直等待至Netty发送完成,默认为false。第2行代码跟踪消息发送过程,调用链如下:

-ReferenceCountExchangeClient
--HeaderExchangeClient
---HeaderExchangeChannel
----NettyChannel

最终发送消息核心代码如下:

1. public void send(Object message, boolean sent) throws RemotingException {
    // 省略非关键代码
  
    // 这里通过调用Netty方法将消息发送出去
2.     ChannelFuture future = channel.writeAndFlush(message);
    // 如果方法配置sent=true则需要同步等待消息发送完毕
3.     if (sent) {
      // wait timeout ms
4.     timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
5.     success = future.await(timeout);
    }
   // 省略非关键代码
}

在第2行可以看到消息通过Netty中的channel将消息异步发送出去,返回了一个Future对象,如果isSent设置为true那么在执行到第5行时就会阻塞至消息发送成功或者超时后返回。

方法send执行结束后程序会返回主流程的doInvoke方法继续执行到AsyncRpcResult.newDefaultAsyncResult(invocation);,这里创建默认返回值后就完成了单向请求全部过程。

除了单向请求,其他另外三种都是双向的,有去有回。一个是请求报文的发送,一个是响应报文的接收,两者需要结合起来一起合作才能完成一个RPC的全部过程。那么后续针对其他三种请求模型代码分析时我们也按照先请求后响应的步骤展开。

同步请求

即客户端请求之后当前线程会阻塞等待响应,处理请求和响应的同一个线程。接下来我们重点分析以下实现:

下面代码是非单向请求的处理逻辑

// 如果是同步使用ThreadlessExecutor,其他场景使用Dubbo创建的共享线程池
1. ExecutorService executor = getCallbackExecutor(getUrl(), inv);
// 创建CompletableFuture,并通过netty work线程池将消息异步发送出去
2. CompletableFuture<AppResponse> appResponseFuture =
        currentClient.request(inv, timeout, executor).thenApply(o -> (AppResponse) o);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
// 设置执行异步回调任务的线程池为executor
result.setExecutor(executor);
return result;

第1行代码:当前执行的请求通过getCallbackExecutor获得对应的线程池(用来执行返回请求时异步回调过程),代码如下:

protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
    ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
    // 如果当前请求是同步请求,则返回ThreadlessExecutor
    if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
        return new ThreadlessExecutor(sharedExecutor);
    } else {
        // 否则返回客户端创建的共享线程池
        return sharedExecutor;
    }
}

关于ThreadlessExecutor有必要介绍下它的特点,官方文档说明如下:

补充说明,对理解同步请求阻塞原理非常重要:在ThreadlessExecutor中任务使用阻塞队列存储,如果队列是空的情况下直接调用waitAndDrain()方法时,阻塞队列会阻塞当前线程直至有新的任务到来。Dubbo充分利用阻塞队列的特性,在请求完成之后调用waitAndDrain()方法阻塞住当前线程,当Netty的work线程收到响应之后会将消息转换成待处理任务加入到该线程池中。此时刚刚因为没有任务处理而阻塞的线程会被唤醒,用来继续处理响应的消息。理解这段话我们来继续分析请求代码:

第2行代码:currentClient调用request方法发送请求并返回CompletableFuture,进一步跟踪request源码:

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // 创建请求,这里隐藏了构造方法中创建请求id的细节,这也是关联request和response的关键。
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    // 创建Future对象,将请求以及线程池保存到Future中
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

这段代码主要完成了三件事:

返回future对象之后,对象会被进一步封装成AsyncRpcResult,到这里貌似主流程都结束了并没有看到调用``waitAndDrain()方法产生阻塞关键代码。Dubbo将这块代码封装到了AsyncToSyncInvoker`中:

@Override
public Result invoke(Invocation invocation) throws RpcException {
 1.   Result asyncResult = invoker.invoke(invocation);

    try {
 2.       if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            /**
             * NOTICE!
             * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
             * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
             */
  3.          asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    // 省略非关键代码
}

可以看到第2行判断是否为同步请求,如果是同步请求会调用第3行的get()方法,而该方法跟进去之后如下:

public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
1.   if (executor != null && executor instanceof ThreadlessExecutor) {
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
2.        threadlessExecutor.waitAndDrain();
    }
    return responseFuture.get(timeout, unit);
}

这里可以看到第一行判断线程池是否为ThreadlessExecutor,如果是则会调用waitAndDrain()阻塞等待,到这里同步请求的过程我们已经分析完毕,接下来我们来分析同步请求的响应区里过程。

还记上篇中出现的AllChannelHandler吗,这个类将部分IO事件委派给Dubbo中的线程池处理,其中就包括了消息的接收,我们一起来看下源码:

@Override
public void received(Channel channel, Object message) throws RemotingException {
1.    ExecutorService executor = getPreferredExecutorService(message);
    try {
2.        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
       if(message instanceof Request && t instanceof RejectedExecutionException){
            sendFeedback(channel, (Request) message, t);
            return;
       }
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

第1行通过返回的消息获取线程池,第2行将消息进一步封装成任务交给刚刚线程池处理。我们进一步跟进获取线程池的代码,剖析Dubbo是如何通过响应找到请求的上下文的。

public ExecutorService getPreferredExecutorService(Object msg) {
1.   if (msg instanceof Response) {
2.       Response response = (Response) msg;
3.       DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
        // a typical scenario is the response returned after timeout, the timeout response may has completed the future
4.       if (responseFuture == null) {
5.            return getSharedExecutorService();
6.        } else {
7.            ExecutorService executor = responseFuture.getExecutor();
8.            if (executor == null || executor.isShutdown()) {
9.                executor = getSharedExecutorService();
            }
            return executor;
        }
    } else {
        return getSharedExecutorService();
    }
}

在第3行,通过响应消息携带回来的id就可以在DefaultFuture的静态全局变量Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();中获得关联请求的DefaultFuture了,进一步也就可以获取到该DefaultFuture中存储的线程池ThreadlessExecutor了。那么任务被Netty中的工作线程调用executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));被加入阻塞队列后又会发生什么呢,我们来分析下waitAndDrain()中的源码:

public void waitAndDrain() throws InterruptedException {
    
    if (finished) {
        return;
    }

1.    Runnable runnable = queue.take();

2.    synchronized (lock) {
3.        waiting = false;
4.        runnable.run();
    }

    runnable = queue.poll();
    while (runnable != null) {
        try {
            runnable.run();
        } catch (Throwable t) {
            logger.info(t);

        }
        runnable = queue.poll();
    }
    // mark the status of ThreadlessExecutor as finished.
    finished = true;
}

在请求时会调用一次waitAndDrain(),由于队列中是空的所以线程会阻塞在第1行代码。当收到响应消息时新的处理任务被添加进来,代码会继续执行走到第4行执行刚刚添加进来的任务。下面我们剖析任务执行过程,调用链如下:
ChannelEventRunnable.run()
-HeaderExchangeHandler.received()
--HeaderExchangeHandler.handleResponse()
---DefaultFuture.received()

public static void received(Channel channel, Response response, boolean timeout) {
        try {
1.            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                Timeout t = future.timeoutCheckTask;
                if (!timeout) {
                    // decrease Time
                    t.cancel();
                }
2.                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
}

第1行通过调用id获取future对象,第2行调用doReceived()方法,方法内容如下:

private void doReceived(Response res) {
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
1.            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }

        // the result is returning, but the caller thread may still waiting
        // to avoid endless waiting for whatever reason, notify caller thread to return.
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            if (threadlessExecutor.isWaiting()) {
                threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
                        " which is not an expected state, interrupt the thread manually by returning an exception."));
            }
        }
    }

这里关键代码就一行this.complete(res.getResult());。首先通过res.getResult()获取返回值,调用this.complete()并将返回值传入。如果该future注册了类如whenComplete()回调函数就会在此时触发。

异步请求

在本篇开始的时候我们就通过简单的示例演示了Dubbo中两种异步不同的使用方式,一种是通过get()主动阻塞获取返回值(是否阻塞要看get时消息是否返回了),另一种则是完全异步由其他的线程执行回调方法。

通过上面对同步的分析,假设AsyncToSyncInvokerasyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);这段代码不执行会是不是就可以直接返回了呢?

答案是肯定,没有了阻塞的过程,在请求消息发送之后就立刻返回了,忽略了这段代码就是异步请求了。同步请求与客户端采用future.get()这种方式获取结果集区别在于前者是框架内部阻塞,后者是客户端自己主动阻塞。同步阻塞是借助阻塞队列实现的,而future.get()是借助CompletableFuture.waitingGet实现的。

而采用whenComplete异步回调方式则不会阻塞当前线程,异步的回调是在响应结果到达之后,通过共享线程池中的线程执行this.complete(res.getResult());方法,来回调whenCompelete方法中的内容。

到这里异步请求的剖析也就结束了,可以看到Dubbo中同步请求和异步请求代码大部分都是一样的,但是通过巧妙的ThreadLessExecutor设计完成了异步转同步的操作,再借助于CompletableFuture中提供的异步特性实现了真正实现了请求异步。

Provider端

上面我们简单介绍了consumer端的几种请求模型,那么provider端有哪些响应模型呢?

一个请求过来时,在provider端大致要经过以下几个处理过程:

在上篇线程模型中我们介绍了Dubbo在默认策略下,请求的接收会由单独创建的线程池处理,而非Netty的工作线程处理。请求接收到之后,经过解析处理最终会通过代理类反射调用对应的方法并拿到结果,下面为调用方法的关键代码(位于AbstractProxyInvoker)。

public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 反射调用方法获取结果
            System.err.println("threadName:" + Thread.currentThread().getName() + "发起了调用");
            Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            // 将返回结果封装成CompletableFuture
            CompletableFuture<Object> future = wrapWithFuture(value);
            // 将结果通过AppResponse封装
            CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
                AppResponse result = new AppResponse();
                if (t != null) {
                    if (t instanceof CompletionException) {
                        result.setException(t.getCause());
                    } else {
                        result.setException(t);
                    }
                } else {
                    result.setValue(obj);
                }
                return result;
            });
            // 返回支持异步结果
            return new AsyncRpcResult(appResponseFuture, invocation);
        } catch (InvocationTargetException e) {
            if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) {
                logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);
            }
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

以上调用方法代码会将结果统一封装为支持异步的CompletableFuture。如果被调用的方法本身不支持异步,那么在这里主线程即调用方法的都是一个线程。下面示例我们以sayHelloAsync方法为例演示服务端异步的场景:

@Override
public CompletableFuture<String> sayHelloAsync(String name) {
    return CompletableFuture.supplyAsync(() -> {
        System.err.println("threadName:" + Thread.currentThread().getName() + "执行了方法");
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(3));
        return "Hello";
    });
}

在执行调用前以及方法内部分别打印了线程名称,输出如下内容:

threadName:DubboServerHandler-10.0.107.214:20880-thread-2发起了调用
threadName:ForkJoinPool.commonPool-worker-9执行了方法

可以明显看到两者是由不同线程执行的,而同步的输入如下的内容:

threadName:DubboServerHandler-10.0.107.214:20880-thread-2发起了调用
threadName:DubboServerHandler-10.0.107.214:20880-thread-2 执行了方法

可以看到调用以及方法的执行都是有同一个线程负责的。

上面我们分析了provider端请求接收即服务方法调用的过程,下面我们继续分析结果发送的过程(HeaderExchangeHandler)。

继上面反射调用之后获得返回的CompletableFuture,这里注册了回调方法whenComplete,方法将调用结果通过channel发送出去。

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
    // 省略无关代码
    // find handler by message class.
    Object msg = req.getData();
    try {
        // 反射调用方法
        CompletionStage<Object> future = handler.reply(channel, msg);
        future.whenComplete((appResult, t) -> {
            System.err.println("threadName:" + Thread.currentThread().getName() + "执行了回调");
            try {
                if (t == null) {
                    res.setStatus(Response.OK);
                    res.setResult(appResult);
                } else {
                    res.setStatus(Response.SERVICE_ERROR);
                    res.setErrorMessage(StringUtils.toString(t));
                }
                // 发送返回消息
                channel.send(res);
            } catch (RemotingException e) {
                logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
            }
        });
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
        channel.send(res);
    }
}

进一步追踪发送消息代码,调用链路如下:

-HeaderExchangeHandler
--HeaderExchangeChannel
---NettyChannel
----AbstractChannel
-----DefaultChannelPipeline
------AbstractChannelHandlerContext

最终关键代码如下:

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

这里出现的executor是Netty中的worker线程池,真正的消息响应发送的工作是由该组线程池来完成的。到这里消息响应的关键过程已经分析完毕,总结示意图如下所示:

net_model_02.png

在异步场景时,Dubbo Server Handler Thread 只负责服务调用,方法的执行由异步的ForkJoin Common Pool完成(业务线程),当然也可以手动指定线程池。当方法执行完毕后在回调方法中完成消息发送的方法触发,消息的发送则又由单独的Netty Work Thread来完成了。

在同步场景时,Dubbo Server Handler Thread 需要负责服务调用以及方法的执行,获取结构后则触发消息发送,具体的发送动作由单独的Netty Work Thread来完成,可见同步和异步的区别在于方法的执行是由谁来完成。

上一篇下一篇

猜你喜欢

热点阅读