Dubbo源码解析分布式

Dubbo之降级Mock源码分析

2018-09-09  本文已影响11人  土豆肉丝盖浇饭

熔断与降级

说到降级必须扯到熔断。这两个技术既可以独立使用也可以一起使用。下面从我的理解来说下它们的区别。

熔断一般用于调用下游服务失败的时候,连续累计失败后,开启熔断器,之后的服务都不再调用下游服务,要么返回失败要么降级,直到满足熔断器关闭策略后,才会再次调用下游。比如,大麦网抢票给你返回业务繁忙什么的(被动)。
降级一般用于并发高峰期把资源让给核心业务,可以被熔断触发,也可以由运营开发人员主动开启,或者有其他启动策略。比如双11淘宝不能修改收货地址,相当于强制让接口返回特定异常(主动)。

Dubbo中降级的使用

在Dubbo中通过对接口或方法配置mock参数来设置对应降级实现。

下面我们先罗列下所有的mock表达式

mock表达式 作用
false 不使用mock
true/default/fail mock调用名为接口+Impl类对应mock方法
{mockClass} mock调用${mockClass}对应方法
return xxx 直接返回xxx的Mock数据,xxx支持json数据
throw xxxException 直接抛出异常
force xxx 以上方式都是在rpc调用远程服务失败后采用mock逻辑,加上force前缀后,在客户端直接走mock逻辑
force mock表达式只含有force的话,直接走接口+Impl类对应mock方法逻辑

上面表达式的force配置将mock调用分为了两种模式,强制mock失败mock。强制mock直接走对应mock逻辑,而失败mock,会在rpc调用失败时再调用。

接下来讲解下在Dubbo框架中如何设置mock。设置方式分为三种,xml配置(不推荐),Dubbo Admin控制台,ZK设置Override动态配置。后两种的原理都是对ZK节点进行操作。

首先讲下xml方式

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" mock="return `scj`">
            <dubbo:parameter key="sayHello.mock" value="force" />
 </dubbo:reference>

通过xml配置,可以配置接口上的mock字段,只可以配置false,true/default,return xxx,{mockClass}这几种类型。AbstractInterfaceConfig解析的只能解析这几种。不知道算不算bug。
接口具体方法上的mock字段只能配置false,true/default/fail,{mockClass}以及force。因为spring框架的PropertyValue解析不支持这些不带有特殊符号以及空格的配置。

我觉得xml配置用于设置失败mock,也就是被动型的降级。主动型的降级,通过Dubbo Admin或者直接操作ZK来实现。

ZK操作代码如下

create -e %2fdubbo%2fcom.alibaba.dubbo.demo.DemoService%2fconfigurators%2foverride%3a%2f%2f10.111.27.41%3a20880%2fcom.alibaba.dubbo.demo.DemoService%3fsayHello.mock%3dforce%3areturn+%60scj+mock+hhh%60%26category%3dconfigurators 1

具体这个配置是如何生效的,可以看下我写的ZookeeperRegistry这篇文章。

源码分析

首先我们先找到注入Mock逻辑的MockClusterWrapper类

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }

}

可以看到这是一个SPI自动包装类,也就是Cluster在哪里被使用到,这个mock逻辑就会在哪里切入。了解过服务引用的朋友,应该知道在哪里。

在MockClusterWrapper的join方法会把 this.cluster.join(directory)作为参数传入,this.cluster.join(directory)生成的invoker也就是该接口的集群调用invoker,会在失败mock中被使用到。

接下来看MockClusterInvoker的实现。

MockClusterInvoker的invoke方法区分了强制Mock和失败Mock,强制Mock直接调用mock逻辑,即doMockInvoke方法。而失败Mock,会先进行集群调用,报出RPC异常后,再走mock逻辑。

public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        //获取directoryUrl,会包括客户端和Configuration节点url的合并url
        //从method.xxx获取参数,如果没有,直接从xxx参数获取
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //如果没有mock配置,不走mock逻辑
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            //force开头,强制进行mock //这个force只能通过override触发
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //不是force开头,调用失败走mock逻辑
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                    }
                    //不是force开头,会在调用失败后,走mock逻辑
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }

doMockInvoke方法通过directory.getUrl()生成MockInvoker。
selectMockInvoker方法触发的场景我暂时还不知道,我们暂且认为它返回为空。

private Result doMockInvoke(Invocation invocation, RpcException e) {
        Result result = null;
        Invoker<T> minvoker;

        //selectMockInvoker会选择MockProtocol暴露的MockInvoker
        //目前没发现可以通过这种协议暴露。。。
        List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
        if (mockInvokers == null || mockInvokers.isEmpty()) {
            //使用mockinvoker处理请求
            minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
        } else {
            minvoker = mockInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
            }
        } catch (Throwable me) {
            throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
        }
        return result;
    }

在MockInvoker中会对mock表达式进行解析,并且做出相应的调用

public Result invoke(Invocation invocation) throws RpcException {
        //获取配置为methodName.mock的配置
        String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(this);
        }
        if (StringUtils.isBlank(mock)) {
            //从url中提取mock配置
            mock = getUrl().getParameter(Constants.MOCK_KEY);
        }

        if (StringUtils.isBlank(mock)) {
            throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
        }
        //对mock字段进行处理
        mock = normallizeMock(URL.decode(mock));
        if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())) {
            //如果是return
            //默认返回null
            RpcResult result = new RpcResult();
            result.setValue(null);
            return result;
        } else if (mock.startsWith(Constants.RETURN_PREFIX)) {
            //如果是return开头
            //返回解析的数据
            mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
            mock = mock.replace('`', '"');
            try {
                Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                Object value = parseMockValue(mock, returnTypes);
                return new RpcResult(value);
            } catch (Exception ew) {
                throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: " + url, ew);
            }
        } else if (mock.startsWith(Constants.THROW_PREFIX)) {
            //如果是throw开头 抛出异常
            mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
            mock = mock.replace('`', '"');
            if (StringUtils.isBlank(mock)) {
                throw new RpcException(" mocked exception for Service degradation. ");
            } else { // user customized class
                Throwable t = getThrowable(mock);
                throw new RpcException(RpcException.BIZ_EXCEPTION, t);
            }
        } else { //impl mock
            //执行自定义Mock类逻辑
            try {
                Invoker<T> invoker = getInvoker(mock);
                return invoker.invoke(invocation);
            } catch (Throwable t) {
                throw new RpcException("Failed to create mock implemention class " + mock, t);
            }
        }
    }

具体有哪些mock表达式配置,参见前一节的使用方式。

总结

Dubbo中Mock降级调用的实现,主要是通过SPI自动包装,对Cluster集群调用模块进行包装,如果url中的mock配置生效,走mock逻辑(强制和失败模式),否则走原先的Cluster集群调用模式。

最后

希望大家关注下我的公众号


image
上一篇下一篇

猜你喜欢

热点阅读