阿里巴巴Dubbo源码解析

Dubbo之限流TpsLimitFilter源码分析

2018-08-24  本文已影响12人  土豆肉丝盖浇饭

本文基于incubator-dubbo 2.7.0版本

前言

在分布式系统中,限流和熔断是处理并发的两大利器。关于限流和熔断,需要记住一句话,客户端熔断,服务端限流。本文我会讲解Dubbo框架对限流的支持。

限流的作用

我个人理解限流的作用,保护应用,防止雪崩。每个应用都有自己处理请求的上限,一旦应用承受过多请求,首先会对正在处理中的请求造成影响,如果更严重,对上下游也会造成雪崩效应。

TpsLimitFilter分析

Dubbo中的限流通过TpsLimitFilter来实现,会在invoker执行实际业务逻辑前进行拦截,判断单位时间请求数是否超过上限,如果超过,抛出异常阻断调用。
TpsLimitFilter源码如下

@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {

    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
            throw new RpcException(
                    new StringBuilder(64)
                            .append("Failed to invoke service ")
                            .append(invoker.getInterface().getName())
                            .append(".")
                            .append(invocation.getMethodName())
                            .append(" because exceed max service tps.")
                            .toString());
        }

        return invoker.invoke(invocation);
    }

}

从TpsLimitFilter的源码中可以看到,因为是扩展点自动激活配置,首先TpsLimitFilter只对provider端有效,其次provider url的需要包括tps=xxx这个配置才能生效。

通过TPSLimiter的isAllowable实现限流 ,其内部采用了计数器算法,单位时间内限制多少调用次数,超过限制,返回false。

public class DefaultTPSLimiter implements TPSLimiter {

    /**
     * 每个Service维护一个计数器
     */
    private final ConcurrentMap<String, StatItem> stats
            = new ConcurrentHashMap<String, StatItem>();

    @Override
    public boolean isAllowable(URL url, Invocation invocation) {
        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                Constants.DEFAULT_TPS_LIMIT_INTERVAL);
        //servicekey并没有和方法绑定,只能限流接口
        String serviceKey = url.getServiceKey();
        if (rate > 0) {
            StatItem statItem = stats.get(serviceKey);
            if (statItem == null) {
                stats.putIfAbsent(serviceKey,
                        new StatItem(serviceKey, rate, interval));
                statItem = stats.get(serviceKey);
            }
            return statItem.isAllowable();
        } else {
            StatItem statItem = stats.get(serviceKey);
            if (statItem != null) {
                stats.remove(serviceKey);
            }
        }

        return true;
    }

}

TPSLimiter 针对每个service都创建一个计数器StatItem,通过StatItem的isAllowable方法判断请求是否有效

class StatItem {

    //接口名
    private String name;

    //计数周期开始
    private long lastResetTime;

    //计数间隔
    private long interval;

    //剩余计数请求数
    private AtomicInteger token;

    //总共允许请求数
    private int rate;

    StatItem(String name, int rate, long interval) {
        this.name = name;
        this.rate = rate;
        this.interval = interval;
        this.lastResetTime = System.currentTimeMillis();
        this.token = new AtomicInteger(rate);
    }

    public boolean isAllowable() {
        long now = System.currentTimeMillis();
        if (now > lastResetTime + interval) {
            token.set(rate);
            lastResetTime = now;
        }

        int value = token.get();
        boolean flag = false;
        while (value > 0 && !flag) {
            //乐观锁增加计数
            flag = token.compareAndSet(value, value - 1);
            //失败重新获取
            value = token.get();
        }

        return flag;
    }

    long getLastResetTime() {
        return lastResetTime;
    }

    int getToken() {
        return token.get();
    }

    @Override
    public String toString() {
        return new StringBuilder(32).append("StatItem ")
                .append("[name=").append(name).append(", ")
                .append("rate = ").append(rate).append(", ")
                .append("interval = ").append(interval).append("]")
                .toString();
    }

}

StatItem内的逻辑很简单,针对每段时间(lastResetTime,lastResetTime+interval)允许rate次调用,只要计数器达不到上限,返回true。如果超过lastResetTime+interval,重置计数器。

使用TpsLimitFilter

令人费解的是,Dubbo框架并没有默认通过配置文件启动这个Filter,所以我们需要在classpath的META-INF/dubbo/目录下增加com.alibaba.dubbo.rpc.Filter文件

tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter

就算加上了这个配置,其实也还是生效不了,我们的provider url需要有tps=xxx参数

问题就来了,怎么加这个配置呢,答案就是override,这个功能的官方介绍如下


override的原理是,其实在RegistryProtocol使用export方法对服务进行本地暴露以及注册Provider Url到zk后,还做了另外一个操作,监听服务对应的 /dubbo/interface/configurations目录,一旦configurations目录下节点发生变化,就会重新生成暴露的url,然后进行reexport。
具体相关源码大家可以细细品味下,我觉得这个设计是dubbo服务治理的核心。
注册监听代码如下

 //得到override url,用于监听configurations目录
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        //构造监听器,用于provider url被override时 重新发布exporter
        //监听路径为 /dubbo/interface/configurations
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        //向registry订阅这个url路径
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

回到正题,那么我们怎么让tps生效呢?

在zk的configurations目录下,增加一个目录,目录名如下

override://10.111.27.41:20880/com.alibaba.dubbo.demo.DemoService?tps=5&category=configurators

zk操作命令如下

create -e /dubbo/com.alibaba.dubbo.demo.DemoService/configurators/override%3a%2f%2f10.111.27.41%3a20880%2fcom.alibaba.dubbo.demo.DemoService%3ftps%3d5%26category%3dconfigurators 1

注意overrider后面这端url需要进行URLEncode,因为里面包含了/符号,zk会误识别为目录。 -e用于创建临时目录,客户端断开后这个目录会失效,也就是限流会失效。创建zk目录的时候需要注意下。最好设置成永久。

我通过以上方式设置tps=5之后,超过第六次调用后,就对客户端抛出异常了


限流算法

Dubbo的限流算法使用了最简单的计数器算法,如果并发流量刚好在上个计数器最后一秒和下个计数器第一秒来临,也不能完全预防突发流量,所以推荐自己使用令牌桶算法或漏桶算法实现自定义限流Filter,并且也可以考虑分布式限流。

关于限流算法,下面这篇文章还不错。
https://blog.csdn.net/tianyaleixiaowu/article/details/74942405

总结

Dubbo设计扩展性真的很强,我们可以通过对Dubbo源码的学习,学习到各个方面的知识,举一反三,应用到实际项目中去,也会有助于对其他框架的源码理解。

最后

希望大家关注下我的公众号


image
上一篇下一篇

猜你喜欢

热点阅读