springboot

Hystrix系列之入门 篇

2018-10-12  本文已影响82人  JiMingQiang

一、推荐阅读

可能有部分同学对 Hystrix 的特性了解的不是很清晰,推荐如下文章,写的真的好;

二、如何使用Hystrix

目前Hystrix最新版本是1.5.13,在项目的pom文件中加上依赖

       <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>1.5.13</version>
        </dependency>

1.Command方式

要想使用hystrix,只需要继承HystrixCommandHystrixObservableCommand,两者主要区别是:

1.1执行命令

execute()queue()observe()toObservable()这4个方法用来触发执行run()/construct(),一个实例只能执行一次这4个方法,特别说明的是HystrixObservableCommand没有execute()queue()

4个方法的主要区别

1.2 HystrixCommand

public class CommandHelloWorld2 extends HystrixCommand<String> {
    private final String name;

    protected CommandHelloWorld2(String name) {
        super(//命令分组用于对依赖操作分组,便于统计,汇总等.
                Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
                        //配置依赖超时时间,500毫秒
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(500))
                        //HystrixCommondKey工厂定义依赖名称
                        .andCommandKey(HystrixCommandKey.Factory.asKey("commandHelloWorld2"))
                        //使用HystrixThreadPoolKey工厂定义线程池名称
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
        this.name = name;
    }

    @Override
    protected String getFallback() {
        return "execute Falled";
    }

    @Override
    protected String run() throws Exception {
        //sleep 2秒 ,调用会超时
       // TimeUnit.MILLISECONDS.sleep(2000);
        return "Hello " + name + " thread : " + Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception {

        CommandHelloWorld2 commandHelloWorld2 = new CommandHelloWorld2("test-Fallback");
        String s = commandHelloWorld2.execute();
        System.out.println(" 同步 ====== " + s);
        /*Future<String> queue = commandHelloWorld2.queue();
        String s1 = queue.get();*/
    }
}

构造方法

主要是相关参数设置,具体的参数的作用后续会介绍

 protected CommandHelloWorld2(String name) {
        super(//命令分组用于对依赖操作分组,便于统计,汇总等.
                Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("HelloWorldGroup"))
                        //配置依赖超时时间,500毫秒
                        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(500))
                        //HystrixCommondKey工厂定义依赖名称
                        .andCommandKey(HystrixCommandKey.Factory.asKey("commandHelloWorld2"))
                        //使用HystrixThreadPoolKey工厂定义线程池名称
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
        this.name = name;
    }

以下是封装的具体点的参数设置,具体的参数设置根据业务需求而定;

    public CommandHelloWorld4(Integer id) {
        super(setter());
        this.id = id;
    }

    private static Setter setter() {
        return ApiSetter.setter("getNum");
    }
public class ApiSetter {
    public static HystrixCommand.Setter setter(String commandKeyName, String threadPoolKeyName) {
        return setter("ApiGroup",commandKeyName,threadPoolKeyName);
    }

    public static HystrixCommand.Setter setter(String commandKeyName) {
        return setter(commandKeyName,"Api-Pool");
    }

    /**
     * @author liweihan
     * @time 2017/12/20 16:57
     * @description     相关参数设置
     * @param groupKeyName      服务分组名
     * @param commandKeyName    服务标识名称
     * @param threadPoolKeyName 线程池名称
     * @return
     */
    public static HystrixCommand.Setter setter(String groupKeyName, String commandKeyName, String threadPoolKeyName) {
        //服务分组
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupKeyName);
        //服务标识
        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(commandKeyName);
        //线程池名称
        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey(threadPoolKeyName);
        //线程配置
        HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter()
                .withCoreSize(25)
                .withKeepAliveTimeMinutes(5)
                .withMaxQueueSize(Integer.MAX_VALUE)
                .withQueueSizeRejectionThreshold(10000);

        //命令属性的配置
        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                .withExecutionIsolationThreadInterruptOnTimeout(true)
                .withExecutionTimeoutInMilliseconds(3000) //设置超时时间为3秒时自动熔断
                .withCircuitBreakerErrorThresholdPercentage(20);//失败率达到20%自动熔断

        //返回
        return HystrixCommand.Setter
                .withGroupKey(groupKey)
                .andCommandKey(commandKey)
                .andThreadPoolKey(threadPoolKey)
                .andThreadPoolPropertiesDefaults(threadPoolProperties)
                .andCommandPropertiesDefaults(commandProperties);
    }

   
}

1.3 HystrixObservableCommand

代码如下:

public class HelloWorldHystrixObservableCommand extends HystrixObservableCommand<String> {

    private final String name;

    public HelloWorldHystrixObservableCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

//  @Override
//    protected String getFallback() {
//      System.out.println("触发了降级!");
//        return "exeucute fallback";
//    }

    @Override
    protected Observable<String> construct() {
        System.out.println("in construct! thread:" + Thread.currentThread().getName());
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                    System.out.println("in call of construct! thread:" + Thread.currentThread().getName());
                    if (!observer.isUnsubscribed()) {
//                      observer.onError(getExecutionException());  // 直接抛异常退出,不会往下执行
                        observer.onNext("Hello1" + " thread:" + Thread.currentThread().getName());
                        observer.onNext("Hello2" + " thread:" + Thread.currentThread().getName());
                        observer.onNext(name + " thread:" + Thread.currentThread().getName());
                        System.out.println("complete before------" + " thread:" + Thread.currentThread().getName());
                        observer.onCompleted(); // 不会往下执行observer的任何方法
                        System.out.println("complete after------" + " thread:" + Thread.currentThread().getName());
                        observer.onNext("abc"); // 不会执行到
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        } );
    }

    public static void main(String[] args) throws Exception{

        Observable<String> hotObservable = new HelloWorldHystrixObservableCommand("Hlx").observe();
        // Observable<String> hotObservable = new HelloWorldHystrixObservableCommand("Hlx").toObservable();
        Thread.sleep(2000);
        System.out.println("睡眠中。。。。。");
        // 注册观察者事件
        // subscribe()是非堵塞的
        hotObservable.subscribe(new Observer<String>() {

            // 先执行onNext再执行onCompleted
            @Override
            public void onCompleted() {
                System.out.println("hotObservable of ObservableCommand completed");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("hotObservable of ObservableCommand error");
                e.printStackTrace();
            }

            @Override
            public void onNext(String v) {
                System.out.println("hotObservable of ObservableCommand onNext: " + v);
            }

        });

    }
}

执行结果如下:

in construct! thread:main
in call of construct! thread:main
complete before------ thread:main
complete after------ thread:main
睡眠中。。。。。
hotObservable of ObservableCommand onNext: Hello1 thread:main
hotObservable of ObservableCommand onNext: Hello2 thread:main
hotObservable of ObservableCommand onNext: Hlx thread:main
hotObservable of ObservableCommand completed

toObservable()方法

将toObservable()的注释放开,observe()注释掉,执行main方法,结果如下:

睡眠中。。。。。
in construct! thread:main
in call of construct! thread:main
hotObservable of ObservableCommand onNext: Hello1 thread:main
hotObservable of ObservableCommand onNext: Hello2 thread:main
hotObservable of ObservableCommand onNext: Hlx thread:main
complete before------ thread:main
hotObservable of ObservableCommand completed
complete after------ thread:main

2.fallback(降级)

使用fallback机制很简单,继承HystrixCommand只需重写getFallback(),继承HystrixObservableCommand只需重写resumeWithFallback(),比如HelloWorldHystrixCommand加上下面代码片段:

    @Override
    protected String getFallback() {
        return "execute Falled";
    }

fallback实际流程是当run()/construct()被触发执行时或执行中发生错误时,将转向执行getFallback()/resumeWithFallback()。结合下图,4种错误情况将触发fallback:

若失败回退方法执行失败,或者用户未提供失败回退方法,Hystrix 会根据调用执行命令的方法的不同而产生不同的行为:

Hystrix.png

途中大致的执行顺序如下:

1、构建 HystrixCommand 或者 HystrixObservableCommand 对象
2、执行命令(即上述 Command 对象包装的逻辑)
3、结果是否有缓存
4、请求线路(类似电路)是否是开路
5、线程池/请求队列/信号量占满时会发生什么
6、使用 HystrixObservableCommand.construct() 还是 HystrixCommand.run()
7、计算链路健康度
8、失败回退逻辑
9、返回正常回应

接下来,我们一一验证4中情况

1、当construct()或者run()方法执行过程中抛出异常,代码如下:

public class HystrixFallbackException extends HystrixCommand<String> {
    private final String name;

    public HystrixFallbackException(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("FallbackGroup"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {

        /*---------------会触发fallback的case-------------------*/
        //1.主动抛出异常
//      throw new HystrixTimeoutException();
//      throw new RuntimeException("this command will trigger fallback");
//      throw new Exception("this command will trigger fallback");
//      throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, commandClass, message, cause, fallbackException);


        // 2.除零异常
        //int i = 1/0;

        /*---------------不会触发fallback的case-------------------*/
        // 3.被捕获的异常不会触发fallback
        /*try {
            throw new RuntimeException("this command never trigger fallback");
        } catch(Exception e) {
            e.printStackTrace();
        }*/

        // 4.HystrixBadRequestException异常由非法参数或非系统错误引起,不会触发fallback,也不会被计入熔断器
        throw new HystrixBadRequestException("HystrixBadRequestException is never trigger fallback");

        //return name;
    }

    @Override
    protected String getFallback() {
        return "fallback: " + name;
    }

    public static void main(String[] args) {
        HystrixFallbackException hlx = new HystrixFallbackException("Hlx");
        try {
            String execute = hlx.execute();
            System.out.println(execute);
        } catch (Exception e) {
            System.out.println("run()抛出HystrixBadRequestException时,会被捕获到这里" + e.getCause());
        }

    }
}

2、命令执行超时

代码如下:

public class HystrixFallbackTimeOut extends HystrixCommand<String> {
    private final String name;

    protected HystrixFallbackTimeOut(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    @Override
    protected String getFallback() {
        return "execute Falled";
    }

    @Override
    protected String run() throws Exception {
        //sleep 2秒 ,调用会超时
        TimeUnit.MILLISECONDS.sleep(2000);
        return "Hello " + name + " thread : " + Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception {

        HystrixFallbackTimeOut hystrixFallbackTimeOut = new HystrixFallbackTimeOut("Fallback-timeout");
        String s = hystrixFallbackTimeOut.execute();
        System.out.println(" 同步 ====== " + s);


    }
}

3、当回路器打开,命令的执行进入了熔断状态

代码如下:

/**
 *
 * CircuitBreakerRequestVolumeThreshold设置为3,意味着10s内请求超过3次才会触发熔断器
 * circuitBreakerErrorThresholdPercentage设置为80,错误率是为%80才会触发熔断器
 * 必须两个参数同时满足才会才会触发熔断器
 * run()中使命令超时进入fallback,执行4次run后,将被熔断,进入降级,即不进入run()而直接进入fallback
 * 
 */
public class HystrixFallbackCircuitBreaker extends HystrixCommand<String> {
    private Integer id;

    public HystrixFallbackCircuitBreaker(Integer id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CircuitBreakerTestGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("CircuitBreakerTestKey"))
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("CircuitBreakerTest"))
                        .andThreadPoolPropertiesDefaults(   // 配置线程池
                                HystrixThreadPoolProperties.Setter()
                                        .withCoreSize(200)  // 配置线程池里的线程数,设置足够多线程,以防未熔断却打满threadpool
                        )
                        .andCommandPropertiesDefaults(  // 配置熔断器
                                HystrixCommandProperties.Setter()
                                        //开启熔断器
                                        .withCircuitBreakerEnabled(true)
                                        //滑动窗口内(10s)的请求数阈值,只有达到了这个阈值,才有可能熔断。默认是 20,如果这个时间段只有19个请求,就算全部失败了,也不会自动熔断。
                                        .withCircuitBreakerRequestVolumeThreshold(3)
                                        //错误率阈值,默认 50%,比如(10s)内有100个请求,其中有60个发生异常,那么这段时间的错误率是 60,已经超过了错误率阈值,熔断器会自动打开。
                                        .withCircuitBreakerErrorThresholdPercentage(80)
//                      .withCircuitBreakerForceOpen(true)  // 置为true时,所有请求都将被拒绝,直接到fallback
//                      .withCircuitBreakerForceClosed(true)    // 置为true时,将忽略错误
//                      .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)   // 信号量隔离
//                      .withExecutionTimeoutInMilliseconds(5000)
                        )
        );
        this.id = id;
    }

    @Override
    protected String run() throws Exception {
        System.out.println("running run():" + id);
        if (id % 2 == 0 && id <= 10) { //让小于等于10的偶数返回
            return "running run():" + id;
        } else { //让奇数或大于10的数进入fallback
            TimeUnit.MILLISECONDS.sleep(2000);
            return  id+"";
        }
    }

    @Override
    protected String getFallback() {
        return " ====== CircuitBreaker fallback" + id + " ,是否进入熔断:" + super.isCircuitBreakerOpen();
    }

    public static void main(String[] args) {
        for(int i = 0; i < 50; i++) {
            try {
                System.out.println("===========" + new HystrixFallbackCircuitBreaker(i).execute());
            } catch(Exception e) {
                System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
            }
        }
    }

4、当命令执行的线程池和队列或者信号量已经满容

代码如下:

package com.jimingqiang.study.hystrix.hystrixbegin.failback;

import com.netflix.hystrix.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Created by QDHL on 2018/10/11.
 *
 * @author mingqiang ji
 */
public class HystrixFallbackThreadPool extends HystrixCommand<String> {
    private final String name;

    public HystrixFallbackThreadPool(String name) {
//        super(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"));
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest"))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(5000)
                )
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter()
                                .withCoreSize(3)    // 配置线程池里的线程数
                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {

        TimeUnit.MILLISECONDS.sleep(3000);
        return name;
    }

    @Override
    protected String getFallback() {
        return "fallback: " + name;
    }

    public static void main(String[] args) {
        for(int i = 0; i < 10; i++) {
            try {
                Future<String> future = new HystrixFallbackThreadPool("Hlx"+i).queue();
            } catch(Exception e) {
                System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
            }
        }
        for(int i = 0; i < 20; i++) {
            try {
                System.out.println("===========" + new HystrixFallbackThreadPool("Hlx").execute());
            } catch(Exception e) {
                System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
            }
        }
    }


}

线程池的使用示意图如下图所示,当n个请求线程并发对某个接口请求调用时,会先从hystrix管理的线程池里面获得一个线程,然后将参数传递给这个线程去执行真正调用。线程池的大小有限,默认是10个线程,可以使用maxConcurrentRequests参数配置,如果并发请求数多于线程池线程个数,就有线程需要进入队列排队,但排队队列也有上限,默认是 5,如果排队队列也满,则必定有请求线程会走fallback流程。

线程池模式可以支持异步调用,支持超时调用,支持直接熔断,存在线程切换,开销大。

线程池.png

代码如下:

public class HystrixThreadPoolLsolation extends HystrixCommand<String> {
    private final String name;

    public HystrixThreadPoolLsolation(String name,String threadPoolName) {
//        super(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"));
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(threadPoolName))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(5000)
                )
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter()
                                .withCoreSize(3)  // 配置线程池里的线程数
                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {

        TimeUnit.MILLISECONDS.sleep(3000);
        return name+"-"+Thread.currentThread().getName();
    }

    @Override
    protected String getFallback() {
        return "fallback: " + name;
    }

    public static void main(String[] args) {
        for(int i = 0; i < 10; i++) {
            try {
                Future<String> future = new HystrixThreadPoolLsolation("Hlx"+i,"thread-pool-1").queue();
            } catch(Exception e) {
                System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
            }
        }
        for(int i = 0; i < 20; i++) {
            try {
                System.out.println("===========" + new HystrixThreadPoolLsolation("Hlx","thread-pool-2").execute());
            } catch(Exception e) {
                System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
            }
        }
    }


}

3.2 信号量

如果要使用信号量模式,需要配置参数execution.isolation.strategy=ExecutionIsolationStrategy.SEMAPHORE

另外,为了限制对下游依赖的并发调用量,可以配置Hystrix的

execution.isolation.semaphore.maxConcurrentRequests

当并发请求数达到阈值时,请求线程可以快速失败,执行降级。

信号量的使用示意图如下图所示,当n个并发请求去调用一个目标服务接口时,都要获取一个信号量才能真正去调用目标服务接口,但信号量有限,默认是10个,可以使用maxConcurrentRequests参数配置,如果并发请求数多于信号量个数,信号量在达到上限时,会拒绝后续请求的访问,则必定有请求线程会走fallback流程,从而达到限流和防止雪崩的目的。

信号量.png

信号量模式从始至终都只有请求线程自身,是同步调用模式,不支持超时调用,由于没有线程的切换,开销非常小。

在该模式下,接收请求和执行下游依赖在同一个线程内完成,

比如一个接口中依赖了3个下游:serviceA、serviceB、serviceC,且这3个服务返回的数据互相不依赖,这种情况下如果针对A、B、C的熔断降级使用信号量模式,那么接口耗时就等于请求A、B、C服务耗时的总和,无疑这不是好的方案。

代码如下:

package com.jimingqiang.study.hystrix.hystrixbegin.Isolationstrategy;

import com.netflix.hystrix.*;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 测试信号量隔离
 * 默认执行run()用的是主线程,为了模拟并行执行command,这里我们自己创建多个线程来执行command
 * 设置ExecutionIsolationSemaphoreMaxConcurrentRequests为3,意味着信号量最多允许执行run的并发数为3,超过则触发降级,即不执行run而执行getFallback
 * 设置FallbackIsolationSemaphoreMaxConcurrentRequests为1,意味着信号量最多允许执行fallback的并发数为1,超过则抛异常fallback execution rejected
 */
public class HystrixSemaphorelLsolation extends HystrixCommand<String> {
    private final String name;

    public HystrixSemaphorelLsolation(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SemaphoreTestGroup"))
                        .andCommandKey(HystrixCommandKey.Factory.asKey("SemaphoreTestKey"))
                        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("SemaphoreTestThreadPoolKey"))
                        .andCommandPropertiesDefaults(    // 配置信号量隔离
                                HystrixCommandProperties.Setter()
                                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)    // 信号量隔离
                                        .withExecutionIsolationSemaphoreMaxConcurrentRequests(3)
                                        .withFallbackIsolationSemaphoreMaxConcurrentRequests(1)
                        )
                // 设置了信号量隔离后,线程池配置将变无效
//                .andThreadPoolPropertiesDefaults(
//                        HystrixThreadPoolProperties.Setter()
//                        .withCoreSize(13)   // 配置线程池里的线程数
//                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        return "run(): name="+name+",线程名是" + Thread.currentThread().getName();
    }

    @Override
    protected String getFallback() {
        return "getFallback(): name="+name+",线程名是" + Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception{
        try {
            Thread.sleep(2000);
            for(int i = 0; i < 5; i++) {
                final int j = i;
                // 自主创建线程来执行command,创造并发场景
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("===========" + new HystrixSemaphorelLsolation("HLX" + j).execute());

                    }
                });
                thread.start();
            }
        } catch(Exception e) {
            e.printStackTrace();
        }

        System.in.read();

    }



}

4.熔断器

在生活中,如果电路的负载过高,保险箱会自动跳闸,以保护家里的各种电器,这就是熔断器的一个活生生例子。在Hystrix中也存在这样一个熔断器,当所依赖的服务不稳定时,能够自动熔断,并提供有损服务,保护服务的稳定性。在运行过程中,Hystrix会根据接口的执行状态(成功、失败、超时和拒绝),收集并统计这些数据,根据这些信息来实时决策是否进行熔断。

熔断器.png

线路的开路闭路详细逻辑如下:

  1. 假设线路内的容量(请求QPS)达到一定阈值(通过 HystrixCommandProperties.circuitBreakerRequestVolumeThreshold() 配置)
  2. 同时,假设线路内的错误率达到一定阈值(通过 HystrixCommandProperties.circuitBreakerErrorThresholdPercentage() 配置)
  3. 熔断器将从『闭路』转换成『开路』
  4. 若此时是『开路』状态,熔断器将短路后续所有经过该熔断器的请求,这些请求直接走『失败回退逻辑』
  5. 经过一定时间(即『休眠窗口』,通过 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds() 配置),后续第一个请求将会被允许通过熔断器(此时熔断器处于『半开』状态),若该请求失败,熔断器将又进入『开路』状态,且在休眠窗口内保持此状态;若该请求成功,熔断器将进入『闭路』状态,回到逻辑1循环往复。

代码可以参考 【2.fallback(降级)中的 3、当回路器打开,命令的执行进入了熔断状态的代码片段 】

5.请求缓存

5.1 请求缓存有如下好处:

5.2 缓存的使用

要使用hystrix cache功能,第一个要求是重写getCacheKey(),用来构造cache key;第二个要求是构建context,如果请求B要用到请求A的结果缓存,A和B必须同处一个context。通过HystrixRequestContext.initializeContext()context.shutdown()可以构建一个context,这两条语句间的所有请求都处于同一个context。

代码如下:

public class CommandHelloWorld3 extends HystrixCommand<String> {

    private final int id;

    protected CommandHelloWorld3(int id) {
        super(HystrixCommandGroupKey.Factory.asKey("RequestCacheCommand"));
        this.id = id;
    }

    @Override
    protected String run() throws Exception {
        System.out.println(Thread.currentThread().getName() + " execute id = " + id);
        return "execute=" + id;
    }

    //重写getCacheKey,实现区分不同请求的逻辑
    @Override
    protected String getCacheKey() {
        System.out.println(" --- ");
        return String.valueOf(id);
    }

    public static void main(String[] args) {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();

        try {
            CommandHelloWorld3 commandHelloWorld3_a = new CommandHelloWorld3(22);
            CommandHelloWorld3 commandHelloWorld3_b = new CommandHelloWorld3(22);

            System.out.println("a执行结果:" + commandHelloWorld3_a.execute());
            System.out.println("a执行结果是否从缓存中获取:" + commandHelloWorld3_a.isResponseFromCache);

            System.out.println("b执行结果:" + commandHelloWorld3_b.execute());
            System.out.println("b执行结果是否从缓存中获取:" + commandHelloWorld3_b.isResponseFromCache);
        } finally  {
            context.shutdown();
        }

        context = HystrixRequestContext.initializeContext();
        try {
            CommandHelloWorld3 commandHelloWorld3_c = new CommandHelloWorld3(22);

            System.out.println("c执行结果:" + commandHelloWorld3_c.execute());
            System.out.println("c执行结果是否从缓存中获取:" + commandHelloWorld3_c.isResponseFromCache);

        } finally  {
            context.shutdown();
        }

      
    }
}

执行结果如下:

 --- 
 --- 
hystrix-RequestCacheCommand-1 execute id = 22
a执行结果:execute=22
a执行结果是否从缓存中获取:false
 --- 
 --- 
b执行结果:execute=22
b执行结果是否从缓存中获取:true
 --- 
 --- 
hystrix-RequestCacheCommand-2 execute id = 22
c执行结果:execute=22
c执行结果是否从缓存中获取:false

6.合并请求collapsing

下图展示了在两种场景(未增加『请求合并器』和增加『请求合并器』)下,线程和网络连接数量(假设所有请求在一个很小的时间窗口内,例如 10ms,是『并发』的):

request-collapsing.png

为什么要使用请求合并?

例如,这里有一个包含 300 个视频对象的列表,需要遍历这个列表,并对每一个对象调用 getSomeAttribute() 方法,但如果简单处理的话,可能会导致 300 次的网络请求(假设 getSomeAttribute() 方法内需要发出网络请求),每一个网络请求可能都会花上几毫秒(显然,这种方式非常容易拖慢系统)。

通过使用 HystrixCollapser,Hystrix 能自动完成请求的合并,可以将300个网络请求降低为只发送一次网络请求,大大的减少了网络的耗时。

Hystrix中的请求合并,就是利用一个合并处理器,将对同一个服务发起的连续请求合并成一个请求进行处理(这些连续请求的时间窗默认为10ms),可以有效节省网络带宽和线程池资源

请求合并带来的额外开销

请求合并会导致依赖服务的请求延迟增高(该延迟为等待请求的延迟),延迟的最大值为合并时间窗口大小。

若某个请求耗时的可能是 5ms,合并时间窗口为 10ms,但是现在必须再等10ms看看还有没有其他的请求一起的,这样一个请求的耗时就从5ms增加到15ms了。

请求合并带来的额外开销是否值得,取决于将要执行的命令,高延迟的命令相比较而言不会有太大的影响,因为这个时候时间窗的时间消耗就显得微不足道了

另外,如果一个命令具有高并发度,并且能批量处理多个,甚至上百个的话,请求合并带来的性能开销会因为吞吐量的极大提升而基本可以忽略,因为 Hystrix 会减少这些请求所需的线程和网络连接数量。如果一个合并时间窗口内只有 1~2 个请求,将请求合并显然不是明智的选择。

代码如下:

package com.jimingqiang.study.hystrix.hystrixbegin.collapsing;

import com.netflix.hystrix.*;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;


public class HystrixCollapsing extends HystrixCollapser<List<String>, String, Integer> {
    private final Integer key;

    public HystrixCollapsing(Integer key) {
        this.key = key;
    }

    /**
     * 请求参数
     * 如果有多个参数需要被绑定,创建一个单独的对象来包含它们,或者使用Tuple。
     * @return
     */
    @Override
    public Integer getRequestArgument() {
        return key;
    }


    /**
     * 创建一个批量请求命令
     * @param requests 保存了延迟时间窗中收集到的所有单个的请求的参数 String是返回结果类型,Integer是请求参数类型
     * @return
     */
    @Override
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        return new BatchCommand(requests);  // 把批量请求传给command类
    }

    //

    /**
     * 把批量请求的结果和对应的请求一一对应起来
     * @param batchResponse 响应的结果
     * @param requests 请求参数
     */
    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        int count = 0;
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    // command类
    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
            super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollepsingGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("CollepsingKey")
                    ));
            this.requests = requests;
        }

        @Override
        protected List<String> run() {
            ArrayList<String> response = new ArrayList<String>();
            // 处理每个请求,返回结果
            for (CollapsedRequest<String, Integer> request : requests) {
                // artificial response for each argument received in the batch
                response.add("ValueForKey: " + request.getArgument() + " thread:" + Thread.currentThread().getName());
            }
            return response;
        }
    }


    public static void main(String[] args) throws Exception {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {

            Future<String> f1 = new HystrixCollapsing(1).queue();
            Future<String> f2 = new HystrixCollapsing(2).queue();

            Future<String> f3 = new HystrixCollapsing(3).queue();
            Future<String> f4 = new HystrixCollapsing(4).queue();

            Future<String> f5 = new HystrixCollapsing(5).queue();
            // f5和f6,如果sleep时间够小则会合并,如果sleep时间够大则不会合并,默认10ms
            TimeUnit.MILLISECONDS.sleep(1000);
            Future<String> f6 = new HystrixCollapsing(6).queue();

            System.out.println(f1.get());
            System.out.println(f2.get());
            System.out.println(f3.get());
            System.out.println(f4.get());
            System.out.println(f5.get());
            System.out.println(f6.get());

            // note:numExecuted表示共有几个命令执行,1个批量多命令请求算一个,这个实际值可能比代码写的要多,因为due to non-determinism of scheduler since this example uses the real timer
            int numExecuted = HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size();
            System.out.println("num executed: " + numExecuted);
            int numLogs = 0;
            for (HystrixInvokableInfo<?> command : HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()) {
                numLogs++;
                System.err.println(command.getCommandKey().name() + " => command.getExecutionEvents(): " + command.getExecutionEvents());

            }
            System.out.println(numLogs==numExecuted);
        } finally {
            context.shutdown();
        }
    }


}

7. 配置策略

​ 具体的配置可以查看官网 官网配置地址,下边仅是简单的整理

参考:

Hystrix使用入门手册(中文)

Hystrix文档-实现原理

占小狼的Hystrix

上一篇下一篇

猜你喜欢

热点阅读