Dubbo源码分析----过滤器之ActiveLimitFilt

2018-07-17  本文已影响57人  _六道木

ActiveLimitFilter也是用来做并发控制的,区别在于ExecuteLimitFilter作用于服务端,而ActiveLimitFilter作用于客户端。
看下官网的例子

<dubbo:service interface="com.foo.BarService" actives="10" />

即从客户端方面限制了服务最多有10个并发

接下来看下ActiveLimitFilter的invoke方法

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        // 获取配置的数量
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        // 获取当前接口调用统计
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {
            //获取接口超时时间
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();// 获取并发数
            if (active >= max) {// 如果大于最大数量
                synchronized (count) {
                    while ((active = count.getActive()) >= max) {
                        try {
                            // 挂起当前线程并释放锁,因为并发数已超过限制
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        // 通过notify唤醒了,计算挂起的时间
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {// 如果已经超过超时时间
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                                   + invoker.getInterface().getName() + ", method: "
                                                   + invocation.getMethodName() + ", elapsed: " + elapsed
                                                   + ", timeout: " + timeout + ". concurrent invokes: " + active
                                                   + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        try {
            long begin = System.currentTimeMillis();
            // 增加该方法的数量
            RpcStatus.beginCount(url, methodName);
            try {
                // 调用
                Result result = invoker.invoke(invocation);
                // 减少数量
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if(max>0){// 调用完成后调用notify唤醒在等待的线程
                synchronized (count) {
                    count.notify();
                } 
            }
        }
    }

整个逻辑也是比较简单的,和ExecuteLimitFilter不太一样的是,ExecuteLimitFilter在超过限制的数量后会直接返回,而ActiveLimitFilter会等待,直到超时。

上一篇下一篇

猜你喜欢

热点阅读