9.集群容错
9.1 Cluster集群容错
我们可以把Cluster看作一个集群容错层,该层包含Cluster、Directory、Router、LoadBalance几大核心接口。这里要注意区分Cluster层和Cluster接口,Cluster层是个抽象概念,白噢是对外的整个集群容错层;Cluster接口是个容错接口,提供Failover、Failfast等容错策略。
Cluster层的总体工作流程可以分为一下几步:
(1) 生成Invoker对象。不同的Cluster实现会生成不同类型的ClusterInvoker对象并返回,然后调用ClusterInvoker的invoke方法,正式开始调用流程。
(2) 获取可调用的服务列表。首先会做一些前置校验,检查远程服务是否可用,然后通过Directory#list方法获取所有可用的服务列表。接着使用Router接口处理该服务列表,根据路由规则过滤一部分服务,最终返回剩余的服务列表。
(3) 负载均衡。在第(2)步中得到的服务列表还需要通过不同的负载均衡策略选出一个服务,用作最后的调用。首先框架会根据用户的配置,调用ExtensionLoader获取不同负载均衡策略的扩展点实现,然后做一些后置操作,如果是异步调用则设置调用编号,接着调用子类实现的doInvoke方法,子类会根据具体的负载均衡策略选出一个可以调用的服务。
(4) RPC调用。首先保存每次调用的Invoker到RPC上下文,并做RPC调用,然后处理调用结果,对于调用出现异常、成功、失败等情况,每种容错策略都会有不同的处理方式。
总体调用流程
上图是一个全量的调用路程,其中1~3步都是在抽象方法AbstractClusterInvoker中实现的,可以理解为通用的模板流程,主要做了校验、参数准备等工作,最终调用子类实现的doInvoke方法。
9.2
Cluster接口一共有9中不同的实现,每种实现分别对应不同的ClusterInvoker。
9.2.1 容错机制概述
容错机制能增强整个赢得鲁棒性,容错过程对上层用户是完全透明的,但用户可以通过不同的配置项来选择不同的容错机制。以下是Dubbo提供的9中容错机制:
- Failover 当出现失败时,会重试其他服务器,用户可以通过retries="2"设置重试次数。这是Dubb默认的容错机制,会对请求做负载均衡。通常使用在读操作或者幂等的写操作(编辑和删除都是幂等,创建不是)上。但要注意重试会导致接口的延迟增大,下游机器负载较高时,重试容易加重下游服务的负载。
- Failfast 快速失败,当请求失败后,快速返回异常结果,不做任何重试。该容错机制同样会对请求做负载均衡,通常使用在非幂等接口的调用上。该机制收到网络抖动影响较大。
- Failsafe 当出现异常时,直接忽略异常。会对请求做负载均衡。通常用于一些不那么重要的应用场景,比如一些不重要的日志同步。
- Failback 请求失败后,会自动记录在失败队列中,并由一个定时线程池定时重试,适用于一些异步或最终一致性的请求,请求会做负载均衡。
- Forking 同时调用多个相同的服务,只要其中一个返回,则立即返回结果。用户可以配置forks="最大并行调用数"参数来确定最大并行调用的服务数量。通常用在对接口实时性要求极高的调用上,但也会很浪费资源。
- Broadcast 广播调用所有可用的服务,任意一个节点报错则报错。由于是广播,所以不需要负载均衡。通常用于服务状态更新后的广播。
- Mock 提供调用失败时,返回伪造的响应结果。或直接强制返回伪造的结果,不会发起远程调用。
- Available 最简单的方式,请求不会做负载均衡,遍历所有服务列表,找到第一个可用的节点,直接请求并返回结果。如果没有可用的节点,则直接抛出异常。
- Mergeable 可以自动把多个节点请求得到的结果进行合并。
具体使用哪个实现,用户可以在<dubbo:server>、<dubbo:reference>、<dubbo:consumer>、<dubbo:provider>标签上通过cluster属性设置。
对于Failover模式,用户可以通过retries属性来设置最大的重试次数,可以设置在dubbo:reference标签上,也可以设置在细粒度的方法标签dubbo:method上。
对于Forking模式,用户可以通过forks="n"来设置最大并行数。如果可用的服务数大于n,则会并行请求n个服务;如果小于n,则会并行请求所有可用的服务。
对于Mergeable模式,用户可以在dubbo:reference标签中通过merger="true"开启,合并时通过gorup="*"属性指定需要合并哪些分组的结果。默认会根据方法的返回值自动匹配合并器,如果同一个类型有两个不同的合并器实现,则需要在参数中指定合并器的名字(merger="合并器名")。例如:用户根据List类型的返回结果发现了多个合并器,则需要手动指定合并器名,否则框架不知道哪个。如果想调用返回结果的指定方法进行合并(如返回了一个Set,想要调用Set#addAll方法),可以通过merger=".addAll"配置来实现。官方Mergeble配置示例如下:
<!-- 搜索所有分组,根据返回结果的类型自动查找合并器。该接口中getMenuItems方法不做合并 -->
<dubbo:reference interface-="com.xxx.MenuService" group="*" merger="true">
<dubbo:method name="getMenuItems" merger="false" />
</dubbo:reference>
<!-- 指定方法合并器名 -->
<dubbo:reference interface-="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger="mymerger" />
</dubbo:reference>
<!-- 调用返回结果的指定方法进行合并 -->
<dubbo:reference interface-="com.xxx.MenuService" group="*">
<dubbo:method name="getMenuItems" merger=".addAll" />
</dubbo:reference>
9.2.2 Cluster接口关系
在微服务环境中,可能多个节点同时都提供了同一个服务。当上层调用Invoker时,无论存在多少个Invoker,只需要通过Cluster层,即可完成整个调用的容错逻辑,包括获取服务列表、路由、负载均衡等,整个过程对上层是透明的。当然,Cluster接口只是串联起整个逻辑,其中ClusterInvoker只实现了容错策略部分,其他逻辑则是调用了Directory、Router、LoadBalance等接口实现。
容错的接口主要分为两大类,第一类是Cluster类,第二类是ClusterInvoker类。两者之间的关系很简单:Cluster接口下面有很多不同的实现,每种实现都需要实现join方法,在方法中会new一个对应的ClusterInvoker实现。如FailoverCluster的实现:
public class FailoverCluster implements Cluster {
...
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
我们先从“上帝视角”看一下整个集群容错的接口关系:
Cluster是最上层的接口,下面一共9个实现类。Cluster接口上有SPI注解,也就是说,实现类通过扩展机制动态生成。每个实现类都只有一个join方法,实现也很简单,直接new一个对应的ClusterInvoker,其中AvailableCluster例外,因为它直接使用匿名内部类实现了所有功能。
接下来我们看一下ClusterInvoker接口的类结构:
Cluster接口的类图关系
ClusterInvoker总体类结构
Invoker接口是最上层的接口,下面分别有AbstractClusterInvoker、MockClusterInvoker和MergeableClusterInvoker三个类。其中AbstractClusterInvoker封装了通用的模板逻辑,如获取服务列表、负载均衡、调用服务提供者等,并预留了一个doinvoke方法需要子类自行实现。AbstractClusterInvoker下面有7个子类,分别实现了不同的集群容错机制。
MockClusterInvoker和MergeableClusterInvoker由于并不适用于正常的容错逻辑,因此没有挂在AbstractCluster下面,而是直接继承了Invoker接口。
9.2.3 Failover策略
这是Cluster接口的默认实现策略,该策略的代码逻辑如下:
(1) 校验。校验从AbstractClusterInvoker传入的Invoker列表是否为空。
(2) 获取配置参数,从调用URL中获取对应的retries重试次数。
(3) 初始化一些集合和对象。用于保存调用过程中出现的异常、记录调用了哪些节点(这个会在负载均衡中使用,在某些配置下,尽量不要一直调用同一个服务)。
(4) 使用for循环实现重试,for循环的次数就是重试的次数。成功则返回,否则继续循环。如果for循环完,还没有一个成功的返回,则抛出异常,把(3)中记录的信息抛出去。
具体for循环中的逻辑如下:
- 校验。如果for循环次数大于1,即有过失败,则会再次校验节点是否被销毁、传入的Invoker列表是否为空。
- 负载均衡。调用select方法做负载均衡,得到要调用的节点,并记录这个节点步骤(3)的集合里,再把已经调用的节点信息放进RPC上下文中。
-
远程调用。调用invoker#invoke方法做远程调用,成功则返回,异常则记录异常信息,再做下一次循环。
流程图如下:
Failover流程
9.2.4 Failfast策略
Failfast会在失败后直接抛出异常并返回,实现非常简单,步骤如下:
(1) 校验。校验从AbstractClusterInvoker传入的Invoker列表是否为空。
(2) 负载均衡。调用select方法做负载均衡,得到要调用的节点。
(3) 进行远程调用。在try代码块中调用invoker#invoke方法做远程调用,如果捕获到异常,则直接封装成RpcException抛出。
整个过程非常简单,也不会做任何中间信息的记录。
9.2.5 Failsafe策略
Failsafe调用是如果出现异常,则会直接忽略。实现也很简单,步骤如下:
(1) 校验。校验从AbstractClusterInvoker传入的Invoker列表是否为空。
(2) 负载均衡。调用select方法做负载均衡,得到要调用的节点。
(3) 进行远程调用。在try代码块中调用invoker#invoke方法做远程调用,在"catch"中如果捕获到如何异常都直接“吞掉”,返回一个空的结果集。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance laodbalance) throws RpcException {
try {
// 校验
checkInvokers(invokers, invocation);
// 负载均衡
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// 远程调用,调用成功则直接返回
return invoker.invoke(invocation);
} catch () {
...
// 返回空的结果集
return new RpcResult();
}
}
9.2.6 Failback策略
Failback如果调用失败,则会定期重试。FailbackClusterInvoker里面定义了一个ConcurrentHashMap,专门用来保存失败的调用。另外定义了一个定时线程池,默认每5秒把所有失败的调用拿出来,重试一次。如果调用重试成功,则会从ConcurrentHashMap中移除。doInvoker的调用逻辑如下:
(1) 校验。校验从AbstractClusterInvoker传入的Invoker列表是否为空。
(2) 负载均衡。调用select方法做负载均衡,得到要调用的节点。
(3) 进行远程调用。在try代码块中调用invoker#invoke方法做远程调用,在"catch"中如果捕获到如何异常,会直接把invocation保存到重试的ConcurrentHashMap中,返回一个空的结果集。
(4) 定时线程池会定时把ConcurrentHashMap中的失败请求拿出来重新请求,请求成功则从ConcurrentHashMap中移除。如果请求还是失败,则异常也会被"catch"住,不会影响ConcurrentHashMap中后面的重试。
void retryFailed() {
if (failed.size() ==) {
// 没有失败请求,直接推出
return;
}
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
// 重新进行请求,成功则从失败记录中移除
invoker.invoke(invocation);
failed.remove(invacation);
} catch (Throwable e) {
// 捕获异常,只打印日志,防止异常中断重试过程
...
}
}
}
9.2.7 Available策略
Available是找到第一个可用的服务直接调用,并返回结果。步骤如下:
(1) 遍历从AbstractClusterInvoker传入的Invoker列表,如果Invoker是可用的,则直接调用并返回。
(2) 如果遍历整个列表还没有找到可用的Invoker,则抛出异常。
9.2.8 Broadcast策略
Broadcast会广播给所有的节点,如果任何一个节点报错,则返回异常。步骤如下:
(1) 前置操作。校验从AbstractClusterInvoker传入的Invoker列表是否为空;在RPC上下文中设置Invoker列表;初始化一些对象,用于保存调用过程中产生的异常和结果信息等。
(2) 循环遍历所有Invoker,直接做RPC调用。任何一个节点调用出错,则只有最后一个节点的异常会被抛出,前面的异常会被覆盖。
Forking策略
Forking可以同时并行请求多个服务,有任何一个返回,则直接返回。相对于其他调用策略,Forking的实现是最复杂的。其步骤如下:
(1) 准备工作。校验传入的Invoker列表是否可用;初始化一个Invoker集合,用于保存真正要调用的Invoker列表;从URL中得到最大并行数、超时事件。
(2) 获取最终要调用的Invoker列表。假设用户设置最大的并行数为n,实际可以调用的最大服务数为v。如果n>v,则说明可用的服务数小于用户的设置,因此最终要调用的Invoker只能有v个;如果n<v,则会循环调用负载均衡方法,不断得到可调用的Invoker,加入步骤(1)中的Inovoker集合里。
这里有一点要注意:在Invoker加入集合是,会做去重操作。因此,如果用户设置的负载均衡策略每次返回的都是同一个Invoker,那么集合中最后只会存在一个Invoker,也就是只会调用一个节点。
(3) 调用前的准备工作。设置要调用的Invoker列表到RPC上下文;初始化一个异常计数器;初始化一个阻塞队列,用于记录并行调用的结果。
(4) 执行调用。循环使用线程池并行调用,调用成功,则把结果加入阻塞队列;调用失败,则失败计数+1。如果所有线程的调用都失败了,即失败计数大于等于所有可调用的Invoker数量时,则把异常信息加入阻塞队列。
这里需要注意:并行调用是如何保证个别调用失败不返回异常信息,只有全部失败才返回异常信息呢?因为有判断条件,当失败计数大于等于所有可调用的Invoker时,才会把异常信息放入阻塞队列,所以只有当最后一个Invoker也调用失败时才会把异常信息保存到阻塞队列,从而达到全部失败才返回异常样的结果。
(5) 同步等待结果。由于步骤4是在线程池中执行的,因此主线程还会继续往下执行,主线程中会使用阻塞队列的poll("超时时间")方法,同步等待阻塞队列中的第一个结果,如果是正常结果则返回,如果是异常则输出。
从上面的步骤得知,Forking的超时是通过在阻塞队列的poll方法中传入超时时间实现的;线程池中的并发调用会获得第一个正常返回结果。只有所有请求都失败了,Forking才会失败。流程图如下:
Forking调用流程
9.3 Directory的实现
整个容错过程都会首先使用Directory#list方法来获取所有的Invoker列表。Directory也有很多实现类,既可以提供静态的Invoker列表,也可以提供动态的Invoker列表。静态是用户自己设置的Invoker列表;动态是根据注册中心的数据动态变化,动态更新Invoker列表的数据,整个过程对上层透明。
9.3.1 总体实现
我们先从“上帝视角”看一下整个Directory体系:
Directory类结构
Directory是顶层的接口,AbstractDirectory封装了通用的实现逻辑。抽象类包含RegistryDirectory和StaticDirectory两个子类。分别极少他们的职责:
(1) AbstractDirectory,封装了通用逻辑,主要实现了4个方法:检测Invoker是否可用,销毁所有Invoker,list方法,还留了一个抽象的doList方法给子类自行实现。list方法是最主要的方法,用于返回所有可用的list,逻辑分两步:
- 调用抽象方法doList获取所有Invoker列表,不同子类有不同实现。
- 遍历所有router,进行Invoker的过滤,最后返回过滤好的Invoker列表。
(2) RegistryDirectory,属于动态列表实现,会自动从注册中心更新Invoker列表、配置信息、路由列表。
(3) StaticDirectroy,属于静态列表实现,即将传入的Invoker列表封装成静态的Directory对象,里面的列表不会改变。
9.3.2 RegistryDirectory的实现
RegistryDirectory有两个比较重要的逻辑线,一个是框架与注册中心的订阅并动态更新本地Invoker列表、路由列表、配置信息等;另一个是子类实现doList方法:
- 订阅和动态更新的逻辑。
主要涉及subscribe、notify、refreshInvoker三个方法,其余的都是一些辅助方法,如toConfigurators、toRouters。
subscribe是订阅某个url的更新信息。Dubbo在引用每个需要RPC调用的Bean的时候,会调用directory.subscribe来订阅这个Bean的各种url的变化。里面调用的是registry.subscribe,这些代码在第三章已经讲过。
notify就是监听配置中心对应的url变化,然后更新本地的配置参数。监听的url分为:配置configurators、路由规则router、Invoker列表。工作流程如下:
(1) 新建三个List,分别用于保存更新的Invoker URL、路由配置URL、配置URL。遍历监听返回的所有URL,分类放入三个List。
(2) 解析并更新配置参数。
- 对于router参数,首先遍历所有router类型的URL,然后通过Router工厂把每个URL包装成路由规则,最后更新本地的路由信息。这个过程会忽略以empty开头的URL。
- 对于Configurator类参数,管理员可以在dubbo-admin动态配置功能上修改生产者参数,这些参数会保存在配置中心的configurators类目录下。notify监听到URL配置参数变化,会解析并更新本地的configurator配置。
- 对于Invoker类型的参数,如果是empty协议的URL,则会禁用该服务,并销毁本地缓存的Invoker;如果监听到的Invoker类型URL都是空的,则说明没有更新,直接用本地的老缓存;如果监听到的Invoker类型不为空,则把新的URL和本地老的URL合并,创建新的Invoker,找出差异的老Invoker并销毁。
Dubbo-admin上更新路由规则或参数是通过"override://"协议实现的,这个协议的URL会覆盖更新本地URL中对应的参数。如果使用的是"empty://"协议的URL,则会清空本地的配置。具体Dubbo-admin的使用参考官网。
- doList的实现
notify中更新的Invoker列表最终会转换成一个字典Map<String, List<Invoker<T>>> methodInvokerMap。key是对应的方法名称,value是整个Invoker列表。doList的最终目的是在字典中匹配出可以调用的Invoker列表,并返回给上层。主要步骤如下:
(1) 检测服务是否被禁用。如果配置中心禁用了该服务,则无法被调用,抛出异常。
(2) 根据方法名和首参数匹配Invoker。这是个比较奇特的特性。根据方法名和首参数查找对应的Invoker方法,暂时没有看到应用场景。
(3) 根据方法名匹配Invoker。以方法名为key去methodInvokerMap中匹配Invoker列表,如果没有匹配到,进入第(4)步
(4) 根据"*"匹配Invoker。用星号去匹配Invoker列表,如果没有匹配到,则进入最后异步兜底操作。
(5) 遍历methodInvokerMap,找到一个Invoker列表返回。如果还没有,则返回一个空列表。
9.4 路由的实现
Directory获取所有Invoker列表的时候,会调用路由接口,路由接口会根据用户配置的不同路由策略对Invoker列表进行过滤,只返回符合规则的Invoker,例如:如果用户配置了接口A的所有调用,都是用IP为192.168.1.2的节点,则路由会过滤其他的Invoker,只返回IP为192.168.1.2的Invoker。
9.4.1 路由的总体结构
路由分为条件路由、文件路由、脚本路由,对应dubbo-admin中三种不同的规则配置方式。条件路由是用户使用Dubbo定义的语法规则去写路由规则;脚本路由则是通过JDK自身的脚本引擎解析路由规则脚本,所有JDK脚本引擎支持的脚本都能解析,默认是JavaScript。我们先来看接口之间的关系:
Router接口关系
RouterFactory是一个SPI接口,没有设置默认值,但由于有@Adaptive("protocol")注解,因此他会根据URL中的protocol参数确定要初始化那一具体的Router实现。
RouterFactory的实现非常简单,就是直接"new"一个对应的Router并返回,如CondtionRouterFactory直接"new"并返回一个ConditionRouter。
9.4.2 条件路由的参数规则
条件路由使用的是condition://xieyi ,URL形式是“condition://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=”+URL.encode("host=10.20.153.10=>host=10.20.153.11"),其具体含义是:
- condition:// 表示路由规则的类型,支持条件规则和脚本规则,可扩展,必填
- 0.0.0.0 表示对所有IP地址生效,如果只想对某个IP生效,则填入具体的IP,必填
- com.foo.BarService 表示只对指定服务生效,必填
- category=routers 表示该数据为动态配置类型,必填
- dynamic=false 表示该数据为持久数据,当注册方退出时,数据依然保存在注册中心,必填
- enabled=true 覆盖规则是否生效,非必填,默认生效
- force=true 是否强制执行路由过滤。设置为true表示强制执行路由,即当路由结果为空时,放弃执行路由,直接返回所有传进来的Invoker列表;设置为false表示不强制执行路由,即当路由结果为空时,如果强制,非必填,默认false
- runtime=false 是否在每次调用时执行路由规则,否则只在提供者地址列表变更时预先执行并缓存结果,调用时直接从缓存中获取路由结果。如果用了参数路由,则必须设置为true,需要注意设置会影响调用的性能,非必填,默认false
- priority=1 路由规则的优先级,用于排序,优先级越大越靠前执行,非必填,默认0
- rule=URL.encode("host=10.20.153.10=>host=10.20.153.11") 表示路由规则的内容,必填
下面我们来看一条路由规则的示例:
mehtod = find* => host = 192.168.1.22
- 这条配置说明所有调用find开头的方法都会被路由到IP为192.168.1.22的服务节点上。
- => 之前的部分为消费者匹配条件,将所有参数和消费者的URL进行比对,当消费者满足匹配条件时,对该消费者执行后面的过滤规则。
- => 之后的部分为提供者地址列表的过滤条件,将所有参数和提供者的URL进行比对,消费者最终只获取过滤后的地址列表。
- 如果匹配条件为空,则表示应用于所有消费方,如=> host != 192.168.1.22。
- 如果过滤条件为空,则表示禁止访问,如host = 192.168.1.22 =>。
整个规则的表达式支持$protocol等占位符方式,也支持=、!=等条件。值也可以支持多个,哟过逗号分割,如host = 192.168.1.22,192.168.1.23。
9.4.3 条件路由的实现
条件路由的具体实现类是ConditionRouter,Dubbo会根据自定义的规则语法实现路由规则。我们主要关注构造方法和route方法。
- 构造方法逻辑:
(1) 根据URL的键rule获取对应的规则字符串。以=>为界,把规则分两段,前面的为whenRule,即消费者匹配条件;后面的为thenRule,为提供者地址列表的过滤条件。比如之前的例子,就会解析成whenRule: method = find和thenRule: host = 192.168.1.22。
(2) 分别解析两个Rule.调用parseRule方法,通过正则表达式不断循环匹配whenRule和thenRule字符串。解析的时候,会根据key-value之间的分隔符对key-value做分类(如果A=B,则分隔符为=),支持分隔符形式有A=B、A&B、A!=B、A,B这4中形式。最终参数会被封装成一个个MatchPair对象,放入Map中保存。Map的key是参数值,value是MatchPair对象。以之前的例子为例,会生成method为key的when Map,以host为key的then Map。value则分别包装了find和192.168.1.22的MatchPair对象。
MatchPair的作用有两个:一个是通配符的匹配和占位符的赋值。MatchPair对象是内部类,里面只有一个isMatch方法,用于判断值是否能匹配上规则。规则里的支持protocol、usernaem、password、host、port、path这几个动态参数的占位符;另一个作用是缓存规则。MatchPair对象中有两个Set集合,一个用于保存匹配的规则,如=find;另一个用于保存不匹配的规则,如!=find。这两个集合子啊后续路由规则匹配的时候使用到。 - route方法的实现原理
ConditionRoute实现了Router接口的route方法,该方法的主要功能是过滤出符合路由规则的Invoker列表,即做具体的条件匹配判断,其步骤如下:
(1) 校验。如果规则没有启用,则直接返回;如果传入的Invoker列表为空,则直接返回;如果没有任何的whenRule匹配,则不需要过滤,直接返回传入的Invoker列表;如果whenRule有匹配的,但是thenRule为空,即没有匹配上规则的Invoker,则返回空。
(2) 遍历Invoker列表,通过thenRule找出所有符合规则的Invoker加入集合。
(3) 返回结果。如果结果集不为空,则直接返回;如果结果集为空,但是规则配置了force=true,即强制过滤,那么就会返回空结果集;非强制则不过滤,过滤结果即为空的时候就不过滤,返回所有Invoker列表。
9.4.4 文件路由的实现
文件路由是把规则写在文件中,文件中写的是自定义的脚本规则,可以是JavaScript,Groovy等,URL中对应的key值填写的是文件的路径。文件路由主要做就是把文件中的路由脚本读出来,然后调用路由的工厂去匹配对应的脚本路由做解析。
// 把类型为file的protocol替换为scritp类型
String protocol = url.getParameter(Constants.ROUTER_KEY, ScriptRouterFactory.NAME);
String type = null;
String path = url.getLength();
// 解析文件后缀名,用于匹配到底是什么脚本,如JS、Groovy等
if (path != null) {
int i = path.lastIndexOf('.');
if (i > 0) {
type = path.substring(i+1);
}
}
// 读取文件
String rule = IOUtils.read(new FileReader(new File(url.getAbsolutePath())));
// 读取是否运行时的参数
boolean runtime = url.getParameter(Constants.RUNTIME_KEY, false);
// 生成路由工厂可以识别的URL,并把参数添加进去
URL script = url.setProtocol(protocol)
.addParameter(Constants.TYPE_KEY, type);
.addParameter(Constants.RUNTIME_KEY, runtime);
.addParameterAndEncoded(Constants.RULE_KEY, rule);
// 再次调用路由的工厂,由于前面配置了protocol为script类型,这里会使用脚本路由进行解析
return routerFactory.getRouter(script);
9.4.5 脚本路由的实现
脚本路由使用JDK自带的脚本解析器解析脚本并运行,默认使用JavaScript解析器,其逻辑分为构造方法和route方法两部分。构造方法主要负责一些初始化的工作,route方法则是具体的过滤逻辑执行的地方。我们来看一段官网提供的JS脚本:
function route(invokers) {
// 创建一个List
var result = new java.util.ArrayList(invokers.size());
for (i = 0; i < invokers.size(); i++) {
if ("10.20.153.10".equals(invokers.get(i).getUrl().getHost())) {
// 遍历传入的所有Invokers,过滤所有IP不是10.20.153.10的Invoker
result.add(invokers.get(i));
}
}
return result;
} (invokers); // 表示立即执行方法
我们在写JS脚本时,需要注意一个服务只能有一条规则,如果有多条规则,并且规则之间没有交集,则会把所有的Invoker都过滤。另外,脚本路由中也没看到沙箱约束,有注入的安全风险。
脚本路由的构造方法逻辑:
(1) 初始化参数。获取规则的脚本类型、路由优先级。如果没有设置脚本类型,则默认设置为JavaScript类型,如果没有解析到任何规则,则抛出异常。
(2) 初始化脚本执行引擎。根据脚本类型,通过Java的ScriptEngineManager创建不同的脚本执行器,并缓存起来。
route方法的逻辑就是调用脚本引擎,获取执行结果并返回。
List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers);
Compilable compilable = (Compilable) engine;
// 构造要传入脚本的参数
Bindings bindings = engine.createBindings();
bingdings.put("invokers", invokersCopy);
bingdings.put("invocation", invocation);
bingdings.put("context", RpcContext.getContext());
CompiledScript function = compilalbe.compile(rule);
// 执行脚本
Object obj = function.eval(bindings);
负载均衡的实现
9.5.1 包装后的负载均衡
所有的容错策略中的负载均衡都使用了抽象父类AbstractClusterInvoker中定义的Invoker<T> select()方法,而不是直接使用LoadBalance的方法。因为抽象父类在LoadBalance的基础上又封装了一些新的特性:
(1) 粘滞连接。粘滞连接用于有状态服务,尽可能让客户端总是向同一提供者发起调用,除非该提供者“挂了”,才会连接另一台。
(2) 可用检测。Dubbo调用的URL中,如果含有culster.availablecheck=false,则不会检测远程服务是否可用,直接调用。如果不设置,则默认会开启检查,对所有的服务都做是否可用的检查,如果不可用,则再次做负载均衡。
(3) 避免重复调用。对于已经调用过的远程服务,避免重复选择导致每次都使用同一个节点。这种特性主要是为了避免高并发场景,某个节点瞬间被大量请求。
负载均衡整个过程大致分4步:
(1) 检查URL是否有配置粘滞连接,如果有则使用粘滞连接的Invoker。如果没有,或者重复调用检测、可用性检测不通过,进入第2步。
(2) 通过ExceptionLoader获取负载均衡的具体实现,并通过负载均衡做节点的选择。对选择出的节点做重复调用、可用性检测,通过则直接返回,不通过则进入第3步。
(3) 进行节点的重新选择。如果需要做可用性检测,则会遍历Directory中得到的所有节点,过滤不可用和已经调用的节点,在剩余的节点中重新做负载均衡;如果不需要做可用性检测,那么也会遍历Directory中得到的所有节点,但只过滤已经调用过的,在剩余的节点中重新做负载均衡。这里存在一种情况,就是在过滤不可用或已经调用过的节点时,节点全部被过滤,没有剩下任何节点,此时进入第4步
(4) 遍历所有已经调用过的节点,选出所有可用的节点,再通过负载均衡选出一个节点并返回。如果还找不到可调用的节点,则返回null。
9.5.2 负载均衡的总体结构
Dubbo内置了4种负载均衡算法,用户也可以自行扩展,因为LoadBalance接口上有@SPI注解。
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
从代码来看,默认的负载均衡实现是RandomLoadBalance,即随机负载均衡。由于select方法上有@Adaptive("loadbalance")注解,因此我们在URL中可以通过loadbalance=xxx来动态指定select时的负载均衡算法。下面我们对Dubbo提供的所有负载均衡算法进行说明:
- Random 随机,按权重设置随机概率。在一个节点上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者的权重。
- RoundRobin 轮询,按公约后的权重设置轮询比例。存在满的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那里,久而久之,所有请求都卡在调用第二台上。
- LeastActive 最少活跃调用树,如果活跃数相同则随机调用,活跃数指调用前后计数差。使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
- ConsistentHash 一致性哈希,相同参数的请求总是发到同一提供者。当某一天提供者挂了,原本发往提供者的请求,基于虚拟节点,会平摊到其他提供者,不会引起剧烈变动。默认只对第一个参数哈希,如果要修改,则配置<dubbo:parameter key="hash.arguments" value="0,1" />。默认使用160份虚拟节点,如果要修改,则配置<dubbo:parameter key="hash.nodes" value="320" />
4种负载均衡算法都继承自同一个抽象类,抽象父类已经把通用的逻辑完成,留了一个抽象的doSelect方法给子类实现。负载均衡的接口关系图:
负载均衡接口关系
抽象父类AbstractLoadBalance有两个权重相关的方法:calculateWarmupWeight和getWeight。getWeight方法就是获取当前Invoker的权重,calculateWarmupWeight是计算具体的权重。getWeight方法会调用calculateWarmupWeight:
// 通过URL获取当前Invoker设置的权重
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
// 获得启动的时间点
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 求差值,得到已经预热了多久
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 获取设置的总预热时间
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
// 计算出最后的权重
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
calculateWarmupWeight的计算逻辑比较简单,由于框架考虑了服务刚启动的时候需要有一个预热的过程,如果一启动就给予100%的流量,则可能会让服务崩溃,因此实现了这个方法用于计算预热时候的权重,计算逻辑是:(启动至今时间/给予的预热总时间)*权重。
9.5.3 Random负载均衡
Random负载均衡是按权重设置随机概率做负载均衡的。这种算法不能精确地平均请求,但随着请求数量的增加,最终结果是大致平均的。步骤如下:
(1) 计算权重并判断每个Invoker的权重是否一样。遍历整个Invoker列表,求和总权重。在遍历过程中,会对比每个Invoker的权重,判断所有Invoker的权重是否相同。
(2) 如果权重相同,则说明每个Invoker的概率都一样,因此直接使用nextInt随机选一个Invoker返回即可。
(3) 如果权重不同,则首先得到偏移值,然后根据偏移值找到对应的Invoker。
// 根据总权重计算出一个随机的偏移量,此处使用了ThreadLocalRandom性能会更好
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// 遍历所有的Invoker,累减,得到被选中的Invoker
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
9.5.4 RoundRobin负载均衡
权重轮询负载均衡会根据设置的权重来判断轮询的比例。普通轮询负载均衡的好处是每个节点获得的请求会很均匀,如果某些节点的负载能力明显较弱,则这个节点会堆积比较多的请求。因此普通轮询不能满足需求,还需要能根据节点权重进行干预。权重轮询又分为普通权重轮询和平滑权重轮询。普通权重轮询会造成某个节点会突然被频繁选中,这样很容突然让一个节点流量暴增。Nginx中有一种叫平滑轮询的算法,在轮询时会穿插选择其他节点,让整个服务选择的过程比较均匀,不会逮住一个节点一直调用。Dubbo框架中最新的RoundRobin代码已经改为平滑权重轮询算法。
我们先看Dubbo的RoundRobin负载均衡的工作步骤:
(1) 初始化权重缓存Map。以每个Invoker的URL为key,对象WeightedRoundRobin为value生成一个ConcurrentMap,并把这个Map保存到全局的methodWeightMap中:ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap。methodWeightMap的key是每个接口+方法名。这一步只会生成这个缓存Map,但里面是空的,第2步才会生成每个Invoker对应的键值。
WeightedRoundRobin封装了每个Invoker的权重,对象中保存了三个属性:
// Invoker设定的权重
private int weight;
// 考虑到并发场景下某个Invoker会被同时选中,表示该节点被所有线程选中的权重总和。
// 例如:某节点权重为100,被4个线程同时选中,则变为400
private AtomicLong current = new AtomicLong(0);
// 最后一个更新的时间,用于后续缓存超时的判断
private long lastUpdate;
(2) 遍历所有Invoker。首先,在遍历过程中把每个Invoker的数据填充到第1步生成的权重缓存map中。其次,获取每个Invoker的预热权重,新版的框架RoundRobin也支持预热,通过和Random负载均衡中相同的方式获得预热阶段的权重。如果预热权重和Invoker设置的权重不想等,则说明还在预热阶段,此时会以预热权重为准。然后,进行平滑轮询。每个Invoker会把权重加到自己的current属性上,并更新当前Invoker的lastUpdate。同时累加每个Invoker的权重到totalWeight。最终,遍历完后,选出所有Invoker中current最大的作为最终要调用的节点。
(3) 清除不再使用的缓存节点。由于所有的Invoker的权重都会被封装成一个weightedRoundRobin对象,因此如果可调用的Invoker列表和缓存weightedRoundRobin对象的Map大小不相等,说明缓存Map中有无用数据(有些Invoker已经不存在,但Map中还有缓存)
为什么不相等就说明有老数据?如果Invoker列表比缓存Map大,则说明有没被缓存的Invoker,此时缓存Map会新增数据。因此缓存Map永远大于等于Invoker列表。
清除老旧数据时,各线程会先用CAS抢占锁,然后复制原有的Map到一个新的Map中,根据lastUpdate清除新Map中的过期数据,最后把Map从旧的Map引用修改到新的Map上面。这是一种CopyOnWrite的修改方式。
(4) 返回Invoker。注意返回之前会把当前Invoker的current减去总权重。这是平滑权重轮询中重要的一步。
看了这个逻辑,估计很多人还是没明白整个轮询过程,因为穿插了框架逻辑。因此这里把算法逻辑提取出来:
(1) 每次请求做负载均衡时,会遍历所有可调用的节点(Invoker列表)。对于每个Invoker,让它的current = current + weight。属性含义见weightedRoundRobin对象。同时累加每个Invoker的weight到totalWeight,即totalWeight = totalWeight + weight。
(2) 遍历完所有Invoker后,current值最大的节点就是本次要选择的节点。最后,把该节点的current值减去totalWeight,即current = current - totalWeight。
假设有3各Invoker:A、B、C,它们的权重分别是1、6、9,初始current都是0,则平滑权重轮询过程如下:
请求次数 | 被选中前Invoker的current | 被选中后Invoker的current | 被选中的节点 |
---|---|---|---|
1 | {1, 6, 9} | {1, 6, -7} | C |
2 | {2, 12, 2} | 2, -4, 2} | B |
3 | {3, 2, 11} | {3, 2, -5} | C |
4 | {4, 8, 4} | {4, -8, 4} | B |
5 | {5, -2, 13} | {5, -2, -3} | C |
6 | {6, 4, 6} | {-10, 4, 6} | A |
7 | {-9, 10, 15} | {-9, 10, -1} | C |
从上表可以看出,A、B、C被选中的次数比例接近1:6:9,此外,C被频繁地一直调用,其中会穿插B和A的调用。
9.5.5 LeastActive负载均衡
LeastActive负载均衡称为最少活跃调用数负载均衡,即框架会记下每个Invoker的活跃数,每次只从活跃数最少的Invoker里选一个节点。这个算法需要配合ActiveLimitFilter过滤器来计算每个接口方法的活跃数。最少活跃数负载均衡可以看作Random负载均衡的“加强版”,因为最后根据权重做负载均衡的时候,使用的算法和Random的一样。
// 初始化各种计数器,如最小活跃数计数器,总权重计数器等
...
for (int i = 0; i < length; i++) {
// 获得Invoker的活跃数、预热权重
...
// 第一次,或者发现有更小的活跃数
if (leastActive == -1 || active < leastActive) {
// 不管是第一次还是有更小的活跃数,之前的计数都要重新开始,这里置空之前的计数。
// 因为只计数最小的活跃数
...
} else if (active == leastActive) {
// 当前Invoker活跃数与计数相同说明有N各Invoker都是最小计数,全部保存到集合中,
// 后续就在它们里面根据权重选一个节点
}
}
// 如果只有一个Invoker则直接返回
...
// 如果权重不一样,则使用和Random负载均衡一样的权重算法找到一个Invoker并返回
...
// 如果权重相同,则直接随机选一个返回
...
从上面的代码可以得知其逻辑:遍历所有Invoker,不断寻找最小活跃数(leastActive),如果有很多Invoker的活跃数都等于leastActive,则把它们保存到一个集合中,最后在这个集合中通过随机的方式选出一个Invoker。
那最少活跃数的计数又是如何知道呢?
在ActiveLimitFilter中,只要进来一个请求,该方法的调用的计数就会原子性+1。整个Invoker调用过程会包在try-catch-finally中,无论调用结束或出现异常,finally中都会把计数原子性-1.该原子计数就是最少活跃数。(所以filter应该是每次调用完都会把计数放在响应中,客户端去做缓存吗?)
在服务运行一段时间后,性能好的服务提供者处理请求的速度更快,因此活跃数下降的也越快,此时这样的服务提供者能够优先获取到新的服务请求、这就是最小活跃数负载均衡算法的基本思想。
9.5.6 一致性哈希负载均衡
一致性哈希负载均衡可以让参数相同的请求都路由到相同的机器上。(注意其他的负载均衡算法要实现这点需要手动配置粘滞连接)这种负载均衡的方式可以让请求相对平均,相比直接使用哈希,当某些节点下线时,请求会平摊到其他服务提供者,不会引起剧烈变动。我们先来看下普通一致性哈希:
普通一致性哈希
普通一致性哈希会把每个服务节点散列到环形上,然后把请求的客户端散列在环上,顺时针往前找到的第一个节点就是要调用的节点。假设客户端落在区域2,则调用的是服务C。当服务C宕机时,落在区域2的客户端会自动迁移到服务D上。这样就避免了全部重新散列的问题。
但也有它的均线性,它的散列不一定均匀,容易造成某个节点压力过大。因此Dubbo使用的是优化过的Ketama一致性哈希。这种算法会为每个真实节点在创建多个虚拟节点,让节点在环形上的分布更加均匀,后续的调用也会随之更加均匀。
// 获得方法名
String emthdoName = RpcUtils.getMethodName(invocation);
// 用接口名+方法名拼接key
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// 把所有可以调用的Invoker列表进行哈希
int identityHashCode = System.identityHashCode(invokers);
// 缓存中获取selector
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 乳沟现在Invoker列表的Hash码和之前的不一样,说明Invoker列表发生了变化,则重新创建Selector
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelctor<T>(invokers, methodName, identityHashCode));
selector = ( ConsistentHashSelctor<T>) selectors.get(key);
}
// 通过selector选出一个Invoker
return selector.select(invocation);
整个逻辑的核心在ConsistentHashSelctor中,因此我们继续看它是如何初始化的。ConsistentHashSelctor初始化时会对节点节点进行散列,散列的环形是使用一个TreeMap实现的,所有的真实、虚拟节点都放入TreeMap。把节点的IP+递增数字做"MD5",以此作为节点标识,在对标识做“Hash”得到TreeMap的key,最后把可以调用的节点作为TreeMap的value。
// 遍历所有节点
for (Invoker<T> invoker : invokers) {
// 得到每个节点的IP
String address = invoker.getUrl().getAddress();
// replicaNumber是生成的虚拟节点数,默认160
for (int i = 0; i < replicaNumber / 4; i++) {
// 以IP+递增数字做MD5,以此作为节点标识
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
// 对标识做hash,得到TreeMap的key,以Invoker为value
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
TreeMap实现一致性哈希:在客户端调用时,只要对请求的参数也做“MD5”即可。虽然此时得到的MD5值不一定对应到TreeMap中的一个key,因为每次的请求参数不同。但是由于TreeMap是有序的树型结构,所以可以调用TreeMap的ceilingEntry方法,用于返回一个至少大于或等于当前给定key的Entry,从而打到顺时针往前找的效果。如果找不到,则使用firstEntry返回第一个节点。
要理解清楚这个实现必须得先了解TreeMap,它跟传统的HashMap实现完全不是一回事,TreeMap使用红黑树来存储,用key来进行比较排序,get方法是通过key在红黑树中进行二分查找进而找到目标元素,put方法则是在红黑树中插入元素。所以上面这个一致性哈希的实现就是对每个可用的invoker(服务端节点)进行默认160次的哈希,在TreeMap放置了160个key都指向同一个invoker(所以key就是所谓的虚拟节点,value是真实节点)。在客户端调用时,通过对参数MD5再哈希作为key在TreeMap中查找大于等于这个key的元素,它的value就是要调用的invoker。
9.6 Merger的实现
当一个接口有多种实现,消费者有需要同时引用不同的实现时,可以用group来区分不同的实现。
如果我们需要并行调用不同group的服务,并且要把结果合并起来,就需要用到Merger特性。Merger实现了多个调用后结果合并的逻辑。虽然业务曾也可以自行实现这个能力,但Dubbo直接封装到框架中,作为一种扩展点能力,简化了业务开发的复杂度。
9.6.1 总体结构
MergerCluster也是Cluster接口的一种实现,因此遵循Cluster的设计模式,在invoker方法中完成具体逻辑。整个过程会使用Merger接口的具体实现来合并结果集。在使用的时候,通过MergedFactory获得各种具体的Merger实现。
Merger的12种默认实现
如果开启了Merger特性,并且未指定合并器,框架会根据接口的返回类型自动匹配合并器。我们还可以扩展自己的合并器,MergerFactory在加载具体实现时,会用ExtensionLoader把所有SPI的实现都加载到缓存。合并器的实现较为简单,只列举MapMerger的实现:
@Override
public Map<?, ?> merge(Map<?, ?>... items) {
// 如果结果集为空,则直接返回null
if (items.length == 0) {
return null;
}
// 如果结果集不为空则新建一个Map,遍历返回的结果集并放入新的Map
Map<Object, Object> result = new HashMap<>();
for (Map<?, ?> item : items) {
if (item != null) {
result.putAll(item);
}
}
}
9.6.2 MergebaleClusterInvoker机制
MergebaleClusterInvoker串起了整个合并器逻辑。整个调用过程:MergeableCluster#join方法直接生成并返回了MergeableClusterInvoker,MergebaleClusterInvoker#invoke方法又通过MergerFactory工厂获取不同的Merger接口实现,完成了合并的具体逻辑。
MergeableCluster并没有继承抽象的Cluster实现,而是独立完成了自己的逻辑。因此整个逻辑和之前的Failover等机制不同,步骤如下:
(1) 前置准备。通过directory获取所有Invoker列表
(2) 合并器检查。判断某个方法是否有合并器,如果没有,则不会并行调用多个group,找到第一个可以调用的Invoker直接调用就返回了。如果又合并器,则进入第3步。
(3) 获取接口的返回类型。通过反射获得返回类型,后续要根据这个返回值查找不同的合并器。
(4) 并行调用。把Invoker的调用封装成一个个Callable对象,放到线程池中执行,保存线程池返回的future对象到HashMap中,用于等待后续结果返回。
(5) 等待Future对象的返回结果。获取配置的超时参数,遍历(4)中得到的future对象,设置Future#get的超时时间,同步等待得到并行调用结果。异常的结果会被忽略,正常结果被保存到list中。如果最终没有返回结果,则直接返回一个空的RpcResult;如果只有一个结果,那么也直接返回,不需要再做合并;如果返回类型是void,则说明没有返回值,也直接返回。
(6) 合并结果集。如果配置的是merger = ".addAll",则直接通过反射调用返回类型中的.addAll方法合并结果集。
9.7 Mock
在Cluster中,还有最后一个MockClusterWrapper,有它实现了Dubbo的本地伪装。这个使用场景较多,通常会应用在一下场景中:服务降级;部分非关键服务全部不可用,希望主流程继续进行;在下游某些节点调用异常时,可以以Mock的结果返回。
Mock场景的使用方式
Mock只有在拦截到RpcException的时候会启用,属于异常容错方式的一种。业务层面其实也可以用try-catch来实现功能,如果使用下沉到框架中的Mock机制,则可以让业务的实现更优雅。常见配置有如下三种:
<dubbo:reference interace="com.foo.BarService" mock="true" />
<dubbo:reference interace="com.foo.BarService" mock="com.foo.BarServiceMock" />
<dubbo:reference interace="com.foo.BarService" mock="return null" />
如果mock设置了true或者default,则实现的类名必须是接口名+Mock。
最后一种return null的配置方式通常在想要直接忽略异常的时候使用。
服务的降级是在dubbo-admin中通过override协议更新Invoker的mock参数实现的。如果mock参数设置为mock=force:return+null,则表明是强制Mock,强制Mock会让消费者对该服务的调用直接返回null,不再泛起远程调用。通常使用在非重要服务已经不可用的时候,可以屏蔽下游对上游系统造成的影响。此外还能把参数设置为mock=fail:return+null,这样消费者还是会发起远程调用,不过失败后会返回null,但是不抛出异常。
最后,如果配置的参数是以throw开头的,即mock=throw,则直接抛出RpcException,不会发起远程调用。
9.7.2 Mock的总体结构
Mock涉及的接口比较多,整个流程贯穿Cluster和Protocol层,接口之间的逻辑关系如下:
接口关系
主要流程分为Cluster层和Protocol层:
- MockClusterWrapper是一个包装类,包装类会自动注入合适的扩展点实现,它的逻辑很简单,只是把包装扩展类作为初始化参数来创建并返回一个MockClusterInvoker。
- MockClusterInvoker和其他的ClusterInvoker不一样,在Invoker方法中完成主要逻辑。
- MockInvokerSelector是Router接口的一种实现,用于过滤Mock的Invoker。
- MockProtocol根据用户传入的URL和类型生成一个MockInvoker。
- MockInvoker实现最终的Invoker逻辑。
MockInvoker和MockClusterInvoker看起来都是Invoker,区别是什么?
实现偶爱你,强调Mock、失败后返回Mcok结果等逻辑是在McokClusterInvoker里调用的;其次,McokClusterInvoker在某些逻辑下,会生成MockInvoker并进行调用;然后,在MockInvoker里会处理mock="return null"、mock="throw xxx"或mock=com.xxService这些配置。最后MockInvoker还会被MockProtocol在引用远程服务的时候创建。可以认为,MockClusterInvoker会处理一些Class级别的Mock逻辑,例如:选择调用哪个Mock类。MockInvoker处理的是方法级别的Mock逻辑,如返回值。
9.7.3 Mock的实现原理
MockClusterWrapper是一个包装类,它在创建McokClusterInvoker的时候会被包装的Invoker传入构造方法,因此MockClusterInvoker内部天生就含有一个Invoker的引用。MockClusterInvoker的invoke方法处理了主要逻辑:
(1) 获取Invoker的Mock参数。前面讲过,该Invoker是在构造方法传入的。如果该Invoker根本没有配置Mock,则直接调用Invoker的invoke方法并返回结果;如果配置了Mock参数,进入下一步。
(2) 判断参数是否以force开头,即判断是否强制Mock。如果是强制,则进入doMockInvoke逻辑。如果不是,则进入失败后才Mock的逻辑。
(3) 失败后调用doMockInvoker逻辑并返回结果。在try代码块直接调用Invoker的invoke方法,如果有异常,则在catch中调用doMockInvoke逻辑。
doMockInvoke的逻辑:
(1) 通过selectMockInvoker获得所有 Mock类型的Invoker。selectMockInvoker在对象的attachment属性中偷偷放进一个invocation.need.mock=true的标识。directory在list方法列出所有Invoker的时候,如果检测到这个标识,则使用MockInvokersSelector来过滤Invoker,而不是使用普通的route实现,最后返回Mock类型的Invoker列表。如果一个Mock类型的Invoker都没有返回,则通过directory的URL新创建一个MockInvoker;如果有,则使用第一个。
(2) 调用MockInvoker的invoke方法。在try-catch中调用invoke方法并返回结果。如果有异常,并且是业务异常,则包装成一个RpcResult返回,否则返回RpcException异常。
在doMockInvoke的第1步,directory会使用MockInvokersSelector来过滤出Mcok类型的Invoker。MockInvokersSelector是Router接口的其中一种实现。它路由的具体逻辑如下:
(1) 判断是否需要做Mock过滤。如果attachment为空,或者没有invocation.need.mock=true的标识,则认为不需要做Mock过滤,进入第2步;如果找到这个标识,进入第3步。
(2) 获取非Mock类型的Invoker。遍历所有的Invoker,如果它们的protocol中都没有Mock参数,则整个列表返回。否则,把protocol中所有没有Mock标识的取出来并返回。
(3) 获取Mock类型的Invoker。遍历所有的Invoker,如果他们的protocol中都没有Mock参数,则直接返回null。否则把protocol中所有含有Mock标识的取出来并返回。
MockProtocol也是协议的一种,主要是把注册中心的Mock URL转换成MockInvoker对象。URL可以痛殴dubbo-admin或其他方式写入注册中心,他被定义为只能引用,不能暴露。
例如我们在注册中心/dubbo/com.test.xxService/providers这个服务提供者目录下,写入一个Mock的URL:mock://192.168.0.123/com.test.xxService。
在MockInvoker的invoke方法中,处理逻辑如下:
(1) 获取Mock参数值。通过URL获取Mock配置的参数,如果为空则泡异常。优先会获取方法界别的Mock参数,例如:以methodName.mock为key去获取参数值;如果获取不到,则尝试以mock为key获取参数值。
(2) 处理参数值是return的配置。如果只配置了return,即mock=return,则返回一个空的RpcResult;如果后面跟了别的参数,则首先解析返回类型,然后结合Mock参数和返回类型,返回Mock值。目前支持的类型包括:Mock参数值等于empty,根据返回类型返回new xxx()空对象;如果是null、true、false,则直接返回这些值;如果是其他字符串,则返回字符串;如果是数字、List、Map,则返回对应的JSON串;如果没匹配上,则直接返回Mock的参数值。
(3) 处理参数值是throw的配置。如果throw后面没有字符串,则包装一个RpcException异常直接抛出;如果后面有自定义的异常,则使用自定义异常类,并包装成一个RpcException抛出。
(4) 处理Mock实现类。先从缓存中取,如果有直接返回。如果缓存没有,则先获取接口的类型,如果Mock的参数配置是true或default,则尝试通过"接口名+Mock"查找Mock实现类。