Hystrix源码分析(三)-信号量
2019-06-15 本文已影响0人
leiwingqueen
一、概要
hystrix的执行隔离策略有两种。一种是线程池的模式,另外一种是信号量的模式。hystrix默认的策略是线程池的模式。
- 线程池
对于每个command的执行会启用一个新的线程,同一个command key会使用同一个线程池,不同的command key会使用不同的线程池就进行隔离。
优点:由于command的执行隔离在不同的线程中,因此某一个command线程执行异常不会影响到其它command的执行。如果希望使用到异步的方式执行,只能选择这种模式。
缺点:由于多了线程的介入,相当于增加了线程间的上下文切换的成本。另外如果系统的command key很多,可能会影响到系统的整体性能。 - 信号量
个人理解跟信号量其实没有太大的关联性。本质上是跟主线程共用一个线程,只是这里增加了一个最大并发数的限制(信号量)。
优点:不涉及线程上下文切换。在大部分业务场景下其实是能满足的(eg.tomcat单请求单线程的业务,command key之间没有异步执行的需求)
缺点:某一个command执行过程中导致线程崩溃会影响到整个流程的执行(共享线程)。
二、源码分析
- 信号量的接口定义
/* package */static interface TryableSemaphore {
/**
* Use like this:
* <p>
*
* <pre>
* if (s.tryAcquire()) {
* try {
* // do work that is protected by 's'
* } finally {
* s.release();
* }
* }
* </pre>
*
* @return boolean
*/
public abstract boolean tryAcquire();
/**
* ONLY call release if tryAcquire returned true.
* <p>
*
* <pre>
* if (s.tryAcquire()) {
* try {
* // do work that is protected by 's'
* } finally {
* s.release();
* }
* }
* </pre>
*/
public abstract void release();
public abstract int getNumberOfPermitsUsed();
}
接口的定义很简单,上层调用会先tryAcquire获得信号量,然后通过release方式释放信号量。
- 实现类
实现类有两种。TryableSemaphoreActual和TryableSemaphoreNoOp。
- TryableSemaphoreActual
信号量的实现类。 - TryableSemaphoreNoOp
这个实现类相当于不限制,线程池模式的实现类采用的方式就是这种。
/* package */static class TryableSemaphoreActual implements TryableSemaphore {
protected final HystrixProperty<Integer> numberOfPermits;
private final AtomicInteger count = new AtomicInteger(0);
public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
this.numberOfPermits = numberOfPermits;
}
@Override
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}
@Override
public void release() {
count.decrementAndGet();
}
@Override
public int getNumberOfPermitsUsed() {
return count.get();
}
}
这个实现类就不多加解释了,主要是用了AtomicInteger做加减的操作。
3.调用层
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
...
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
-- 释放信号量的操作
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
-- 获取信号量
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
三、总结
在实际的工作中对信号量和线程池的选择还是要慎重,事实上对于一般场景的单请求单线程的业务,信号量是能够满足需求的。
对于需要异步(command并行执行)的业务需要使用线程池的方式来实现。