Spring CloudSpringCloud

Spring Cloud Hystrix 分析(一)之流程图

2021-02-08  本文已影响0人  Blog

随着业务的不断扩展,微服务比例逐渐增大,为了保证各个服务的稳定性以及服务之间的隔离,所以我们需要服务具备熔断、降级等功能,保证整个系统不会被拉垮,所以本节我们分析Hystrix熔断、降级,当然阿里的Sentinel、Spring官方推荐的Resilience4j也都具备这样的功能,但是本节重点分析Hystrix,虽然Hystrix已经停止更新了,但是现阶段许多微服务的熔断还是基于的Hystrix,那么现在我们开始进入正题!


Hystrix官方流程图

开局一张图,内容全靠懵!话有点多,我们直接上图,有点懵是吧?PS:正所谓,懵逼树上懵逼果,懵逼树下你和我,这就让笔者进行一波懵逼总结!因为Hystrix基于RxJava实现,如果对RxJava的被观察者和观察者不熟悉,那么正在阅读的同学可以先缓一缓,先去阅读下RxJava相关的用法,对RxJava的运作模式清晰之后,再来分析Hystrix就比较容易上手了,从官方Wiki文档上对流程的总结,我们能得知流程图为9部分组成,下文我们就看看每个步骤都做了什么工作!


步骤1、2(构建命令、执行命令)

命令分为HystrixCommand、HystrixObservableCommand两种,通过构建和使用两种命令,既可以使用Hystrix的熔断、降级等功能,两者用法基本一致,唯一有区别的就是HystrixCommand返回单个操作结果(发射一次数据),HystrixObservableCommand返回多个操作结果(发射多次数据),这样的区别其实只是在自定义实现HystrixCommand、HystrixObservableCommand时候才会有区别,对于Hystrix内部其实都是返回的Observable对象来获取结果

public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    ......
    //当调用execute()、queue()方法时候需要实现此方法
    protected abstract R run() throws Exception;
    //同步执行,返回结果
    public R execute() {
        try {
            return queue().get();
        } catch (Exception e) {
            throw Exceptions.sneakyThrow(decomposeException(e));
        }
    }
    //异步执行,返回Future对象
    public Future<R> queue() {......}
}
public abstract class HystrixObservableCommand<R> extends AbstractCommand<R> implements HystrixObservable<R>, HystrixInvokableInfo<R> {
    ......
    //当调用observe()、toObservable()方法时候需要实现此方法
    protected abstract Observable<R> construct();
    //调用后自动订阅,立即执行命令
    public Observable<R> observe() {
        ReplaySubject<R> subject = ReplaySubject.create();
        final Subscription sourceSubscription = toObservable().subscribe(subject);
        return subject.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                sourceSubscription.unsubscribe();
            }
        });
    }
    //调用后不会自动订阅,当调用方手动订阅之后,才执行命令
    public Observable<R> toObservable() {......}
}

步骤3(缓存)

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    public Observable<R> toObservable() {
        ......
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                //首先从缓存中获取是否存在相同命令的结果
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }
                //创建Observable
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                //放入缓存
                if (requestCacheEnabled && cacheKey != null) {
                    // 从缓存中包装Observable
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    if (fromCache != null) {
                        // 如果其他线程已经存入这个缓存命令,那么直接执行
                        toCache.unsubscribe();
                        isResponseFromCache = true;
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    } else {
                        // 返回刚刚我们创建的ObservableCommand
                        afterCache = toCache.toObservable();
                    }
                } else {
                    afterCache = hystrixObservable;
                }

                return afterCache
                        //命令执行结束后的清理动作
                        .doOnTerminate(terminateCommandCleanup) 
                        //取消订阅后的清理动作
                        .doOnUnsubscribe(unsubscribeCommandCleanup) 
                        //命令执行完成后的Hook动作
                        .doOnCompleted(fireOnCompletedHook);
            }
        }
    }
}

从代码片段中我们能大致得知如果开启了缓存,那么优先从缓存中查找,是否存在相同的命令,如果存在相同的命令,那么直接从这个缓存命令返回结果,否则每次都创建新的命令并执行,返回最终的结果


步骤4(断路器/熔断)

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    ......
    public Observable<R> toObservable() {
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                //如果没有订阅,则中断当前事件流,never()不会执行订阅者的onCompleted
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                //Hystrix断路器、隔离相关逻辑
                return applyHystrixSemantics(_cmd);
            }
        };
    }
    ......
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        //扩展接口,提供给使用者扩展使用
        executionHook.onStart(_cmd);

        //断路器是否允许请求
        if (circuitBreaker.allowRequest()) {
            //获取执行的信号量
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            //信号量释放标志
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            //释放信号量的逻辑
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };
            //标志异常的逻辑
            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
            //尝试获取信号量
            if (executionSemaphore.tryAcquire()) {
                try {
                    //跟踪线程执行时间
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    //执行命令
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                //获取信号量失败的逻辑
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            //直接执行断路逻辑
            return handleShortCircuitViaFallback();
        }
    }
}

通过注释信息,我们能得知,执行命令时,首先判断断路器是否允许请求,然后获取执行命令的信号量,获取不到就返回失败,以及在执行完命令之后信号量释放操作


步骤5(信号量、线程池隔离策略)

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        ......
        //异常情况的逻辑
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {......}
        Observable<R> execution;
        //是否启用超时
        //executeCommandWithSpecifiedIsolation根据隔离策略处理逻辑
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback)
                .doOnEach(setRequestContext);
    }

    private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
        //线程池隔离策略
        if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
          ......
        } else {
            //信号量隔离策略
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    executionResult = executionResult.setExecutionOccurred();
                    if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                        return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                    }
                    
                    metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                    //存储正在运行的命令
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    try {
                        //扩展接口
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        //执行用户任务
                        return getUserExecutionObservable(_cmd);  
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                }
            });
        }
    }
}

信号量与线程池默认值都是10,当10个线程或者是信号量都被占用,那么将会拒绝执行命令


步骤6(执行用户任务)

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    ......
    protected abstract Observable<R> getExecutionObservable();
    private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
        Observable<R> userObservable;

        try {
            userObservable = getExecutionObservable();
        } catch (Throwable ex) {
            userObservable = Observable.error(ex);
        }

        return userObservable
                .lift(new ExecutionHookApplication(_cmd))
                .lift(new DeprecatedOnRunHookApplication(_cmd));
    }
}

对于HystrixCommand命令模式的需要实现run()方法,对于HystrixObservableCommand命令模式的需要实现construct()方法,调用到用户真正的任务


步骤7(断路器健康状况)

通过上报统计指标信息来判断当前断路器的健康状况,命令执行的情况(成功、失败、超时、异常等)都会上报给HystrixCommandMetrics这个类,最终通过HystrixMetricsPoller定期处理这些上报的数据(定时任务时间间隔2秒),在判断这些汇总数据和设定的条件来判断是否需要开启断路器/熔断


步骤8(命令执行失败)

通过流程图得知,开启断路器/熔断、线程池或者信号量拒绝、执行失败、执行超时等都会执行用户指定的Fallback逻辑,HystrixCommand类型的命令执行getFallback(),HystrixObservableCommand类型的命令执行resumeWithFallback()


步骤9(返回命令执行结果)

结果最终以Observable对象返回,用户可以通过Observable对应的不同策略进行获取,execute()、queue()、observe()、toObservable()等

根据流程图我们大致可以看出Hystrix具体做了哪些工作,对于每个步骤具体对应的逻辑相对比较复杂,我们放到后续分析逐步揭晓!

上一篇 下一篇

猜你喜欢

热点阅读