Dubbo 单注册中心和多注册中心下的Invoker
2020-04-25 本文已影响0人
晴天哥_王志
开篇
- 这篇文章的目的主要是阐述单注册中心和多注册场景下Invoker的初始化以及调用过程。
服务引用过程
public class ReferenceConfig<T> extends AbstractReferenceConfig {
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 省略代码
if (isJvmRefer) {
// 省略代码
} else {
if (url != null && url.length() > 0) {
// 省略代码,忽略处理直连逻辑
} else { // 处理非直连来自注册中心的场景
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
}
if (urls.size() == 1) {
// 单注册中心的情况
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
// 多注册中心的情况
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// 省略相关代码
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
}
- ReferenceConfig#createProxy内部根据注册中心的个数分别走不同的分支逻辑。
- 单注册中心情况下:urls.size() == 1,通过refprotocol.refer(interfaceClass, urls.get(0))来实现refer过程。
- 多注册中心情况下:通过refprotocol.refer(interfaceClass, url)来实现单个注册中心下的服务引用,通过invoker = cluster.join(new StaticDirectory(u, invokers))来实现不同注册中心invoker的聚合。
- 多注册中心和单注册中心的区别在于多注册中心存在二次聚合的join逻辑。
单注册中心下的ClusterInvoker
单注册中心- 单注册中心下每个Interface对应一个MockClusterInvoker,MockClusterInvoker包含FailoverClusterInvoker。
- MockClusterInvoker可以理解为FailoverClusterInvoker的装饰类。
- 每个FailoverClusterInvoker包含一个RegistryDirectory,RegistryDirectory的维度 = Interface + 注册中心,也就是每个注册中心+每个Interface=一个RegistryDirectory。
多注册中心下的ClusterInvoker
多注册中心- 多注册中心下的ClusterInvoker是每个注册中心下的ClusterInvoker的聚合。
- 外层聚合的ClusterInvoker为AvailableCluster。
- AvailableCluster的内部包含StaticDirectory,StaticDirectory包含所有注册中心下的MockClusterInvoker对象(MockClusterInvoker内部包含FailoverClusterInvoker),通过上图可以看到包含注册中心[127.0.0.1:2181]和注册中心[127.0.0.1:2182]下的Invoker对象。
AvailableCluster的Invoker的生成代码
invoker = cluster.join(new StaticDirectory(u, invokers));
public class AvailableCluster implements Cluster {
public static final String NAME = "available";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new AbstractClusterInvoker<T>(directory) {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
};
}
}
public class StaticDirectory<T> extends AbstractDirectory<T> {
private final List<Invoker<T>> invokers;
public StaticDirectory(List<Invoker<T>> invokers) {
this(null, invokers, null);
}
public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers) {
this(null, invokers, routers);
}
public StaticDirectory(URL url, List<Invoker<T>> invokers) {
this(url, invokers, null);
}
public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
if (invokers == null || invokers.isEmpty())
throw new IllegalArgumentException("invokers == null");
this.invokers = invokers;
}
}
- AvailableCluster的join方法返回的是AbstractClusterInvoker对象,参数为StaticDirectory对象。
- StaticDirectory包含了Interface在每个注册中心下的invoker对象,即MockClusterInvoker对象(MockClusterInvoker内部包含FailoverClusterInvoker)。
多注册中心查找调用关系
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
protected final Directory<T> directory;
protected final boolean availablecheck;
public AbstractClusterInvoker(Directory<T> directory) {
this(directory, directory.getUrl());
}
public AbstractClusterInvoker(Directory<T> directory, URL url) {
if (directory == null)
throw new IllegalArgumentException("service directory == null");
this.directory = directory;
this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK);
}
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// 查找invokers
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 执行doInvoke动作
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
}
- AbstractClusterInvoker作为ClusterInvoker的基类,AbstractClusterInvoker#invoker封装了调用过程的逻辑,包括查找invoker过程和执行invoker过程。
- List<Invoker<T>> invokers = list(invocation)表示查找invoker过程,list方法内部通过doList返回对应的invoker对象,然后通过路由进行一层过滤。
- doInvoke(invocation, invokers, loadbalance)表示invoker的执行过程。
- 多注册中心场景下dubbo的调用关系分为两级查找,先从StaticDirectory当中查找,再从RegistryDirectory当中查找。
StaticDirectory查找和调用
public class StaticDirectory<T> extends AbstractDirectory<T> {
private final List<Invoker<T>> invokers;
public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
this.invokers = invokers;
}
@Override
public boolean isAvailable() {
if (isDestroyed()) {
return false;
}
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return true;
}
}
return false;
}
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
return invokers;
}
}
- StaticDirectory#doList内部只是简单的返回invokers对象。
- 通过上面的分析得知StaticDirectory的invokers包含Interface在所有注册中心下的MockClusterInvoker对象(MockClusterInvoker内部包含FailoverClusterInvoker),包含注册中心[127.0.0.1:2181]和注册中心[127.0.0.1:2182]下的FailoverClusterInvoker对象。
-
StaticDirectory的查找是以注册中心为维度进行查找的,返回Interface在单个注册中心下的FailoverClusterInvoker对象
public class AvailableCluster implements Cluster {
public static final String NAME = "available";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new AbstractClusterInvoker<T>(directory) {
@Override
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
};
}
}
- AvailableCluster#doInvoke先执行FailoverClusterInvoker#isAvailable方法判断可用性。
- AvailableCluster#doInvoke再执行FailoverClusterInvoker#invoke方法传递调用。
RegistryDirectory查找和调用
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
@Override
public List<Invoker<T>> doList(Invocation invocation) {
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
}
- RegistryDirectory中返回的是Interface在某个注册中心下的所有invoker对象。
- RegistryDirectory#invoker代表的是MockClusterInvoker,包含FailoverClusterInvoker。
- RegistryDirectory查找是以单个注册中心为维度查找,查找该Interface在单个注册中心下所有provider对应的invoker对象。
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
// list查找invokers
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 执行invoke调用
return doInvoke(invocation, invokers, loadbalance);
}
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
// directory表示RegistryDirectory
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
}
- AbstractClusterInvoker#list返回该Interface在某个注册中心下所有provider对应的invoker对象。
- List<Invoker<T>> invokers = directory.list(invocation)执行的就是该查找过程。
RegistryDirectory
public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();
private final String serviceKey; // Initialization at construction time, assertion not null
private final Class<T> serviceType; // Initialization at construction time, assertion not null
private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
private final URL directoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
private final String[] serviceMethods;
private final boolean multiGroup;
private Protocol protocol; // Initialization at the time of injection, the assertion is not null
private Registry registry; // Initialization at the time of injection, the assertion is not null
private volatile boolean forbidden = false;
private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
private volatile URL registeredConsumerUrl;
private volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference
private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
private volatile Map<String, List<Invoker<T>>> methodInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
// Set<invokerUrls> cache invokeUrls to invokers mapping.
private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference
public RegistryDirectory(Class<T> serviceType, URL url) {
// serviceType表示com.alibaba.dubbo.demo.DemoService
// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=62794&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D62794%26qos.port%3D33333%26register.ip%3D192.168.0.8%26side%3Dconsumer%26timestamp%3D1587823349524×tamp=1587823349613
super(url);
this.serviceType = serviceType;
this.serviceKey = url.getServiceKey();
this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
String methods = queryMap.get(Constants.METHODS_KEY);
this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
}
public List<Invoker<T>> doList(Invocation invocation) {
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
}
public abstract class AbstractDirectory<T> implements Directory<T> {
private final URL url;
private volatile boolean destroyed = false;
private volatile URL consumerUrl;
private volatile List<Router> routers;
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
// doList由各子类实现具体的实现返回invokers
List<Invoker<T>> invokers = doList(invocation);
// 执行路由选择
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
}
}
}
return invokers;
}
}
- RegistryDirectory的含义需要特别解释下,希望重点关注了解下。
- RegistryDirectory对象代表的是Inteface服务在某个注册中心的对象,包含了Interface在注册中心所有的refer对象(即对应provider的服务引用对象)。
- Dubbo的整体设计是面向接口设计的,所有才会有Interface+注册中心唯一确定一个RegistryDirectory对象。
- RegistryDirectory的参数当中Class<T> serviceType代表服务的Interface, URL url代表注册中心的注册地址,例子见上述代码中的注释。
- AbstractDirectory#list内部执行查找和路由选择功能,所有通过AbstractDirectory#list的方法都会执行查询和路由选择步骤。
ClusterInvoker介绍
Invoker类图- 关于Cluster的介绍可以参考Dubbo Cluster介绍。