Spring Cloudspring cloud

Spring Cloud Hystrix 分析(三)之Hystr

2021-02-24  本文已影响0人  Blog

我们在使用Hystrix的断路器/熔断功能时,如果细心的同学可能会观察到当我们短时间内重复触发某个异常接口或者时服务的时候,会直接触发Hystrix的断路器开关,Hystrix内部会直接抛出异常给调用方,而不再调用接口或者服务,本节我们就重点分析下Hystrix是如何通过收集的信息来触发这个断路器开关的!


HystrixMetricsPollerConfiguration

@Configuration
public class HystrixCircuitBreakerConfiguration {
    ......
    //SmartLifecycle 在spring Ioc容器创建之后、关闭之前执行的操作
    @Configuration
    @ConditionalOnProperty(value = "hystrix.metrics.enabled", matchIfMissing = true)
    @ConditionalOnClass({ HystrixMetricsPoller.class, GaugeService.class })
    @EnableConfigurationProperties(HystrixMetricsProperties.class)
    protected static class HystrixMetricsPollerConfiguration implements SmartLifecycle {
        //通过localhost:8080/metrics查看监控的基本指标
        //Spring Boot Actuator 统计服务
        @Autowired(required = false)
        private GaugeService gauges;
        @Autowired
        private HystrixMetricsProperties metricsProperties;
        //Jackson对象,用于获取数据
        private ObjectMapper mapper = new ObjectMapper();
        //指标轮训实现类,内部维护一个定时任务线程池
        private HystrixMetricsPoller poller;
        //保留的字段,这些字段不会处理
        private Set<String> reserved = new HashSet<String>(Arrays.asList("group", "name",
                "type", "currentTime"));

        @Override
        public void start() {
            if (this.gauges == null) {
                return;
            }
            //通过HystrixMetricsPoller回调回来的Hystrix统计信息
            //最终上报给Actuator服务,外部可以通过/metrics获取目前应用的指标信息
            MetricsAsJsonPollerListener listener = new MetricsAsJsonPollerListener() {
                @Override
                public void handleJsonMetric(String json) {
                    try {
                        @SuppressWarnings("unchecked")
                        Map<String, Object> map = HystrixMetricsPollerConfiguration.this.mapper
                                .readValue(json, Map.class);
                        if (map != null && map.containsKey("type")) {
                            addMetrics(map, "hystrix.");
                        }
                    }
                    catch (IOException ex) {}
                }
            };
            //Hystrix指标信息定时任务,间隔时间5秒
            this.poller = new HystrixMetricsPoller(listener,
                    metricsProperties.getPollingIntervalMs());
            //开启定时任务
            this.poller.start();
            logger.info("Starting poller");
        }
        //上报指标信息给Actuator服务
        private void addMetrics(Map<String, Object> map, String root) {
            StringBuilder prefixBuilder = new StringBuilder(root);
            if (map.containsKey("type")) {
                prefixBuilder.append((String) map.get("type"));
                if (map.containsKey("group")) {
                    prefixBuilder.append(".").append(map.get("group"));
                }
                prefixBuilder.append(".").append(map.get("name"));
            }
            String prefix = prefixBuilder.toString();
            for (String key : map.keySet()) {
                Object value = map.get(key);
                if (!this.reserved.contains(key)) {
                    if (value instanceof Number) {
                        String name = prefix + "." + key;
                        this.gauges.submit(name, ((Number) value).doubleValue());
                    }
                    else if (value instanceof Map) {
                        @SuppressWarnings("unchecked")
                        Map<String, Object> sub = (Map<String, Object>) value;
                        addMetrics(sub, prefix);
                    }
                }
            }
        }
        @Override
        public void stop() {
            if (this.poller != null) {
                this.poller.shutdown();
            }
        }
        @Override
        public boolean isRunning() {
            return this.poller != null ? this.poller.isRunning() : false;
        }
        //返回值小的start()方法先调用,stop()方法相反
        @Override
        public int getPhase() {
            return Ordered.LOWEST_PRECEDENCE;
        }
        @Override
        public boolean isAutoStartup() {
            return true;
        }
        @Override
        public void stop(Runnable callback) {
            if (this.poller != null) {
                this.poller.shutdown();
            }
            callback.run();
        }
    }
}

上面这个配置文件,其实就做了两件事起

  1. 初始化Hystrix指标信息收集定时任务并开启
  2. 将Hystrix收集的指标信息提交给Spring Boot的Actuator服务(metrics监控)

HystrixMetricsPoller

public class HystrixMetricsPoller {
    ......
    //初始化定时任务线程池
    public HystrixMetricsPoller(MetricsAsJsonPollerListener listener, int delay) {
        this.listener = listener;
        ThreadFactory threadFactory = null;
        if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
            threadFactory = new MetricsPollerThreadFactory();
        } else {
            threadFactory = PlatformSpecific.getAppEngineThreadFactory();
        }
        executor = new ScheduledThreadPoolExecutor(1, threadFactory);
        this.delay = delay;
    }

    public synchronized void start() {
        // 对比并设置running标志
        if (running.compareAndSet(false, true)) {
            logger.debug("Starting HystrixMetricsPoller");
            try {
                //间隔时间为5秒的定时任务
                scheduledTask = executor.scheduleWithFixedDelay(new MetricsPoller(listener), 0, delay, TimeUnit.MILLISECONDS);
            } catch (Throwable ex) {
                logger.error("Exception while creating the MetricsPoller task");
                ex.printStackTrace();
                running.set(false);
            }
        }
    }
    ......
    private class MetricsPoller implements Runnable {
        private final MetricsAsJsonPollerListener listener;
        private final JsonFactory jsonFactory = new JsonFactory();
        public MetricsPoller(MetricsAsJsonPollerListener listener) {
            this.listener = listener;
        }

        @Override
        public void run() {
            try {
                //上报HystrixCommandMetrics类型的统计信息
                for (HystrixCommandMetrics commandMetrics : HystrixCommandMetrics.getInstances()) {
                    String jsonString = getCommandJson(commandMetrics);
                    listener.handleJsonMetric(jsonString);
                }
                //上报HystrixThreadPoolMetrics线程执行类的统计信息
                for (HystrixThreadPoolMetrics threadPoolMetrics : HystrixThreadPoolMetrics.getInstances()) {
                    if (hasExecutedCommandsOnThread(threadPoolMetrics)) {
                        String jsonString = getThreadPoolJson(threadPoolMetrics);
                        listener.handleJsonMetric(jsonString);
                    }
                }
                //上报HystrixCollapserMetrics请求合并类的统计信息
                for (HystrixCollapserMetrics collapserMetrics : HystrixCollapserMetrics.getInstances()) {
                    String jsonString = getCollapserJson(collapserMetrics);
                    listener.handleJsonMetric(jsonString);
                }
            } catch (Exception e) {
                logger.warn("Failed to output metrics as JSON", e);
                pause();
                return;
            }
        }
        //封装HystrixCommandKey对应的指标信息Json数据
        private String getCommandJson(final HystrixCommandMetrics commandMetrics) throws IOException {
            HystrixCommandKey key = commandMetrics.getCommandKey();
            HystrixCircuitBreaker circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(key);
            StringWriter jsonString = new StringWriter();
            JsonGenerator json = jsonFactory.createGenerator(jsonString);
            json.writeStartObject();
            json.writeStringField("type", "HystrixCommand");
            json.writeStringField("name", key.name());
            json.writeStringField("group", commandMetrics.getCommandGroup().name());
            json.writeNumberField("currentTime", System.currentTimeMillis());
            ......
            json.writeEndObject();
            json.close();
            return jsonString.getBuffer().toString();
        }
        ......
    }
    ......
}

从HystrixMetricsPoller类里面我们其实很清晰的得知,就做了一件事情,获取HystrixCommandMetrics、HystrixThreadPoolMetrics、HystrixCollapserMetrics这3种类型的指标统计信息,然后封装成Json数据回调给Spring Boot Actuator的GaugeService统计服务,最终提供给/metrics路径使用,那下面我们继续在分析下Hystrix断路器开关阀值是如何统计和计算的!


AbstractCommand#applyHystrixSemantics()

abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        ......
        //断路器判断是否断路
        if (circuitBreaker.allowRequest()) {......}
        ......
    }
}

断路器开关是否打开的默认实现类为HystrixCircuitBreakerImpl,那我们继续跟踪看看内部是如何判断断路的

HystrixCircuitBreaker#HystrixCircuitBreakerImpl#allowRequest()

public interface HystrixCircuitBreaker {
   ......
   static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
        //断路器/熔断打开状态,默认关闭状态
        private AtomicBoolean circuitOpen = new AtomicBoolean(false);
        //断路器打开的时间或者是滑动窗口之后的时间,isOpen()、 allowSingleTest()有解释
        private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
        //重置断路器状态,重置统计数据
        public void markSuccess() {
            if (circuitOpen.get()) {
                if (circuitOpen.compareAndSet(true, false)) {
                    metrics.resetStream();
                }
            }
        }
        @Override
        public boolean allowRequest() {
            //强制打开了断路器/熔断
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            // 强制关闭了断路器/熔断
            if (properties.circuitBreakerForceClosed().get()) {
                isOpen();
                return true;
            }
            return !isOpen() || allowSingleTest();
        }
        //是否允许测试性的请求
        public boolean allowSingleTest() {
            long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
            // 断路器状态为打开状态,
            //并且滑动窗口时间(5秒)加断路器状态变更时间小于当前时间,则说明可以测试性的请求
            if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
                //更新当前断路器状态变更时间
                if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
                    return true;
                }
            }
            return false;
        }
        //断路器状态是否为打开状态
        @Override
        public boolean isOpen() {
            //获取断路器状态
            if (circuitOpen.get()) {
                return true;
            }
            //获取HealthCounts
            HealthCounts health = metrics.getHealthCounts();
            // 判断总请求数是否小于20,如果小于20,则不需要开启断路/熔断
            if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                return false;
            }
            //判断请求失败率是否小于百分之50,如果小于百分之50的错误率,那么不需要开启断路/熔断
            if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                return false;
            } else {
                // 错误率大于阀值百分之50,则设置断路器状态为打开状态
                if (circuitOpen.compareAndSet(false, true)) {
                    // 更新断路器状态变化的时间
                    circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
                    return true;
                } else {
                    return true;
                }
            }
        }
   }
   ......
}

断路器状态判断围绕了总请求数量与失败率等条件进行计算,当请求数超过20,并且失败率大于百分之50,那么就会触发断路器/熔断开关,以及引入了滑动窗口5秒,在滑动窗口时间之后会尝试测试性请求,测试服务是否可用,如果不可用,那么继续等待下一个窗口时间,自此整个Hystrix比较重要的总结完毕!

上一篇下一篇

猜你喜欢

热点阅读