Spring Cloud Hystrix 分析(三)之Hystr
前一节我们讲到Hystrix线程隔离和信号量隔离策略,以及收集命令执行过程中各个环节的信息,本节我们接着上一节遗留的部分,着重分析执行命令过程中命令超时之后处理以及Hystrix线程池初始化流程,加深我们对Hystrix的理解
AbstractCommand#HystrixObservableTimeoutOperator()
abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
......
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
//命令超时检查
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
//命令超时检查,超时则返回超时异常
private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
final AbstractCommand<R> originalCommand;
public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
this.originalCommand = originalCommand;
}
//此回调可以获取到当前Observable对象的上一级订阅者
//简单理解的话,可以直接理解为这个就是原始订阅者
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
//可以装多个Subscription 对象的集合,用于管理多个Subscription对象的取消订阅
final CompositeSubscription s = new CompositeSubscription();
// 将CompositeSubscription加入原始订阅者,对象如果原始订阅者取消订阅了,那么我们也需要取消CompositeSubscription管理的所有Subscription对象
child.add(s);
//定义超时线程,HystrixContextRunnable在这个地方没有垮线程池,可以直接理解为Runnable
final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
@Override
public void run() {
//返回超时异常给原始订阅者
child.onError(new HystrixTimeoutException());
}
});
//定时器
TimerListener listener = new TimerListener() {
@Override
public void tick() {
//当原始命令没有执行,那么修改命令状态为超时状态
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
// 扩展接口,上报命令超时
originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
// 取消parent这个代理订阅者
s.unsubscribe();
//执行超时回调,通知原始订阅者
timeoutRunnable.run();
}
}
//超时时间,默认为1秒
@Override
public int getIntervalTimeInMilliseconds() {
return originalCommand.properties.executionTimeoutInMilliseconds().get();
}
};
//将定时器加入全局定时器管理类
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
// 设置给原始命令用于在命令执行完毕进行清除操作
originalCommand.timeoutTimer.set(tl);
//可以理解为一个代理订阅者,当命令没超时,代理订阅者才会通知原始订阅者并且清除tl
Subscriber<R> parent = new Subscriber<R>() {
@Override
public void onCompleted() {
if (isNotTimedOut()) {
tl.clear();
child.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (isNotTimedOut()) {
tl.clear();
child.onError(e);
}
}
@Override
public void onNext(R v) {
if (isNotTimedOut()) {
child.onNext(v);
}
}
//判断命令是否超时,如果命令没执行过,那么会由NOT_EXECUTED更改为COMPLETED,
//如果还有命令执行,那么当执行onNext时候,那么当前状态为COMPLETED
private boolean isNotTimedOut() {
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.onNext);
}
};
//将代理订阅者加入CompositeSubscription对象中,让CompositeSubscription来管理是否取消订阅
s.add(parent);
//返回代理订阅者
return parent;
}
}
}
这里详细解释下execution = executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator<R>(_cmd));当执行了这个我们得到了一个返回值Observable,我们假设为ObservableD,那么当前我们的调用链就为
ObservableA->ObservableB->ObservableC->ObservableD,那么代码注释中:获取到当前Observable对象的上一级订阅者,这个简单理解就是我们会得到ObservableC的订阅者,以此类推,如果还是不明白,那么可以理解为只有一个ObservableA的时候,这个地方获取到的就是最原始的那个订阅者。
这里在解释下lift,lift内部会经过类似代理步骤,会调用HystrixObservableTimeoutOperator#call() 然后在调用executeCommandWithSpecifiedIsolation#Observable.defer#call(),具体感兴趣的同学可以分析下RxJava的lift用法
上面我们讲到创建定时器之后会将定时器加入全局定时器管理类中并开始执行定时任务,那么接着往下看
HystrixTimer
public class HystrixTimer {
......
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
startThreadIfNeeded();
Runnable r = new Runnable() {
@Override
public void run() {
try {
//回调超时
listener.tick();
} catch (Exception e) {
logger.error("Failed while ticking TimerListener", e);
}
}
};
//定时任务,默认间隔时间为1秒
ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
//返回一个软引用对象,在上层执行引用对象clear时一并取消ScheduledFuture任务
return new TimerReference(listener, f);
}
//初始化ScheduledThreadPoolExecutor定时任务线程池,线程池CoreSize为Runtime.getRuntime().availableProcessors()
protected void startThreadIfNeeded() {
while (executor.get() == null || ! executor.get().isInitialized()) {
if (executor.compareAndSet(null, new ScheduledExecutor())) {
executor.get().initialize();
}
}
}
......
}
定时器实现类我们也比较清晰得知,其实就是开启了一个定时任务线程池,默认间隔时间为1秒,当定时任务被触发之后会回调定时器的回调逻辑,用此来告知订阅者超时异常
HystrixThreadPool
public interface HystrixThreadPool {
......
static class Factory {
final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
//默认为groupKey(如果使用了@HystrixCommand注解,则默认值为@HystrixCommand的类名)
String key = threadPoolKey.name();
//有缓存则使用缓存
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// 初始化HystrixThreadPool(实现类为HystrixThreadPoolDefault)
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
}
//关闭所有线程池
static synchronized void shutdown() {
for (HystrixThreadPool pool : threadPools.values()) {
pool.getExecutor().shutdown();
}
threadPools.clear();
}
......
}
static class HystrixThreadPoolDefault implements HystrixThreadPool {
......
//初始化线程池相关的参数,ThreadPoolExecutor、HystrixThreadPoolMetrics
public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
......
}
//HystrixContextScheduler#ThreadPoolWorker#schedule中会获取当前线程池并将命令提交到线程池执行
@Override
public ThreadPoolExecutor getExecutor() {
touchConfig();
return threadPool;
}
@Override
public Scheduler getScheduler() {......}
//在AbstractCommand#executeCommandWithSpecifiedIsolation中线程隔离模式会使用当前的Scheduler
//.subscribeOn(threadPool.getScheduler())触发
@Override
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
touchConfig();
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
// 设置线程池参数
private void touchConfig() {
......
threadPool.setCorePoolSize(dynamicCoreSize);
threadPool.setMaximumPoolSize(dynamicMaximumSize);
threadPool.setKeepAliveTime(properties.keepAliveTimeMinutes().get(), TimeUnit.MINUTES);
}
//以下三个均为记录当前HystrixThreadPoolMetrics#concurrentExecutionCount对象
//记录当前HystrixThreadPool对象运行的命令条数相关的信息,统计作用
@Override
public void markThreadExecution() {......}
@Override
public void markThreadCompletion() {......}
@Override
public void markThreadRejection() {......}
......
}
}
至此线程池的初始化工作完成了,在隔离模式为线程隔离时候,我们也得知Hystrix会通过调用subscribeOn来指定当前命令的调度工作,从而实现了命令执行在Hystrix指定的线程池中,那么下面我们就来简单看看命令是怎么被发送到Hystrix指定的线程池的
HystrixContextScheduler
public class HystrixContextScheduler extends Scheduler {
......
@Override
public Worker createWorker() {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
private class HystrixContextSchedulerWorker extends Worker {
......
@Override
public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
}
private static class ThreadPoolScheduler extends Scheduler {
......
@Override
public Worker createWorker() {
return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}
}
private static class ThreadPoolWorker extends Worker {
......
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
//还没有订阅的逻辑,或者是已经取消订阅的逻辑
return Subscriptions.unsubscribed();
}
//创建一个具有Runable、Subscription接口对象
ScheduledAction sa = new ScheduledAction(action);
//加入到CompositeSubscription对象中便于统一处理取消订阅逻辑
subscription.add(sa);
sa.addParent(subscription);
//获取线程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
//将命令放入线程池执行,这里就实现了线程隔离策略
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
......
}
}
看到这里我们就已经知道了当使用Hystrix的线程隔离模式时候,最终通过这里就会把命令放入到线程池中执行,如果想更清楚的了解整个调用链,那么感兴趣的同学可以结合subscribeOn来分析,subscribeOn内部就会来调用Scheduler调度器来创建不同的Worker执行器,然后通过Worker执行器来调用schedule将命令传入线程池,线程相关的咱们已经基本总结完毕,下一节我们就开始总结数据采集之后是怎么来进行统计的!