Hystrix 熔断器 实例 demo

2018-03-14  本文已影响741人  把爱放下会走更远

Hystrix是刺猬的意思,为了保护自己,不能让被调用者引起本身系统的不可用,甚至一直向上影响到整个微服务系统,Hystrix基于Rxjava实现,不过是老的1.x版本的Rxjava,新的2.x的版本还未使用

Hystrix采取了线程池隔离(默认)和信号量隔离,简而言之,当你调用一个第三方系统或者dubbo服务A的时候,这部分操作会全部放入到一个线程池中去处理,然后监控这个线程池的情况,比如池满了就会报错了(当然配了QueueSize的除外),信号量是简单一个变量来监控当前的并发量,当达到一定值的之后就会报错,当然我们可以处理这个错误然后返回自己规定的数据(gatFallBack),看下图


image.png

Hystrix有自己的监控机制,统计算法,参考下面这张图,数据是在向左滚动的,我们可以配置滚动统计的时间间隔,比如3秒钟统计一次,通过统计的数据Hystrix能做出对应的保护机制,比如调用第三方的失败率达到百分之50%的时候会做快速失败(即直接失败),然后在5秒钟之后又去尝试一下看看是不是恢复了


image.png

下面我们来测试一下Hystrix的效果

①测试线程池满了

MyCommand
public class MyCommand extends HystrixCommand<String> {

  private final String group;
  private MyService service;
  private String thing;

  public MyCommand(String group, String thing) {
//    super(HystrixCommandGroupKey.Factory.asKey(group));
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(group))
            .andCommandKey(HystrixCommandKey.Factory.asKey("myCommand"))
            .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("myCommandThreadPool"))
            .andCommandPropertiesDefaults(
                HystrixCommandProperties.Setter()
                    .withCircuitBreakerRequestVolumeThreshold(5)
                    .withCircuitBreakerErrorThresholdPercentage(60)
//                    .withExecutionTimeoutInMilliseconds(2000)
                    .withExecutionTimeoutEnabled(false)//配合下方MyService的等待15s,防止超时直接报错
                    .withMetricsRollingStatisticalWindowInMilliseconds(1000))
            .andThreadPoolPropertiesDefaults(
                HystrixThreadPoolProperties.Setter()
                    .withCoreSize(10))//这里我们设置了线程池大小为10
    );
    this.group = group;
    this.thing = thing;
  }

  @Override
  protected String run() throws Exception {
    service.doSth(thing);
    return thing + " over";
  }
 @Override
  protected String getFallback() {
    return thing + " Failure! ";
  }

  public void setService(MyService service) {
    this.service = service;
  }
}
MyService
public class MyService {
  public void doSth(String thing) {
    System.out.println("doSth->" + thing + "->begin");
    String threadName = Thread.currentThread().getName();
    System.out.println(threadName + ":doSth->" + thing + "->doing");
    try {
      Thread.sleep(15000);//这里让线程等待15s,保持线程不释放
    } catch (Exception e) {
      e.printStackTrace();
    }
    System.out.println("doSth->" + thing + "->end");
  }
}
MyTest
public class MyTest {
  @Test
  public void test() throws Exception {
    MyService service = new MyService();
    List<Future<String>> fl = new ArrayList<>(10);
    for (int i = 0; i < 15; i++) {
      Thread.sleep(1000);
      MyCommand command = new MyCommand("TestGroup", "fishing" + (i * i));
      command.setService(service);
      Observable<String> observable = command.toObservable();
      observable.subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {
          System.out.println("执行command发生错误!");
          e.printStackTrace();
        }

        @Override
        public void onNext(String s) {
          System.out.println(s);
        }
      });
    }
  }
}
输出结果
doSth->fishing0->begin
hystrix-myCommandThreadPool-1:doSth->fishing0->doing
doSth->fishing1->begin
hystrix-myCommandThreadPool-2:doSth->fishing1->doing
doSth->fishing4->begin
hystrix-myCommandThreadPool-3:doSth->fishing4->doing
doSth->fishing9->begin
hystrix-myCommandThreadPool-4:doSth->fishing9->doing
doSth->fishing16->begin
hystrix-myCommandThreadPool-5:doSth->fishing16->doing
doSth->fishing25->begin
hystrix-myCommandThreadPool-6:doSth->fishing25->doing
doSth->fishing36->begin
hystrix-myCommandThreadPool-7:doSth->fishing36->doing
doSth->fishing49->begin
hystrix-myCommandThreadPool-8:doSth->fishing49->doing
doSth->fishing64->begin
hystrix-myCommandThreadPool-9:doSth->fishing64->doing
doSth->fishing81->begin
hystrix-myCommandThreadPool-10:doSth->fishing81->doing
fishing100 Failure! 
fishing121 Failure! 
fishing144 Failure! 
fishing169 Failure! 
fishing196 Failure! 
可以看到从第10个开始就失败了,因为线程池的10个线程已经被全部占满了

②测试失败率导致快速失败

PercentCommand
public class PercentCommand extends HystrixCommand<String> {

  private final String group;
  private PercentService service;
  private int seed;

  public PercentCommand(String group, int seed) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(group))
        .andCommandPropertiesDefaults(HystrixPropertiesCommandDefault.Setter()
            .withCircuitBreakerErrorThresholdPercentage(50)//错误率超过50%,快速失败
            .withExecutionTimeoutInMilliseconds(2000)
            .withCircuitBreakerRequestVolumeThreshold(5)
            .withCircuitBreakerSleepWindowInMilliseconds(30000)//30s后放部分流量过去重试
            .withMetricsRollingPercentileBucketSize(10)
            .withMetricsRollingPercentileWindowInMilliseconds(5000)));
    this.group = group;
    this.seed = seed;
  }

  @Override
  protected String run() throws Exception {
    return service.ex(seed);
  }

  public void setService(PercentService service) {
    this.service = service;
  }

  @Override
  protected String getFallback() {
    return seed + "失败了";
  }
}
PercentService
public class PercentService {
  public String ex(int seed) {
    if (seed < 10) {
      throw new RuntimeException("抛个异常测试提高异常率!");
    }
    return seed + "想返回吗?";
  }
}
PercentTest
public class PercentTest {

  @Test
  public void test() throws Exception {
    PercentService service = new PercentService();
    for (int i = 0; i < 15; i++) {
      Thread.sleep(1000);
      PercentCommand command = new PercentCommand("TestGroup", i);
      command.setService(service);
      Observable<String> observable = command.toObservable();
      observable.subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {
          System.out.println("eeeeeeeeeeeeee");
        }

        @Override
        public void onNext(String s) {
          System.out.println(s);
        }
      });
    }

  }
}
输出结果
......
09:35:04.057 [hystrix-TestGroup-5] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: 抛个异常测试提高异常率!
    at com.helian.techaggrhystrix.errorpercent.PercentService.ex(PercentService.java:15)
    at com.helian.techaggrhystrix.errorpercent.PercentCommand.run(PercentCommand.java:31)
    at com.helian.techaggrhystrix.errorpercent.PercentCommand.run(PercentCommand.java:10)
    at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:302)
    at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:298)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
    at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
    at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.unsafeSubscribe(Observable.java:10151)
    at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
    at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction$1.call(HystrixContexSchedulerAction.java:56)
    at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction$1.call(HystrixContexSchedulerAction.java:47)
    at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction.call(HystrixContexSchedulerAction.java:69)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
4失败了
5失败了
6失败了
7失败了
8失败了
9失败了
10失败了
11失败了
12失败了
13失败了
14失败了
---------------------------------------------------------------------------
在service里面我们直接抛出异常,test里每隔1秒发出一个请求,前面的5个请求都执行了service的代码,
抛出了异常,到达第五个的时候,熔断器监控到失败率高达100%,那么熔断器打开,执行快速失败,
直接执行getFailBack方法了,配合上面的withCircuitBreakerSleepWindowInMilliseconds(30000),
默认是5s的,也就是30s才会去重试,所以后面的全部快速失败了,试着更改这个值为3000看看,
结果应该不一样

上一篇下一篇

猜你喜欢

热点阅读