Spring Cloudspring cloud

Spring Cloud Hystrix 分析(三)之Hystr

2021-02-18  本文已影响0人  Blog

上一节我们说到,applyHystrixSemantics内部包含了线程池隔离、信号量隔离以及断路器健康状况信息上报的逻辑,本节我们就重点分析Hystrix是如何实现隔离、断路/熔断以及信息上报的!


AbstractCommand#toObservable()

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    public Observable<R> toObservable() {
        ......
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };
        ......
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                ......
                //创建具有断路器、隔离逻辑的Observable
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);
                Observable<R> afterCache;

                if (requestCacheEnabled && cacheKey != null) {
                    ......
                } else {
                    afterCache = hystrixObservable;
                }
                return afterCache
                        .doOnTerminate(terminateCommandCleanup)     
                        .doOnUnsubscribe(unsubscribeCommandCleanup) 
                        .doOnCompleted(fireOnCompletedHook);
            }
        }
    }
}

通过上面的代码可知,具体的实现在AbstractCommand#applyHystrixSemantics()方法中。

AbstractCommand#applyHystrixSemantics()

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    ......
    /**
     * Builds the Observable that applies circuit-breaker and isolation semantics to the command.
     *
     * <p>If the circuit breaker does not allow the request, the short-circuit fallback is returned.
     * Otherwise an execution permit is tried: on failure the semaphore-rejection fallback is returned;
     * on success the command is executed and the permit is released exactly once, on terminate or on
     * unsubscribe, whichever happens first.
     *
     * @param _cmd the command being executed
     * @return the command's result Observable, a fallback Observable, or an error Observable
     */
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // Extension hook exposed to external code; no other internal logic happens here
        executionHook.onStart(_cmd);

        // Ask the circuit breaker (based on the health info it has collected) whether this request is
        // allowed, i.e. whether it has to be short-circuited
        if (circuitBreaker.allowRequest()) {
            // Execution semaphore for this command. Per the original author's note, each commandKey
            // (by default the invoked method name) gets at most 10 permits by default.
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            // Flag that guards against releasing the permit more than once
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            // Releases the permit. It is wired to both doOnTerminate and doOnUnsubscribe below, so the
            // compareAndSet ensures only the first of them actually calls release().
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
            // Records an EXCEPTION_THROWN event for any error emitted by the execution
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            // Try to acquire an execution permit
            if (executionSemaphore.tryAcquire()) {
                try {
                    // Record the invocation start time
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    // Execute the command; the permit is released once on terminate or unsubscribe
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    // Turn a synchronous failure while building the chain into an error Observable.
                    // NOTE(review): on this path the release actions above were never attached, so the
                    // permit acquired by tryAcquire() is not released here — confirm this is intended.
                    return Observable.error(e);
                }
            } else {
                // No permit available: go to the semaphore-rejection fallback
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            // Request not allowed by the circuit breaker: go straight to the short-circuit fallback
            return handleShortCircuitViaFallback();
        }
    }
}

final TryableSemaphore executionSemaphore = getExecutionSemaphore(); 这一句获取到的执行信号量取决于所配置的隔离模式:信号量隔离模式下默认使用TryableSemaphoreActual,真正限制并发数;线程池隔离模式下则默认使用TryableSemaphoreNoOp,它是一个空实现,不会限制并发。接下来继续分析executeCommandAndObserve,看看其内部是如何处理的。


AbstractCommand#executeCommandAndObserve()

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    /**
     * Wraps the isolated execution of a command with event/metrics reporting, fallback handling and
     * request-context propagation.
     *
     * <p>The operator order is significant: markEmits runs on each onNext, markOnCompleted on
     * completion, handleFallback replaces any error with a fallback/error Observable, and
     * setRequestContext runs on every notification.
     *
     * @param _cmd the command being executed
     * @return the command's Observable with success/failure bookkeeping and fallbacks applied
     */
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        // Request context of the current (subscribing) thread, captured now so it can be re-applied later
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        // Runs for each emitted value (onNext): records the EMIT event when onNext events are enabled
        // (also exposed to external code through the event notifier), and handles success for scalar commands
        final Action1<R> markEmits = new Action1<R>() {
            @Override
            public void call(R r) {
                if (shouldOutputOnNextEvents()) {
                    executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                    eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
                }
                // Scalar commands (per the original author, e.g. GenericCommand) record execution
                // metrics and a SUCCESS event here, and mark success on the circuit breaker
                if (commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    circuitBreaker.markSuccess();
                }
            }
        };
        // Runs on completion: non-scalar commands record execution metrics and a SUCCESS event here and
        // mark success on the circuit breaker. Per the original author this mainly targets
        // HystrixObservableCommand-type commands, whereas GenericCommand is a HystrixCommand type.
        final Action0 markOnCompleted = new Action0() {
            @Override
            public void call() {
                if (!commandIsScalar()) {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                    eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                    circuitBreaker.markSuccess();
                }
            }
        };
        // Error path: stores the exception on the execution result and routes it to the matching handler
        // (thread-pool rejection, timeout, bad request, or generic failure fallback)
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                // Next branch, per the original author: this exception does not trip the circuit and is
                // not counted as a failure; exceptions the application deliberately throws can extend it
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    // e comes from getExceptionFromThrowable(t), so it may presumably be a
                    // HystrixBadRequestException even when t is not — emit it as-is, no fallback
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
                    return handleFailureViaFallback(e);
                }
            }
        };
        // Re-applies the captured request context on the thread that delivers each notification.
        // Per the original author this mainly matters for thread-pool isolation, where data must be passed
        // across thread pools: a request enters a container (e.g. Tomcat) thread and then a Hystrix
        // pool thread, so its request data has to move from one thread to the other.
        final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
            @Override
            public void call(Notification<? super R> rNotification) {
                setRequestContextIfNeeded(currentRequestContext);
            }
        };

        // Execute according to the isolation strategy, optionally with a timeout operator on top
        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    // Per the original author this operator maintains a scheduled check (described as
                    // once per second) that emits a timeout error when the command times out.
                    // NOTE(review): the check interval likely follows the configured execution timeout — confirm.
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }
}

以上代码片段中需要关注一下HystrixRequestContext,它是Hystrix实现跨线程(例如从容器线程到Hystrix线程池线程)传递数据的重要上下文。每个线程持有各自的HystrixRequestContext:执行前先通过HystrixRequestContext.getContextForCurrentThread()获取调用方线程的上下文,之后在发射各个通知的线程中通过setRequestContextIfNeeded重新设置该上下文,从而实现数据传递。一般结合HystrixContextRunnable、HystrixContextCallable、HystrixRequestVariableDefault使用。


AbstractCommand#executeCommandWithSpecifiedIsolation()

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    /**
     * Runs the user code under the configured isolation strategy.
     *
     * <p>THREAD: the deferred body is moved onto the Hystrix thread pool via {@code subscribeOn}, and
     * thread state transitions (NOT_USING_THREAD -> STARTED -> TERMINAL/UNSUBSCRIBED) drive the
     * thread-end bookkeeping. Otherwise (SEMAPHORE): the body runs on the subscribing thread.
     * In both modes only the first transition OBSERVABLE_CHAIN_CREATED -> USER_CODE_EXECUTED may run
     * the user code.
     *
     * <p>Per the original author, both the semaphore and the thread pool default to 10; once all 10
     * threads or permits are taken, further executions are rejected.
     *
     * @param _cmd the command being executed
     * @return the Observable of the user code's execution, or an error Observable
     */
    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        // Thread-pool isolation
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    // State-machine style guard: fail if the command has already been executed
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    // Report command-start metrics. Per the original author these are processed
                    // periodically by HystrixMetricsPoller (2 s interval).
                    // NOTE(review): the circuit breaker presumably reads health counts from the command
                    // metrics directly rather than via HystrixMetricsPoller — confirm.
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                    // Do not run if the command already timed out before reaching here
                    // (isCommandTimedOut is presumably set by HystrixObservableTimeoutOperator)
                    if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                        return Observable.error(new RuntimeException("timed out before executing run()"));
                    }
                    // Claim the thread: NOT_USING_THREAD -> STARTED (fails if already unsubscribed/terminated)
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                        HystrixCounters.incrementGlobalConcurrentThreads();
                        // Report thread-pool metrics
                        threadPool.markThreadExecution();
                        // Track the command as currently executing; the returned action is kept in
                        // endCurrentThreadExecutingCommand. Per the original author the command is pushed
                        // onto a ThreadLocal stack and popped when execution finishes.
                        endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                        executionResult = executionResult.setExecutedInThread();
                        // Extension hooks exposed to external code; no internal logic
                        try {
                            executionHook.onThreadStart(_cmd);
                            executionHook.onRunStart(_cmd);
                            executionHook.onExecutionStart(_cmd);
                            // Run the user's task
                            return getUserExecutionObservable(_cmd);
                        } catch (Throwable ex) {
                            return Observable.error(ex);
                        }
                    } else {
                        // Thread state is no longer NOT_USING_THREAD (e.g. already unsubscribed): fail
                        return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                    }
                }
            }).doOnTerminate(new Action0() {
                // Called before onCompleted/onError: report thread-end statistics if a thread was started
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                        // The CAS only records TERMINAL; no thread was ever started, so nothing to clean up
                    }
                }
            }).doOnUnsubscribe(new Action0() {
                // Called on unsubscribe: report thread-end statistics if a thread was started
                @Override
                public void call() {
                    if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                        handleThreadEnd(_cmd);
                    }
                    if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                        // The CAS only records UNSUBSCRIBED (which makes the call() above refuse to run);
                        // no thread was started, so nothing to clean up
                    }
                }
            // Key point: subscribeOn moves the subscription, i.e. the thread that runs call() above,
            // onto the Hystrix thread pool's scheduler. Per the original author, threadPool defaults to
            // HystrixThreadPoolDefault.
            }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
                @Override
                public Boolean call() {
                    // True only when executionIsolationThreadInterruptOnTimeout is enabled AND the command
                    // has TIMED_OUT (presumably tells the scheduler to interrupt the worker thread — confirm).
                    // This does not gate whether the command runs; that is the isCommandTimedOut check in call().
                    return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
                }
            }));
        } else {
            // Semaphore isolation: runs on the subscribing thread (no subscribeOn)
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    // State-machine style guard: fail if the command has already been executed
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    // Report command-start metrics
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                    // Track the command as currently executing; per the original author it is pushed onto a
                    // ThreadLocal stack and popped when execution finishes
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    try {
                        // Extension hooks exposed to external code
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        // Run the user's task
                        return getUserExecutionObservable(_cmd);  
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                }
            });
        }
    }
}

从上面的注释中可以看出,Hystrix通过上报统计指标来判断断路器的健康状况。命令的执行情况(成功、失败、超时、异常等)都会上报给HystrixCommandMetrics、HystrixThreadPoolMetrics这些继承自HystrixMetrics的类。断路器再根据这些统计出的健康数据(如请求总数、错误百分比)与设定的阈值,判断是否需要开启断路/熔断;HystrixMetricsPoller则主要用于定期拉取这些指标并对外输出监控数据。至此,我们大致梳理了线程池隔离与信号量隔离的实现方式。

上一篇 下一篇

猜你喜欢

热点阅读