Dubbo 服务调用 源码学习(下)(七)
笔记简述
本学习笔记接上篇Dubbo 服务调用 源码(上)学习(六),上一篇已经完成了invoker的生成,接下来就是具体的方法调用了,包含了mock测试、负载均衡(不涉及细节)、重试、netty调用、以及最后的结果等待和超时检测等几个步骤,依次操作,完成远程请求并获取结果的全过程操作。
更多内容可看[目录]Dubbo 源码学习
目录
Dubbo 服务调用 源码(下)学习(七)
1、InvokerInvocationHandler 入口
2、MockClusterInvoker mock入口
3、AbstractClusterInvoker 负载均衡
4、FailoverClusterInvoker 重试机制
5、DubboInvoker invoke
6、NettyChannel netty请求
7、Future 结果处理 & 超时检测
根据动态代理的认识,最后反射执行的方法肯定到InvokerInvocationHandler类的invoke方法中
1、InvokerInvocationHandler 入口
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// 获取方法的名称以及方法的参数信息
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// 此时invoker是MockClusterInvoker
// 还拼接生成了一个RpcInvocation
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
来到了MockClusterInvoker类,此时需要注意到MockClusterInvoker类的invoke是FailoverClusterInvoker
FailoverClusterInvoker类可以进行重试操作,如果有印象的可以知道在一个reference的xml配置中,可以加上重试次数retries属性字段的值,默认是3次,如果设置了小于0的数字,则为1次,重试次数0位的意思就是只进行一次操作
2、MockClusterInvoker mock入口
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// 注册中心,服务提供方、服务调用方的信息都存储在directory中,后期均衡负责也是处理这里面的数据
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
// 就是查看 `methodName.mock`或者`mock`的属性值,默认是“false”
if (value.length() == 0 || value.equalsIgnoreCase("false")){
// 不需要走Mock测试,进入到FailoverClusterInvoker中
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
}catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
}
return result;
}
3、AbstractClusterInvoker 负载均衡
进入到FailoverClusterInvoker类之前先进入到AbstractClusterInvoker类中
public Result invoke(final Invocation invocation) throws RpcException {
checkWheatherDestoried();
LoadBalance loadbalance;
List<Invoker<T>> invokers = list(invocation);
// 筛选出合适的invokers列表,基于方法和路由信息
// 其中路由信息则是通过MockInvokersSelector类处理获取到invocation中attachments保存的mock信息去筛选合适的invoker,所以重点是筛选
// 调试发现,一般情况下在这里面attachments字段并没有数据
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
// 创建合适的均衡负责类loanbalance信息,一般情况是RandomLoadBalance类
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 确保幂等,如果是异步则需要往attachment参数中添加自增ID(这个自增ID是AtomicLong类,线程安全)
// 这里就有往invocation的attachment填充数据的操作
return doInvoke(invocation, invokers, loadbalance);
// 现在进入到FailoverClusterInvoker类中了
}
4、FailoverClusterInvoker 重试机制
上面已经说了,这个类的主要作用是重试操作
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 第一个是执行的参数信息,包含了函数名等信息
// 第二个是被调用执行的参数,包含了服务提供方的IP:PORT信息
// 第三个是均衡负责,在选择调用的服务方时,会根据该对象选择一个合适的服务方
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
// 检测invokers是否存在,如果不存在则提示没有可用的服务提供方被使用,请检查服务提供方是否被注册
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
// 获取重试的次数,如果设置的值<=0,则只有1次操作机会,默认是3次
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重试时,进行重新选择,避免重试时invoker列表已发生变化.
//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
if (i > 0) {
checkWheatherDestoried();
copyinvokers = list(invocation);
//重新检查一下
// 注意一下这个list操作,这个list操作是重新更新可用的invoker列表
checkInvokers(copyinvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 选择合适的服务提供方的invoker,在AbstractClusterInvoker类中去完成均衡负责的选择操作
// 关于均衡负责,后面考虑分为一篇笔记去学习几种不同的负载方法,其中还包含了sticky 粘性连接
invoked.add(invoker);
RpcContext.getContext().setInvokers((List)invoked);
try {
Result result = invoker.invoke(invocation);
// 这一步才是真正的执行调用远程方法的开始&入口
if (le != null && logger.isWarnEnabled()) {
// 存在重试了3次才终于成功的情况,这时候会告警提醒之前存在的错误信息输出
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
// 遇到了RPCEXCEPTION 而且是biz类型的,则不重试直接抛出该异常
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 重试多次依旧没有正常的结果返回,则抛出该异常
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
5、DubboInvoker invoke
上面说的Result result = invoker.invoke(invocation);
,经过层层转发,来到了FutureFilter类
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
// 看是否为异步的方法,添加有sync字段信息
// 先从invocation的attachment中查看是否存在async字段,再看看url中的methodName.async ,再看看url的async属性
fireInvokeCallback(invoker, invocation);
Result result = invoker.invoke(invocation);
// 进一步invoke操作
if (isAsync) {
asyncCallback(invoker, invocation);
} else {
syncCallback(invoker, invocation, result);
}
return result;
}
来到了MonitorFilter过滤器查看是否需要进行监控(通过查看url是否存在monitor字段,如果为true,则是需要监控)
再来到了AbstractInvoker类的invoke方法,本身是DubboInvoker
public Result invoke(Invocation inv) throws RpcException {
if(destroyed) {
throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is DESTROYED, can not be invoked any more!");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
// 添加attachment信息,调试中发现添加的是interface和token
}
Map<String, String> context = RpcContext.getContext().getAttachments();
// 这个是利用了ThreadLocal持有的数据中获取
if (context != null) {
// 这代码写的冗余了,而且为啥不再加个empty的检测呢?
invocation.addAttachmentsIfAbsent(context);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
// 如果是异步的方法,添加async字段,
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 如果是异步则添加自增ID
try {
return doInvoke(invocation);
// 进入到DubboInvoker执行invoke操作了
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
DubboInvoker 类
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
// 添加路径和版本概念,如果没有添加则是0.0.0
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
// currentClient 是 后续需要连接netty操作的客户端
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
// 是否为异步操作。。。。为啥确认个异步操作这么多重复操作
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 是否设置了return=false 这个操作
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
// 超时设置的时间,默认为1s
if (isOneway) {
// 如果强制设置了return=false,异步的future都不需要设置了,也不需要关注超时字段
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
// 调用的是request方法,异步的设置future
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
// 同步方法,设置超时时间,等待返回
// 其实也是异步方法,只是最后调用了get去获取future的结果
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
6、NettyChannel netty请求
上述的request以及send方法,都被转发到HeaderExchangeChannel类中,这个类有一个非常关键的字段是channel,是NettyClient类,包含了服务提供方的IP:PORT信息
其实仔细看request方法和send方法最后的实现差不太多,只是request需要检测连接的channel是否存在,而send单独本身是不需要进行这个操作的。
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
// 生成的req有个线程安全的自增的ID,可以通过这个统计出调用的次数
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
channel.send(req);
// 进入到NettyChannel类中
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
// 返回future,后续的超时就是通过对future操作
}
NettyChannel 类
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.write(message);
// 这个就是调用的netty的write操作完成数据发送操作
// 这个就是经过层层嵌套包装向外发送数据的最终操作
if (sent) {
// url配置的send字段属性,如果为true
// 则通过await等待超时的世界去查看请求是否成功
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
// 抛出远程发送消息失败的错误,打印出发送参数以及远程IP
}
if(! success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
7、Future 结果处理 & 超时检测
看看异步拿到结果,判断是否超时等检测操作
DefaultFuture 类
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (! isDone()) {
// 这个时候还是异步执行的,会立即执行到这里(时间非常的端,相比RPC的几百毫秒而言)
long start = System.currentTimeMillis();
lock.lock();
try {
while (! isDone()) {
// 时刻观察是否拿到response
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
// 如果拿到结果或者超时了,跳出循环
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (! isDone()) {
// 这个时候还没拿到结果,肯定是认为超时了,抛出TimeoutException
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
// 拿到的结果是无效的
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
// 这才是真的调用成功,返回数据了
return res.getResult();
}
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
// 客户端超时或者服务端超时,抛出TimeoutException
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
// 其他的就抛出RemotingException异常,并从res获取错误原因
throw new RemotingException(channel, res.getErrorMessage());
}
至此整个的远程调用就全部结束了