Dubbo

【Dubbo】路由、负载均衡、降级、集群容错

2019-01-02  本文已影响0人  半个橙子

部署服务

启动两个provider,一个consumer。其中一个provider修改配置文件端口为20881打包成jar并运行,idea中中运行另一个provider(20880)和cunsumer。

public class DemoAction {
    private DemoService demoService;
    public void setDemoService(DemoService demoService) {
        this.demoService = demoService;
    }
    public void start() throws Exception {
        for (int i = 0; i < Integer.MAX_VALUE; i ++) {
            try {
                String hello = demoService.sayHello("world" + i);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + hello);
            } catch (Exception e) {
                e.printStackTrace();
            }
            Thread.sleep(2000);
        }
    }
}
public class DemoServiceImpl implements DemoService {
    public String sayHello(String name) {
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
        return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress();
    }
    
}
打包provider

修改provider的配置


image.png

在dubbo-master 根目录执行
E:\dubbo\dubbo-master>mvn clean compile package install -Dmaven.test.skip=true

image.png

打包完成后dubbo-demo-provider\target目录下有dubbo-demo-provider-2.5.4-SNAPSHOT-assembly.tar.gz解压运行其dubbo-demo-provider-2.5.4-SNAPSHOT\bin\start.bat启动服务

image.png image.png
部署dubb-admin

dubbo-admin是dubbo进行服务治理的一个web控制台,可以动态变更zk上的配置,来控制provider、consumer的行为。部署只需要修改一下/WEB-INF/dubbo.properties下的配置文件,再打包成dubbo-admin.war直接部署就可以了。

dubbo.registry.address=zookeeper://192.168.99.100:2181
dubbo.admin.root.password=root
dubbo.admin.guest.password=guest

目录服务、路由、负载均衡、集群容错代码执行流程

代码的入口是MockClusterInvoker.invoke,从这里开始会依次进行容错或屏蔽、路由策略、负载均衡等来处理从RegistryDirectory中invoker列表,最终找到一个合适的invoker

demoService.sayHello("world" + i)
-->InvokerInvocationHandler.invoke
  -->invoker.invoke
    -->RpcInvocation//所有请求参数都会转换为RpcInvocation
    -->MockClusterInvoker.invoke //1.进入集群
      -->invoker.invoke(invocation)
        -->AbstractClusterInvoker.invoke
          -->list(invocation)
            -->directory.list//2.进入目录查找   从this.methodInvokerMap里面查找一个Invoker
              -->AbstractDirectory.list
                -->doList(invocation)
                  -->RegistryDirectory.doList// 从this.methodInvokerMap里面查找一个Invoker
                -->router.route //3.进入路由 
                  -->MockInvokersSelector.route
                    -->getNormalInvokers
          -->ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("roundrobin")
          -->doInvoke
            -->FailoverClusterInvoker.doInvoke
              -->select//4.进入负载均衡
                -->AbstractClusterInvoker.select
                  -->doselect
                    -->loadbalance.select
                      -->AbstractLoadBalance.select
                        -->doSelect
                          -->RoundRobinLoadBalance.doSelect
                            -->invokers.get(currentSequence % length)//取模轮循
              -->Result result = invoker.invoke(invocation)

服务调用流程

image.png

代码执行流程

  1. 动态代理:当调用provider的demoService.sayHello服务的时候,这个动态代理类的InvocationHandler.invoke方法
public class DemoAction {
    private DemoService demoService;
    public void setDemoService(DemoService demoService) {
        this.demoService = demoService;
    }
    public void start() throws Exception {
        for (int i = 0; i < Integer.MAX_VALUE; i ++) {
            try {
                String hello = demoService.sayHello("world" + i);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + hello);
            } catch (Exception e) {
                e.printStackTrace();
            }
            Thread.sleep(2000);
        }
    }
}
  1. 进入Cluster处理
    将所有参数都封装程RpcInvocation,调用clusterinoker的invoke,第一层包装的是MockClusterInvoker
#com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
......
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        //调用远端服务 所有请求参数都封装成RpcInvocation
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

#com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker#invoke
public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
        if (value.length() == 0 || value.equalsIgnoreCase("false")){
            //no mock 不需要mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            }catch (RpcException e) {
                    result = doMockInvoke(invocation, e);
            }
        }
        return result;
    }
  1. 进入目录服务
    目录服务获取当前方法的invoker列表,根据路由规则筛选invoker,然后进行负载均衡策略再次筛选出一个invoker,最后执行该invoker的调用
#com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#invoke

      public Result invoke(final Invocation invocation) throws RpcException {

        checkWhetherDestroyed();

        LoadBalance loadbalance;
        //2.进入目录查找   从this.methodInvokerMap里面查找一个Invoker 然后进入路由
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            //负载均衡器roundrobin
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        //进入集群容错负载均衡 FailoverClusterInvoker
        return doInvoke(invocation, invokers, loadbalance);
    }

invoker列表


image.png

Directory目录服务

RegistryDirectory主要用于维护注册中心的动态配置,zk服务变更的时候会触发其notify方法,然后调用refreshInvoker重新刷新methodInvokerMap,实现了配置的动态变更,通过doList根据invocation中的方法名获取最新的invoker。而且我们在管理后台的配置基本上都是直接操作zk的configurators,变更configurators后consumer会收到通知,
会合并configurators的配置、consumer自己的配置、provider的配置来调整methodInvokerMap中invoker的url,进行调用的时候会根据url参数动态选择路由器、负载、集群容错等

#com.alibaba.dubbo.registry.integration.RegistryDirectory#doList
public List<Invoker<T>> doList(Invocation invocation) {
        if (forbidden) {
            throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " +  NetUtils.getLocalHost() + " access service " + getInterface().getName() + " from registry " + getUrl().getAddress() + " use dubbo version " + Version.getVersion() + ", Please check registry access list (whitelist/blacklist).");
        }
        List<Invoker<T>> invokers = null;
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            //sayHello
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if(args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由
            }
            if(invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if(invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if(invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }

public synchronized void notify(List<URL> urls) {
        List<URL> invokerUrls = new ArrayList<URL>();
        List<URL> routerUrls = new ArrayList<URL>();
        List<URL> configuratorUrls = new ArrayList<URL>();
        for (URL url : urls) {
            String protocol = url.getProtocol();
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            if (Constants.ROUTERS_CATEGORY.equals(category) 
                    || Constants.ROUTE_PROTOCOL.equals(protocol)) {
                routerUrls.add(url);
            } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
                    || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
                configuratorUrls.add(url);
            } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } else {
                logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
            }
        }
        ......
        // providers
        //刷新invoker
        refreshInvoker(invokerUrls);
    }
    private void refreshInvoker(List<URL> invokerUrls){
        if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
                && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
            this.forbidden = true; // 禁止访问
            this.methodInvokerMap = null; // 置空列表
            destroyAllInvokers(); // 关闭所有Invoker
        } else {Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 将URL列表转成Invoker列表
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
            // state change
            //如果计算错误,则不进行处理.
            if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
                return ;
            }
            this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
            //刷新invoker
            this.urlInvokerMap = newUrlInvokerMap;
    }
    }

路由

负责从多个invoker中按路由规则选出子集,如应用隔离、读写分离、或灰度发布等,路由规则可以从dubbo-admin后台动态配置

#com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#list
  public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed){
            throw new RpcException("Directory already destroyed .url: "+ getUrl());
        }
        //查找invoker列表
        List<Invoker<T>> invokers = doList(invocation);
        //3.进入路由
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && localRouters.size() > 0) {
            for (Router router: localRouters){
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }
使用路由规则灰度发布

使用dubbo-admin控制台可以动态新增路由规则,进行灰度发布。启动dubbo-admin.war(修改配置文件/WEB-INF/dubbo.properties中zk地址,用户名root密码root)


image.png
  1. 发布20880,切断20881访问流量,然后进行服务的发布。
  2. 20880发布成功后,恢复 20880的流量,
  3. 切断20880,继续发布20881
路由规则有哪些实现类?

ConditionRouter:条件路由,后台管理的路由配置都是条件路由。
ScriptRouter:脚本路由
MockInvokersSelector:默认使用

image.png
启动路由规则,它触发了那些动作?
  1. 什么时候加入ConditionRouter?
    默认情况只有MockInvokersSelector这个路由,新增路由规则的时候zk的配置会变更,然后触发consumer的RegistryDirectory.notify方法取到配置后重新维护路由规则,添加路由
#com.alibaba.dubbo.registry.integration.RegistryDirectory#notify
 public synchronized void notify(List<URL> urls) {
.....
  if (routerUrls != null && routerUrls.size() >0 ){
            List<Router> routers = toRouters(routerUrls);
            if(routers != null){ // null - do nothing
                setRouters(routers);
            }
        }
......
}
#com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#setRouters
  protected void setRouters(List<Router> routers){
.....
        // append mock invoker selector
        routers.add(new MockInvokersSelector());
        Collections.sort(routers);
        this.routers = routers;
    }
  1. ConditionRouter是怎么过滤的?
    在添加完路由规则后会重新刷新invoker列表,这时候根据过滤规则过滤服务提供者,然后更新newMethodInvokerMap
private void refreshInvoker(List<URL> invokerUrls){
.....
            Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表

......
}
    private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
      ......
        newMethodInvokerMap.put(Constants.ANY_VALUE, invokersList);
        if (serviceMethods != null && serviceMethods.length > 0) {
            for (String method : serviceMethods) {
                List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
                if (methodInvokers == null || methodInvokers.size() == 0) {
                    methodInvokers = invokersList;
                }
                //路由规则路过滤
                newMethodInvokerMap.put(method, route(methodInvokers, method));
            }
        }
}
#com.alibaba.dubbo.rpc.cluster.router.condition.ConditionRouter#route
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
    if (! matchWhen(url)) {
                return invokers;
            }
          for (Invoker<T> invoker : invokers) {
                if (matchThen(invoker.getUrl(), url)) {
                    result.add(invoker);
                }
            }
......
}
image.png

因为过滤规则有两个条件部分provider,consumer,所以除了在配置变更的时候统一过滤服务提供者,还会在consumer每次发起调用的时候根据consumer的条件动态过滤invoker


路由条件
#com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#list
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
......
            for (Router router: localRouters){
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
            }
        }
......
}

负载均衡

经过路由规则过滤后的invokers,需要通过负载均衡算法选择其中一个invoker进行RPC调用

#com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker#doselect
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
       ......
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
......
        return invoker;
    } 
负载均衡算法
动态修改负载均衡策略

可以针对服务提供者的某个方法或者整个服务提供者修改负载均衡策略


image.png

admin向zk修改之后会通知consumer这个动态配置configurators节点已经变更了,然后consumer会合并configurators的配置、consumer自己的配置、provider的配置调整invoker的url,进行调用的时候会根据url参数动态选择对应的负载均衡器


image.png
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
......
            URL url = mergeUrl(providerUrl);
......}
 /**
     * 合并url参数 顺序为override > -D >Consumer > Provider
     * @param providerUrl
     * @param overrides
     * @return
     */
    private URL mergeUrl(URL providerUrl){
        providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // 合并消费端参数
        
        List<Configurator> localConfigurators = this.configurators; // local reference
        if (localConfigurators != null && localConfigurators.size() > 0) {
            for (Configurator configurator : localConfigurators) {
                providerUrl = configurator.configure(providerUrl);
            }
        }
......
}

服务降级

什么是服务开关

先讲一下开关的由来,例如淘宝在11月11日做促销活动,在交易下单环节,可能需要调用A、B、C三个接口来完成,但是其实A和B是必须的, C只是附加的功能(例如在下单的时候做一下推荐,或push消息),可有可无,在平时系统没有压力,容量充足的情况下,调用下没问题,但是在类似店庆之类的大促环节, 系统已经满负荷了,这时候其实完全可以不去调用C接口,怎么实现这个呢? 改代码?

什么是服务降级

服务降级,当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级(执行固定的逻辑),以此释放服务器资源以保证核心任务的正常运行。

dubbo如何实现服务降级?
如何使用

当consumer调用provider不通的时候,consumer直接超时报错了


consumer异常
屏蔽降级原理
public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        //判断是否有容错或者屏蔽
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
        if (value.length() == 0 || value.equalsIgnoreCase("false")){
            //no mock 1.进入集群 FailoverClusterInvoker
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            //屏蔽
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " +  directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //容错mock
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            }catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " +  directory.getUrl(), e);
                    }
                    //集群容错获取mock值 mock=fail:return null
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
    private Result doMockInvoke(Invocation invocation,RpcException e){
......
            minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
            result = minvoker.invoke(invocation);
......
}
#com.alibaba.dubbo.rpc.support.MockInvoker
    public Result invoke(Invocation invocation) throws RpcException {
......
            Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                Object value = parseMockValue(mock, returnTypes);
                return new RpcResult(value);
}

集群容错

广播调用所有提供者,逐个调用,任意一台报错则报错 [2]。通常用于通知所有提供者更新缓存或日志等本地资源信息

总结

image.png
上一篇下一篇

猜你喜欢

热点阅读