Dubbo之服务引用源码分析
流程图
这个流程对应我们这次源码分析主要内容,不得不说dubbo的文档写的太好了
时序图
引用服务两种方式
-
直连引用服务
-
从注册中心发现服务
经过debug,这边refer带的参数和实际有出入,具体看下面的解析
一些概念
Directory
主要用于获取Invoker
public interface Directory<T> extends Node {
//获取当前Directory对应的接口
Class<T> getInterface();
//根据invocaiton获取对应的Invoker
List<Invoker<T>> list(Invocation invocation) throws RpcException;
}
这个接口不是扩展点,具体实现有StaticDirectory,RegistryDirectory
StaticDirectory从名字看出来是静态的,就是说需要手动对里面invoker进行增减
而RegistryDirectory对注册中心目录增加了监听,里面的invoker会随着提供者的改变而变化
LoadBlance
用于选择调用的invoker
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
//根据不同LoadBalance算法,从invokers中选择出一个合适的invoker
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
实现有
random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
分别对应带权随机,带权轮询,最少活跃数,一致性hash算法
Cluster
集群功能,当有多个Invoker时,会把它们伪装成一个Invoker,提供一些集群调用方式
@SPI(FailoverCluster.NAME)
public interface Cluster {
//将多个Invoker伪装成一个Invoker,Invoker从directory获取
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
实现有
//在配置mock参数配置之后生效,用于服务降级
mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
//失败自动切换,当出现失败,重试其它服务器 。通常用于读操作,但重试会带来更长延迟。
可通过 retries="2" 来设置重试次数(不含第一次)。
failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster
//失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
failfast=com.alibaba.dubbo.rpc.cluster.support.FailfastCluster
//失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
failsafe=com.alibaba.dubbo.rpc.cluster.support.FailsafeCluster
//失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
failback=com.alibaba.dubbo.rpc.cluster.support.FailbackCluster
//并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪
费更多服务资源。可通过 forks="2" 来设置最大并行数。
forking=com.alibaba.dubbo.rpc.cluster.support.ForkingCluster
//可用性调用,调用最先可用的invoker
available=com.alibaba.dubbo.rpc.cluster.support.AvailableCluster
//合并多个调用结果的cluster
mergeable=com.alibaba.dubbo.rpc.cluster.support.MergeableCluster
//广播调用所有提供者,逐个调用,任意一台报错则报错 。通常用于通知所有提供者更新缓存
或日志等本地资源信息。
broadcast=com.alibaba.dubbo.rpc.cluster.support.BroadcastCluster
源码分析
解析配置
我们一般引用服务的时候,会配置
<dubbo:reference id="bidService" interface="com.alibaba.dubbo.demo.bid.BidService"/>
这个标签会被DubboNamespaceHandler解析为ReferenceBean
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
那么ReferenceBean又是怎么生成com.alibaba.dubbo.demo.bid.BidService类型的代理放到spring容器中的呢?答案是用到了FactoryBean
看下FactoryBean的定义
public interface FactoryBean<T> {
//返回getObjectType方法对应的对象
T getObject() throws Exception;
//返回FactoryBean的类型
Class<?> getObjectType();
boolean isSingleton();
}
实现FactoryBean的接口,可以生产一些其他类型的Bean到Spring容器,类型由getObjectType方法控制,返回对象由getObject方法得到
ReferenceConfig实现了这个接口,那么在获取BidService类型bean的时候,会调用ReferenceConfig的getObject方法来获得
在getObject方法中我们会返回ReferenceConfig中的ref属性,在返回之前会通过init方法先对ref进行初始化,ref其实就是一个代理对象,内部封装了invoker的调用。这个init方法的作用主要是获取invokers,通过cluster伪装成一个invoker,并且把invoker转换为代理对象ref
获取invoker
获取invoker以及创建代理对逻辑全在ReferenceConfig的createProxy中
首先我们会判断我们需要的服务在InjvmProtocol是否存在以及可调用
//根据之前解析的参数,构造一个本地jvm调用的url
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
//是否配置injvm参数
if (isInjvm() == null) {
if (url != null && url.length() > 0) { //配置直连URL的情况下,不做本地引用
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
//默认情况下如果本地有服务暴露,则引用本地服务.
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
}
如果我们不强制指定injvm参数等于false,如果InjvmProtocol暴露了这个服务,消费者默认会使用本地的
如果不调用InjvmProtocol,那么通过远程协议得到invoker
首先会对url进行处理
if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 通过注册中心配置拼装URL
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
如果配置了直连url,因为可以配置多个,用分割符分成多个后,可能会存在registry协议的url,会对registry协议url做一些特殊处理,会在refer参数内加上之前保存的一些调用接口的配置键值对
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&organization=dubbox&owner=programmer&pid=34788&refer=application%3Ddemo-consumer%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.bid.BidService%26methods%3DthrowNPE%2Cbid%26organization%3Ddubbox%26owner%3Dprogrammer%26pid%3D34788%26side%3Dconsumer%26timestamp%3D1522551195255®istry=zookeeper×tamp=1522551197254
而不是registry协议的直连url,通过ClusterUtils.mergeUrl进行参数合并后,生成的url就直接对应到服务提供者,如
127.0.0.1:20880/com.alibaba.dubbo.demo.bid.BidService?application=demo-consumer&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.bid.BidService&methods=throwNPE,bid&organization=dubbox&owner=programmer&pid=34828&side=consumer×tamp=1522552227828
这边因为我配置的直连url="127.0.0.1:20880",没有配置协议,所以产生的提供者url没有协议,但是protcol的适配类会自动使用dubbo协议,如果提供者不是用dubbo协议暴露的,那么就存在问题了。
如果没有配置直连url,那么获取注册中心的url,并且在refer参数的放入调用接口的配置键值对,和上面第一个url一致
完成解析url之后,就可以通过protocol的refer方法,把url转换成invoker
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
如果url只存在一个,那么直接用protocol进行转换
如果存在多个,会先通过urls获取所有invoker,然后根据urls中是否存在registry协议的url,做不同的集群调用
- urls中存在注册中心url
强制会使用AvailableCluster调用,因为一部分是直连的invoker,一部分是registry协议生成的invoker,registry协议生成的invoker内部也是多个invoker的cluster调用,如果在外层还允许使用其他复杂的cluster模式,我认为会加大调用复杂度,所以这个外层的cluster调用,是哪个invoker优先可用就用谁 - urls中不存在注册中心url
对于urls全是直连的url,那么直接使用配置的cluster模式把多个invoker伪装成一个即可
下面看下RegistryProtocol和DubboProtocol如何通过refer方法把url转换为invoker
RegistryProtocol的refer
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//通过url获取注册中心对象
Registry registry = registryFactory.getRegistry(url);
//如果远程调用的接口就是RegistryService,直接返回,暂时不知道这个被什么功能调用
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
//提取refer内的参数
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
//如果配置了group,调用对应group的提供者
if (group != null && group.length() > 0 ) {
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
|| "*".equals( group ) ) {
//MergeableCluster会根据merge参数是否配置,进行结果合并
return doRefer( getMergeableCluster(), registry, type, url );
}
}
//这边的cluster是适配类,会根据url内配置的cluster参数选择集群策略
return doRefer(cluster, registry, type, url);
}
在doRefer方法里,通过type和url初始化RegistryDirectory,RegistryDirectory内部会通过url从Registry获取所有提供者url并通过对应protocol创建invoker
同时把RegistryDirectory设置到cluster,cluster会调用RegistryDirectory的doList方法获取对应invoker,伪装成一个invoker,然后根据不同集群实现进行特定的调用
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//配置Directory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//注册监听回调,用于invoker动态更新 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
//从directory获取invokers,对外封装成一个invoker
return cluster.join(directory);
}
DubboProtocol的refer
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// modified by lishen
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
在DubboProtocol的refer方法根据url生成对应的DubboInvoker,DubboInvoker初始化的时候,会把netty客户端对象数组ExchangeClient传入,ExchangeClient根据url生成,会连接到对应到远程暴露服务器监听的端口,DubboInvoker会轮询client对server进行远程调用。调用逻辑在doInvoke方法
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
//直接调用,忽略返回信息,通过设置return=false来实现
if (isOneway) {
//sent参数用来设置是否需要等待消息发出再返回,在异步调用总是不等待返回
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {//异步调用,通过配置async=true开启,同时可以配置onreturn回调
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));//将future绑定到上下文,这个异步回调会在FutureFilter里面处理,同步调用设置的callback也会在FutureFilter处理
return new RpcResult();
} else {//同步调用
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
这边调用模式由三种,oneway,async,sync
oneway直接调用忽略结果可以配置sent
async异步调用
sync同步调用,阻塞返回结果
这边的集中调用方式都可以配置回调方法,回调的逻辑在FutureFIiter里面
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
//oninvoke回调
fireInvokeCallback(invoker, invocation);
// need to configure if there's return value before the invocation in order to help invoker to judge if it's
// necessary to return future.
Result result = invoker.invoke(invocation);
if (isAsync) {
//onthrow和onreturn回调
asyncCallback(invoker, invocation);
} else {
//
syncCallback(invoker, invocation, result);
}onthrow和onreturn回调
return result;
}
可以通过对方法配置onthrow,oninvoke,onreturn来设置回调
image.png
ExchangeClient调用远程提供者的逻辑单独再讲,和Server一起
创建代理对象
再拿到invoker之后,通过
proxyFactory.getProxy(invoker);
创建代理,和服务暴露都是用proxyFactory扩展点,但是服务引用用getProxy把invoker转换为代理ref,而在服务暴露中是把代理ref转换为Invoker
invoker转换为代理ref的逻辑有两部分,一部分在AbstractProxyFactory,另一部分通过模版方法让子类实现
先看AbstractProxyFactory中的逻辑
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
Class<?>[] interfaces = null;
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i ++) {
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class};
}
return getProxy(invoker, interfaces);
}
在AbstractProxyFactory的getProxy会在代理的接口中加入EchoService接口,也就是回声服务,使用方式如下
具体原理是在调用服务暴露的invoker时会有EchoFilter拦截这个调用
@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
if(inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1 )
return new RpcResult(inv.getArguments()[0]);
return invoker.invoke(inv);
}
}
如果invoker可用,会把传过去的值原封不动返回过来
在增加EchoService接口后,通过子类的模版方法getProxy来创建代理
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
我们看下JdkProxyFactory的实现
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
具体代理如何通过invoker实现调用封装在InvokerInvocationHandler里
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
//处理Object方法的调用,跟Object有关的方法都不需要远程调用
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
//执行远程调用
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
其中recreate方法用来将result转换为接口实际需要的类型,如果有异常抛出
public Object recreate() throws Throwable {
if (exception != null) {
throw exception;
}
return result;
}
现在把代理放到spring容器,用起来就想本地调用一样,其实也不是主动放,依赖注入的时候才主动初始化
接下去
Dubbo可以说复杂又简单,在引用和暴露中存在很多其他功能点,接下来需要一个个解析
- remoting模块解析
- 注册中心解析
- Protocol解析
- Cluster,Directory,LoadBalance解析
最后
希望大家关注下我的公众号
image