Dubbo之限流TpsLimitFilter源码分析
本文基于incubator-dubbo 2.7.0版本
前言
在分布式系统中,限流和熔断是处理并发的两大利器。关于限流和熔断,需要记住一句话,客户端熔断,服务端限流。本文我会讲解Dubbo框架对限流的支持。
限流的作用
我个人理解限流的作用,保护应用,防止雪崩。每个应用都有自己处理请求的上限,一旦应用承受过多请求,首先会对正在处理中的请求造成影响,如果更严重,对上下游也会造成雪崩效应。
TpsLimitFilter分析
Dubbo中的限流通过TpsLimitFilter来实现,会在invoker执行实际业务逻辑前进行拦截,判断单位时间请求数是否超过上限,如果超过,抛出异常阻断调用。
TpsLimitFilter源码如下
@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
new StringBuilder(64)
.append("Failed to invoke service ")
.append(invoker.getInterface().getName())
.append(".")
.append(invocation.getMethodName())
.append(" because exceed max service tps.")
.toString());
}
return invoker.invoke(invocation);
}
}
从TpsLimitFilter的源码中可以看到,因为是扩展点自动激活配置,首先TpsLimitFilter只对provider端有效,其次provider url的需要包括tps=xxx这个配置才能生效。
通过TPSLimiter的isAllowable实现限流 ,其内部采用了计数器算法,单位时间内限制多少调用次数,超过限制,返回false。
public class DefaultTPSLimiter implements TPSLimiter {
/**
* 每个Service维护一个计数器
*/
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
//servicekey并没有和方法绑定,只能限流接口
String serviceKey = url.getServiceKey();
if (rate > 0) {
StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable();
} else {
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
}
TPSLimiter 针对每个service都创建一个计数器StatItem,通过StatItem的isAllowable方法判断请求是否有效
class StatItem {
//接口名
private String name;
//计数周期开始
private long lastResetTime;
//计数间隔
private long interval;
//剩余计数请求数
private AtomicInteger token;
//总共允许请求数
private int rate;
StatItem(String name, int rate, long interval) {
this.name = name;
this.rate = rate;
this.interval = interval;
this.lastResetTime = System.currentTimeMillis();
this.token = new AtomicInteger(rate);
}
public boolean isAllowable() {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
token.set(rate);
lastResetTime = now;
}
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
//乐观锁增加计数
flag = token.compareAndSet(value, value - 1);
//失败重新获取
value = token.get();
}
return flag;
}
long getLastResetTime() {
return lastResetTime;
}
int getToken() {
return token.get();
}
@Override
public String toString() {
return new StringBuilder(32).append("StatItem ")
.append("[name=").append(name).append(", ")
.append("rate = ").append(rate).append(", ")
.append("interval = ").append(interval).append("]")
.toString();
}
}
StatItem内的逻辑很简单,针对每段时间(lastResetTime,lastResetTime+interval)允许rate次调用,只要计数器达不到上限,返回true。如果超过lastResetTime+interval,重置计数器。
使用TpsLimitFilter
令人费解的是,Dubbo框架并没有默认通过配置文件启动这个Filter,所以我们需要在classpath的META-INF/dubbo/目录下增加com.alibaba.dubbo.rpc.Filter文件
tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter
就算加上了这个配置,其实也还是生效不了,我们的provider url需要有tps=xxx参数
问题就来了,怎么加这个配置呢,答案就是override,这个功能的官方介绍如下
override的原理是,其实在RegistryProtocol使用export方法对服务进行本地暴露以及注册Provider Url到zk后,还做了另外一个操作,监听服务对应的 /dubbo/interface/configurations目录,一旦configurations目录下节点发生变化,就会重新生成暴露的url,然后进行reexport。
具体相关源码大家可以细细品味下,我觉得这个设计是dubbo服务治理的核心。
注册监听代码如下
//得到override url,用于监听configurations目录
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
//构造监听器,用于provider url被override时 重新发布exporter
//监听路径为 /dubbo/interface/configurations
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//向registry订阅这个url路径
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
回到正题,那么我们怎么让tps生效呢?
在zk的configurations目录下,增加一个目录,目录名如下
override://10.111.27.41:20880/com.alibaba.dubbo.demo.DemoService?tps=5&category=configurators
zk操作命令如下
create -e /dubbo/com.alibaba.dubbo.demo.DemoService/configurators/override%3a%2f%2f10.111.27.41%3a20880%2fcom.alibaba.dubbo.demo.DemoService%3ftps%3d5%26category%3dconfigurators 1
注意overrider后面这端url需要进行URLEncode,因为里面包含了/符号,zk会误识别为目录。 -e用于创建临时目录,客户端断开后这个目录会失效,也就是限流会失效。创建zk目录的时候需要注意下。最好设置成永久。
我通过以上方式设置tps=5之后,超过第六次调用后,就对客户端抛出异常了
限流算法
Dubbo的限流算法使用了最简单的计数器算法,如果并发流量刚好在上个计数器最后一秒和下个计数器第一秒来临,也不能完全预防突发流量,所以推荐自己使用令牌桶算法或漏桶算法实现自定义限流Filter,并且也可以考虑分布式限流。
关于限流算法,下面这篇文章还不错。
https://blog.csdn.net/tianyaleixiaowu/article/details/74942405
总结
Dubbo设计扩展性真的很强,我们可以通过对Dubbo源码的学习,学习到各个方面的知识,举一反三,应用到实际项目中去,也会有助于对其他框架的源码理解。
最后
希望大家关注下我的公众号
image