Spring Cloud Hystrix 分析(三)之Hystr
上一节我们说到applyHystrixSemantics逻辑内部包含线程池隔离、信号量隔离的实现,以及断路器健康状况信息的上报。本节我们就重点分析隔离、断路/熔断、信息上报分别是如何实现的!
AbstractCommand#toObservable()
/**
 * Excerpt of Hystrix's AbstractCommand showing how toObservable() wires
 * applyHystrixSemantics into the Observable it returns.
 * A line consisting of "......" marks code that the article elided.
 */
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    public Observable<R> toObservable() {
        ......
        // Factory invoked lazily (via Observable.defer below) to build the
        // circuit-breaker / isolation pipeline for this command.
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // If the command state is already UNSUBSCRIBED, return an
                // Observable that never emits instead of executing anything.
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };
        ......
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                ......
                // Create the Observable that carries the circuit-breaker and isolation logic.
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);
                Observable<R> afterCache;
                if (requestCacheEnabled && cacheKey != null) {
                    ......
                } else {
                    // No request caching: use the Hystrix pipeline directly.
                    afterCache = hystrixObservable;
                }
                // Attach cleanup on termination and on unsubscribe, plus the on-completed hook.
                return afterCache
                        .doOnTerminate(terminateCommandCleanup)
                        .doOnUnsubscribe(unsubscribeCommandCleanup)
                        .doOnCompleted(fireOnCompletedHook);
            }
        // Fix: close both the anonymous Func0 and the Observable.defer(...) call;
        // the excerpt previously ended this statement with a bare "}".
        });
    }
}
通过上面的代码可以得知,具体的实现方法为AbstractCommand#applyHystrixSemantics()
AbstractCommand#applyHystrixSemantics()
// Excerpt of Hystrix's AbstractCommand: the circuit-breaker check and semaphore
// acquisition that run before the command itself is executed.
// A line consisting of "......" marks code that the article elided.
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
......
// Builds the Observable that carries the circuit-breaker and isolation logic.
// Returns the command's execution Observable, or a fallback Observable when the
// circuit breaker rejects the request or the semaphore cannot be acquired.
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
// Extension hook exposed to external code; per the article it has no built-in logic.
executionHook.onStart(_cmd);
// Ask the circuit breaker whether this request is allowed; if not, short-circuit (else branch at the bottom).
if (circuitBreaker.allowRequest()) {
// Obtain the execution semaphore.
// Article note (not visible in this excerpt): each commandKey, which defaults to the invoked method name, gets at most 10 permits by default.
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// Flag recording whether the semaphore has already been released.
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// Releases the semaphore at most once: the CAS guards against a double release,
// because this action is registered on both doOnTerminate and doOnUnsubscribe below.
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
// Reports an EXCEPTION_THROWN event for this commandKey when the execution Observable errors.
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// Try to acquire the semaphore.
if (executionSemaphore.tryAcquire()) {
try {
// Record the invocation start time on the (reassigned) execution result.
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// Execute the command; release the semaphore on termination or unsubscribe.
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
// A RuntimeException thrown while assembling the chain is surfaced as an error Observable.
return Observable.error(e);
}
} else {
// Semaphore acquisition failed: take the semaphore-rejection fallback path.
return handleSemaphoreRejectionViaFallback();
}
} else {
// Circuit breaker does not allow the request: take the short-circuit fallback path directly.
return handleShortCircuitViaFallback();
}
}
}
`final TryableSemaphore executionSemaphore = getExecutionSemaphore();` 这行代码用于获取执行信号量:只有当我们配置的隔离模式为信号量隔离时才会真正用到信号量,此时默认使用TryableSemaphoreActual;如果是线程池隔离,则默认使用TryableSemaphoreNoOp。接下来继续分析executeCommandAndObserve,看看其内部是如何处理的。
AbstractCommand#executeCommandAndObserve()
// Excerpt of Hystrix's AbstractCommand: wraps the isolated execution with
// event reporting, success marking, error-to-fallback handling and request-context propagation.
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
// Runs the command under the configured isolation (optionally with the timeout operator)
// and decorates the resulting Observable with the callbacks defined below.
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
// Capture the request context of the thread that is assembling this chain.
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// Runs for every emitted item (onNext): records an EMIT event when onNext events
// should be output, and reports it through the event notifier.
final Action1<R> markEmits = new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEvent(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
}
// Scalar commands (the article cites GenericCommand) are marked successful here:
// report latency and SUCCESS, then call circuitBreaker.markSuccess().
if (commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
};
// Runs on completion: performs the same success reporting as above, but only for non-scalar commands.
// Article note: this mainly targets HystrixObservableCommand-style commands, whereas GenericCommand is a HystrixCommand.
final Action0 markOnCompleted = new Action0() {
@Override
public void call() {
if (!commandIsScalar()) {
long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
}
}
};
// Error path: stores the exception on the execution result, then dispatches on its type
// (thread-pool rejection, timeout, bad request, or generic failure) to the matching handler.
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
// Article note: HystrixBadRequestException does not trip the circuit breaker and is not
// counted as a failure; application-thrown exceptions may extend it for that purpose.
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
// The original throwable was not a HystrixBadRequestException, but the exception derived
// from it is: report BAD_REQUEST and emit the error without going through the failure fallback.
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
// Re-applies the captured request context on every notification.
// Article note: this matters for thread-pool isolation, where a request moves from the
// container thread pool (e.g. Tomcat) to a Hystrix thread pool and per-request data has to
// travel from the container thread to the Hystrix thread.
final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
@Override
public void call(Notification<? super R> rNotification) {
setRequestContextIfNeeded(currentRequestContext);
}
};
// Build the execution Observable according to the isolation strategy, adding the timeout operator when timeouts are enabled.
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
// Article note: the operator keeps a timer task that checks whether the command has timed out and, if so, produces a timeout error.
// NOTE(review): the article says the check runs once per second; presumably the interval is the configured execution timeout - confirm against HystrixObservableTimeoutOperator.
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
// Order matters: emit/completion marking happens before errors are converted to fallbacks,
// and the request context is restored on each notification of the resulting stream.
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
}
以上的代码片段中我们可能需要关注下HystrixRequestContext,这个是Hystrix实现线程池之间数据传递比较重要的上下文,各个线程池内部维护各自的HystrixRequestContext,通过获取不同HystrixRequestContext来达到数据传递,一般结合HystrixContextRunnable、HystrixContextCallable、HystrixRequestVariableDefault使用
AbstractCommand#executeCommandWithSpecifiedIsolation()
// Excerpt of Hystrix's AbstractCommand: runs the user code either on the Hystrix
// thread pool (THREAD isolation) or on the calling thread (any other strategy).
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
// Article note (defaults are not visible in this excerpt): the semaphore and the thread pool both
// default to a size of 10; once all 10 threads or permits are in use, further commands are rejected.
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// Thread-pool isolation.
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// State-machine style guard: only proceed if the command can move from
// OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED; otherwise emit an IllegalStateException.
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// Report the command start to the metrics.
// Article note: HystrixMetricsPoller processes the reported data periodically (every 2 seconds) - not visible here.
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// If the command is already marked TIMED_OUT (per the article, set by HystrixObservableTimeoutOperator), do not run user code.
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// Move the thread state from NOT_USING_THREAD to STARTED; failure of this CAS is handled in the else branch.
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
HystrixCounters.incrementGlobalConcurrentThreads();
// Report the thread execution to the thread-pool metrics.
threadPool.markThreadExecution();
// Register the command as currently executing on this thread and keep the returned end-action.
// Article note: the command key is pushed onto a ThreadLocal-backed stack and popped when execution finishes.
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
// Extension hooks for external code (per the article they have no built-in logic);
// anything thrown here or while building the user Observable becomes an error Observable.
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
// Run the user's task.
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
// The thread state was no longer NOT_USING_THREAD (the command was unsubscribed first): emit an error.
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
// Invoked just before onCompleted or onError: if the thread was started, mark it TERMINAL and run the thread-end handling.
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
// Intentionally empty body: the CAS itself moves a never-started thread state to TERMINAL; there is nothing to clean up.
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
}
}
}).doOnUnsubscribe(new Action0() {
// Invoked on unsubscribe: if the thread was started, mark it UNSUBSCRIBED and run the thread-end handling.
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
// Intentionally empty body: the CAS itself moves a never-started thread state to UNSUBSCRIBED; there is nothing to clean up.
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
}
}
// Key point: subscribeOn moves the subscription, and therefore the call() of the deferred
// factory above, onto the scheduler obtained from threadPool.
// Article note: the default threadPool implementation is HystrixThreadPoolDefault.
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// Returns true only when interrupt-on-timeout is enabled AND this command has timed out.
// NOTE(review): the article described this as "only commands that have not timed out are executed";
// the expression reads as a should-interrupt-thread predicate handed to the scheduler - confirm against HystrixThreadPool#getScheduler.
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
// Semaphore isolation: no subscribeOn, so the deferred call() runs on the subscribing thread.
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// State-machine style guard: only proceed if the command can move from
// OBSERVABLE_CHAIN_CREATED to USER_CODE_EXECUTED; otherwise emit an IllegalStateException.
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// Report the command start to the metrics.
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// Register the command as currently executing on this thread and keep the returned end-action.
// Article note: the command key is pushed onto a ThreadLocal-backed stack and popped when execution finishes.
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
// Extension hooks for external code.
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
// Run the user's task.
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
// Anything thrown by the hooks or while building the user Observable becomes an error Observable.
return Observable.error(ex);
}
}
});
}
}
}
从上面注释的信息中我们就可以看出,Hystrix通过上报统计指标信息来判断当前断路器的健康状况:命令执行的情况(成功、失败、超时、异常等)都会上报给HystrixCommandMetrics、HystrixThreadPoolMetrics这些继承了HystrixMetrics的类,最后会在HystrixMetricsPoller中根据这些收集到的信息与设定的条件来判断是否需要开启断路器/熔断。至此,我们大致总结了线程池隔离与信号量隔离的实现方式。