Dubbo 并发控制
2020-01-21 本文已影响0人
晴天哥_王志
消费端并发控制
<dubbo:reference id="userService" interface="com.test.UserServiceBo"
group="dubbo" version="1.0.0" timeout="3000" actives="10"/>
- 在服务消费方设置接口中每个方法并发请求个数,通过设置actives参数。
<dubbo:reference id="userService" interface="com.test.UserServiceBo"
group="dubbo" version="1.0.0" timeout="3000">
<dubbo:method name="sayHello" actives="10" />
</dubbo:reference>
- 在服务消费方设置接口中的某个方法的并发请求个数,通过设置actives参数。
服务端并发控制
<dubbo:service interface="com.test.UserServiceBo" ref="userService"
group="dubbo" version="1.0.0" timeout="3000" executes="10"/>
- 在服务提供方设置接口中每个方法的并发请求数,通过设置executes参数。
<dubbo:service interface="com.test.UserServiceBo" ref="userService"
group="dubbo" version="1.0.0" timeout="3000" >
<dubbo:method name="sayHello" executes="10" />
</dubbo:service>
- 在服务提供方设置接口中某个方法的并发请求数,通过设置executes参数。
消费端并发限制 - ActiveLimitFilter
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 获取方法级别的并发限制的RpcStatus对象
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();
// 通过synchronized和wait实现客户端并发限制超过时候需要等待直至超时。
if (active >= max) {
synchronized (count) {
while ((active = count.getActive()) >= max) {
try {
count.wait(remain);
} catch (InterruptedException e) {
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
try {
long begin = System.currentTimeMillis();
// 累加方法级别的并发数
RpcStatus.beginCount(url, methodName);
try {
// 执行方法调用
Result result = invoker.invoke(invocation);
// 递减方法级别的并发数
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
// 递减方法级别的并发数
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
// 针对有并发限制的通过notify进行唤醒
if (max > 0) {
synchronized (count) {
count.notify();
}
}
}
}
}
- 1、首先会去获得服务消费端每服务每方法最大可并行执行请求数。
- 2、如果方法设置并发请求数就需要判断是否超并发数,超过并发数就等待直至超时。
- 3、按照累加并发数、执行方法、递减并发数,最后进行唤醒炒作。
- 4、 消费端设置actives时候会等待直至超时。
服务端并发限制 - ExecuteLimitFilter
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
Semaphore executesLimit = null;
boolean acquireResult = false;
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
// if (count.getActive() >= max) {
/**
* http://manzhizhen.iteye.com/blog/2386408
* use semaphore for concurrency control (to limit thread number)
*/
executesLimit = count.getSemaphore(max);
// 服务提供方设置并发数量后,如果同时请求数量大于了设置的executes的值,则会抛出异常。
if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isSuccess = true;
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isSuccess = false;
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
} finally {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
// 如果需要获取过信号量就进行释放动作。
if(acquireResult) {
executesLimit.release();
}
}
}
}
- 1、首先会去获得服务提供者每服务每方法最大可并行执行请求数。
- 2、如果每服务每方法最大可并行执行请求数大于零,那么就基于基于服务 URL + 方法维度获取一个RpcStatus实例。
- 3、通过RpcStatus实例获取一个信号量,若果获取的这个信号量调用tryAcquire返回false,则抛出异常。
- 4、如果没有抛异常,那么久调用RpcStatus静态方法beginCount,给这个 URL + 方法维度开始计数。
- 5、调用服务。
- 6、调用结束后计数调用RpcStatus静态方法endCount,计数结束。
- 7、释放信号量。
- 8、需要注意的是,服务提供方设置并发数量后,如果同时请求数量大于了设置的executes的值,则会抛出异常。
并发限制实现核心 - RpcStatus
public class RpcStatus {
// service级别的并发限制全局变量
private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
// method级别的并发限制全局变量
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
// 记录活跃的记录
private final AtomicInteger active = new AtomicInteger();
private final AtomicLong total = new AtomicLong();
private final AtomicInteger failed = new AtomicInteger();
private final AtomicLong totalElapsed = new AtomicLong();
private final AtomicLong failedElapsed = new AtomicLong();
private final AtomicLong maxElapsed = new AtomicLong();
private final AtomicLong failedMaxElapsed = new AtomicLong();
private final AtomicLong succeededMaxElapsed = new AtomicLong();
// 用于记录服务提供端的限制
private volatile Semaphore executesLimit;
private volatile int executesPermits;
private RpcStatus() {
}
// 获取service级别的并发限制变量
public static RpcStatus getStatus(URL url) {
String uri = url.toIdentityString();
RpcStatus status = SERVICE_STATISTICS.get(uri);
if (status == null) {
SERVICE_STATISTICS.putIfAbsent(uri, new RpcStatus());
status = SERVICE_STATISTICS.get(uri);
}
return status;
}
// 获取method级别的并发限制变量,根据service=>method顺序查找
public static RpcStatus getStatus(URL url, String methodName) {
String uri = url.toIdentityString();
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
if (map == null) {
METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
map = METHOD_STATISTICS.get(uri);
}
RpcStatus status = map.get(methodName);
if (status == null) {
map.putIfAbsent(methodName, new RpcStatus());
status = map.get(methodName);
}
return status;
}
// 累加service 和 method的并发限制变量
public static void beginCount(URL url, String methodName) {
beginCount(getStatus(url));
beginCount(getStatus(url, methodName));
}
// 并发限制变量的原子累加
private static void beginCount(RpcStatus status) {
status.active.incrementAndGet();
}
// 递减service 和 method的并发限制变量
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
endCount(getStatus(url), elapsed, succeeded);
endCount(getStatus(url, methodName), elapsed, succeeded);
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
status.active.decrementAndGet();
status.total.incrementAndGet();
status.totalElapsed.addAndGet(elapsed);
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
if (succeeded) {
if (status.succeededMaxElapsed.get() < elapsed) {
status.succeededMaxElapsed.set(elapsed);
}
} else {
status.failed.incrementAndGet();
status.failedElapsed.addAndGet(elapsed);
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
}
}
}
public Semaphore getSemaphore(int maxThreadNum) {
if(maxThreadNum <= 0) {
return null;
}
if (executesLimit == null || executesPermits != maxThreadNum) {
synchronized (this) {
if (executesLimit == null || executesPermits != maxThreadNum) {
executesLimit = new Semaphore(maxThreadNum);
executesPermits = maxThreadNum;
}
}
}
return executesLimit;
}
}
- service级别的并发限制全局变量 private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS。
- method级别的并发限制全局变量 private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS。
- 记录活跃的记录 private final AtomicInteger active。