Dubbo 服务调用 源码学习(下)(七)
本学习笔记接上篇Dubbo 服务调用 源码(上)学习(六),上一篇已经完成了invoker的生成,接下来就是具体的方法调用了,包含了mock测试、负载均衡(不涉及细节)、重试、netty调用、以及最后的结果等待和超时检测等几个步骤,依次操作,完成远程请求并获取结果的全过程操作。
1、InvokerInvocationHandler 入口
2、MockClusterInvoker mock入口
3、AbstractClusterInvoker 负载均衡
4、FailoverClusterInvoker 重试机制
5、DubboInvoker invoke
6、NettyChannel netty请求
7、Future 结果处理 & 超时检测
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 获取方法的名称以及方法的参数信息
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
// 此时invoker是MockClusterInvoker
// 还拼接生成了一个RpcInvocation
return invoker.invoke(new RpcInvocation(method, args)).recreate();
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 注册中心,服务提供方、服务调用方的信息都存储在directory中,后期均衡负责也是处理这里面的数据
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
// 就是查看 `methodName.mock`或者`mock`的属性值,默认是“false”
if (value.length() == 0 || value.equalsIgnoreCase("false")){
// 不需要走Mock测试,进入到FailoverClusterInvoker中
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
try {
result = this.invoker.invoke(invocation);
}catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
result = doMockInvoke(invocation, e);
return result;
public Result invoke(final Invocation invocation) throws RpcException {
LoadBalance loadbalance;
List<Invoker<T>> invokers = list(invocation);
// 筛选出合适的invokers列表,基于方法和路由信息
// 其中路由信息则是通过MockInvokersSelector类处理获取到invocation中attachments保存的mock信息去筛选合适的invoker,所以重点是筛选
// 调试发现,一般情况下在这里面attachments字段并没有数据
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
// 创建合适的均衡负责类loanbalance信息,一般情况是RandomLoadBalance类
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 确保幂等,如果是异步则需要往attachment参数中添加自增ID(这个自增ID是AtomicLong类,线程安全)
// 这里就有往invocation的attachment填充数据的操作
return doInvoke(invocation, invokers, loadbalance);
// 现在进入到FailoverClusterInvoker类中了
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 第一个是执行的参数信息,包含了函数名等信息
// 第二个是被调用执行的参数,包含了服务提供方的IP:PORT信息
// 第三个是均衡负责,在选择调用的服务方时,会根据该对象选择一个合适的服务方
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
// 检测invokers是否存在,如果不存在则提示没有可用的服务提供方被使用,请检查服务提供方是否被注册
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
// 获取重试的次数,如果设置的值<=0,则只有1次操作机会,默认是3次
if (len <= 0) {
len = 1;
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
if (i > 0) {
copyinvokers = list(invocation);
// 注意一下这个list操作,这个list操作是重新更新可用的invoker列表
checkInvokers(copyinvokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 选择合适的服务提供方的invoker,在AbstractClusterInvoker类中去完成均衡负责的选择操作
// 关于均衡负责,后面考虑分为一篇笔记去学习几种不同的负载方法,其中还包含了sticky 粘性连接
try {
Result result = invoker.invoke(invocation);
// 这一步才是真正的执行调用远程方法的开始&入口
if (le != null && logger.isWarnEnabled()) {
// 存在重试了3次才终于成功的情况,这时候会告警提醒之前存在的错误信息输出
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
return result;
} catch (RpcException e) {
// 遇到了RPCEXCEPTION 而且是biz类型的,则不重试直接抛出该异常
if (e.isBiz()) { // biz exception.
throw e;
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
// 重试多次依旧没有正常的结果返回,则抛出该异常
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
上面说的Result result = invoker.invoke(invocation);
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
// 看是否为异步的方法,添加有sync字段信息
// 先从invocation的attachment中查看是否存在async字段,再看看url中的methodName.async ,再看看url的async属性
fireInvokeCallback(invoker, invocation);
Result result = invoker.invoke(invocation);
// 进一步invoke操作
if (isAsync) {
asyncCallback(invoker, invocation);
} else {
syncCallback(invoker, invocation, result);
return result;
public Result invoke(Invocation inv) throws RpcException {
if(destroyed) {
throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is DESTROYED, can not be invoked any more!");
RpcInvocation invocation = (RpcInvocation) inv;
if (attachment != null && attachment.size() > 0) {
// 添加attachment信息,调试中发现添加的是interface和token
Map<String, String> context = RpcContext.getContext().getAttachments();
// 这个是利用了ThreadLocal持有的数据中获取
if (context != null) {
// 这代码写的冗余了,而且为啥不再加个empty的检测呢?
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
// 如果是异步的方法,添加async字段,
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 如果是异步则添加自增ID
try {
return doInvoke(invocation);
// 进入到DubboInvoker执行invoke操作了
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
return new RpcResult(te);
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
} catch (Throwable e) {
return new RpcResult(e);
DubboInvoker 类
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
// 添加路径和版本概念,如果没有添加则是0.0.0
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
// currentClient 是 后续需要连接netty操作的客户端
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 是否为异步操作。。。。为啥确认个异步操作这么多重复操作
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 是否设置了return=false 这个操作
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
// 超时设置的时间,默认为1s
if (isOneway) {
// 如果强制设置了return=false,异步的future都不需要设置了,也不需要关注超时字段
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
// 调用的是request方法,异步的设置future
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
// 同步方法,设置超时时间,等待返回
// 其实也是异步方法,只是最后调用了get去获取future的结果
return (Result) currentClient.request(inv, timeout).get();
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
// create request.
Request req = new Request();
// 生成的req有个线程安全的自增的ID,可以通过这个统计出调用的次数
DefaultFuture future = new DefaultFuture(channel, req, timeout);
// 进入到NettyChannel类中
}catch (RemotingException e) {
throw e;
return future;
// 返回future,后续的超时就是通过对future操作
NettyChannel 类
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.write(message);
// 这个就是调用的netty的write操作完成数据发送操作
// 这个就是经过层层嵌套包装向外发送数据的最终操作
if (sent) {
// url配置的send字段属性,如果为true
// 则通过await等待超时的世界去查看请求是否成功
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
// 抛出远程发送消息失败的错误,打印出发送参数以及远程IP
if(! success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
DefaultFuture 类
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
if (! isDone()) {
// 这个时候还是异步执行的,会立即执行到这里(时间非常的端,相比RPC的几百毫秒而言)
long start = System.currentTimeMillis();
try {
while (! isDone()) {
// 时刻观察是否拿到response
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
// 如果拿到结果或者超时了,跳出循环
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
if (! isDone()) {
// 这个时候还没拿到结果,肯定是认为超时了,抛出TimeoutException
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
return returnFromResponse();
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
// 拿到的结果是无效的
throw new IllegalStateException("response cannot be null");
if (res.getStatus() == Response.OK) {
// 这才是真的调用成功,返回数据了
return res.getResult();
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
// 客户端超时或者服务端超时,抛出TimeoutException
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
// 其他的就抛出RemotingException异常,并从res获取错误原因
throw new RemotingException(channel, res.getErrorMessage());