熔断器设计

2021-01-10  本文已影响0人  放开那个BUG

1、前言

看别人 RPC 框架代码有熔断器的代码,但是对于熔断器并不是很了解,于是了解一下熔断器设计。熔断器跟限流器不同,它主要是保证服务调用者在调用异常服务时,快速返回失败结果,避免大量的同步等待,造成服务雪崩,并且能在一段时间后继续侦测请求执行结果, 提供恢复服务调用的可能。而限流器是单纯的限流,不让大量流量把服务搞挂调,服务本身是正常的。

熔断器的基本原理如下:


熔断器原理

熔断器本身就是一个状态机。

1.关闭状态:熔断器的初始化状态,该状态下允许请求通过。当失败超过阀值,转入打开状态,
2.打开状态:熔断状态,该状态下不允许请求通过,当进入该状态经过一段时间,进入半开状态。
3.半开状态:在半开状态期间,允许部分请求通过,在半开期间,观察失败状态是否超过阀值。如果没有超过进入关闭状态,如果超过了进入关闭状态。如此往复

2、设计

整个设计使用状态模式(即外部表现行为是 CircuitBreaker,内部状态变化由各个熔断器的 state 类变化),首先着眼点应该在 CircuitBreaker 上,由他理解各个状态的变化过程。

熔断器对外暴露接口:

package com.example.demo.circulbreaker;

/**
 * 熔断器接口
 */
public interface CircuitBreaker {
    /**
     * 重置熔断器
     */
    void reset();

    /**
     * 是否允许通过熔断器
     */
    boolean canPassCheck();

    /**
     * 统计失败次数
     */
    void countFailNum();
}

熔断器公共抽象类:

package com.example.demo.circulbreaker;

/**
 * User: Rudy Tan
 * Date: 2018/9/21
 *
 * 基础熔断器
 */
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
    /**
     * 熔断器当前状态,声明成 volatile 是防止更改状态的时候,其他线程拿着旧状态做操作
     */
    private volatile CBState state = new CloseCBState();

    /**
     * 在熔断器关闭的情况下,在多少秒内失败多少次进入,熔断打开状态(默认10分钟内,失败10次进入打开状态)
     */
    public String thresholdFailRateForClose = "10/600";

    /**
     * 在熔断器打开的情况下,熔断多少秒进入半开状态,(默认熔断30分钟)
     */
    public int thresholdIdleTimeForOpen = 1800;

    /**
     * 在熔断器半开的情况下, 在多少秒内放多少次请求,去试探(默认10分钟内,放10次请求)
     */
    public String thresholdPassRateForHalfOpen = "10/600";

    /**
     * 在熔断器半开的情况下, 试探期间,如果有超过多少次失败的,重新进入熔断打开状态,否者进入熔断关闭状态。
     */
    public int thresholdFailNumForHalfOpen = 1;


    public CBState getState() {
        return state;
    }

    public void setState(CBState state) {
        // 当前状态不能切换为当前状态
        CBState currentState = getState();
        if (currentState.getStateName().equals(state.getStateName())){
            return;
        }

        // 多线程环境加锁
        synchronized (this){
            // 二次判断
            currentState = getState();
            if (currentState.getStateName().equals(state.getStateName())){
                return;
            }

            // 更新状态
            this.state = state;
            System.out.println("熔断器状态转移:" + currentState.getStateName() + "->" + state.getStateName());
        }
    }
}

熔断器实现类(各种实现都行,作者是用来简化实现):

package com.example.demo.circulbreaker;

/**
 * User: Rudy Tan
 * Date: 2018/9/22
 *
 * 本地熔断器(把它当成了工厂了)
 */
public class LocalCircuitBreaker extends AbstractCircuitBreaker {
    public LocalCircuitBreaker(String failRateForClose,
                               int idleTimeForOpen,
                               String passRateForHalfOpen, int failNumForHalfOpen){
        this.thresholdFailRateForClose = failRateForClose;
        this.thresholdIdleTimeForOpen = idleTimeForOpen;
        this.thresholdPassRateForHalfOpen = passRateForHalfOpen;
        this.thresholdFailNumForHalfOpen = failNumForHalfOpen;
    }

    public void reset() {
        this.setState(new CloseCBState());
    }

    public boolean canPassCheck() {
        return getState().canPassCheck(this);
    }

    public void countFailNum() {
        getState().countFailNum(this);
    }
}

熔断器状态接口:

package com.example.demo.circulbreaker;

/**
 * 熔断器状态
 */
public interface CBState {
    /**
     * 获取当前状态名称
     */
    String getStateName();

    /**
     * 检查以及校验当前状态是否需要扭转
     */
    void checkAndSwitchState(AbstractCircuitBreaker cb);

    /**
     * 是否允许通过熔断器
     */
    boolean canPassCheck(AbstractCircuitBreaker cb);

    /**
     * 统计失败次数
     */
    void countFailNum(AbstractCircuitBreaker cb);
}

各个熔断器状态:

关闭状态:

package com.example.demo.circulbreaker;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * User: Rudy Tan
 * Date: 2018/9/21
 *
 * 熔断器-关闭状态
 */
public class CloseCBState implements CBState {

    /**
     * 进入当前状态的初始化时间
     */
    private long stateTime = System.currentTimeMillis();

    /**
     * 关闭状态,失败计数器,以及失败计数器初始化时间
     */
    private AtomicInteger failNum = new AtomicInteger(0);
    private long failNumClearTime = System.currentTimeMillis();

    public String getStateName() {
        // 获取当前状态名称
        return this.getClass().getSimpleName();
    }

    public void checkAndSwitchState(AbstractCircuitBreaker cb) {
        // 阀值判断,如果失败到达阀值,切换状态到打开状态
        long maxFailNum = Long.valueOf(cb.thresholdFailRateForClose.split("/")[0]);
        if (failNum.get() >= maxFailNum){
            cb.setState(new OpenCBState());
        }
    }

    public boolean canPassCheck(AbstractCircuitBreaker cb) {
        // 关闭状态,请求都应该允许通过
        return true;
    }

    public void countFailNum(AbstractCircuitBreaker cb) {
        // 检查计数器是否过期了,如果过期重新计数
        long period = Long.valueOf(cb.thresholdFailRateForClose.split("/")[1]) * 1000;
        long now = System.currentTimeMillis();
        // 过期
        if (failNumClearTime + period <= now){
            failNum.set(0);
        }
        // 失败计数
        failNum.incrementAndGet();

        // 检查是否切换状态
        checkAndSwitchState(cb);
    }
}

打开状态:

package com.example.demo.circulbreaker;

/**
 * User: Rudy Tan
 * Date: 2018/9/21
 *
 * 熔断器-打开状态
 */
public class OpenCBState implements CBState {
    /**
     * 进入当前状态的初始化时间
     */
    private long stateTime = System.currentTimeMillis();

    public String getStateName() {
        // 获取当前状态名称
        return this.getClass().getSimpleName();
    }

    public void checkAndSwitchState(AbstractCircuitBreaker cb) {
        // 打开状态,检查等待时间是否已到,如果到了就切换到半开状态
        long now = System.currentTimeMillis();
        long idleTime = cb.thresholdIdleTimeForOpen * 1000L;
        if (stateTime + idleTime <= now){
            cb.setState(new HalfOpenCBState());
        }
    }

    public boolean canPassCheck(AbstractCircuitBreaker cb) {
        // 检测状态
        checkAndSwitchState(cb);
        return false;
    }

    public void countFailNum(AbstractCircuitBreaker cb) {
        // nothing
    }
}

半开状态:

package com.example.demo.circulbreaker;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * User: Rudy Tan
 * Date: 2018/9/21
 *
 * 熔断器-半开状态
 */
public class HalfOpenCBState implements CBState {

    /**
     * 进入当前状态的初始化时间
     */
    private long stateTime = System.currentTimeMillis();

    /**
     * 半开状态,失败计数器
     */
    private AtomicInteger failNum = new AtomicInteger(0);

    /**
     * 半开状态,允许通过的计数器
     */
    private AtomicInteger passNum = new AtomicInteger(0);


    public String getStateName() {
        // 获取当前状态名称
        return this.getClass().getSimpleName();
    }

    public void checkAndSwitchState(AbstractCircuitBreaker cb) {
        // 判断半开时间是否结束
        long idleTime = Long.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[1]) * 1000L;
        long now = System.currentTimeMillis();
        if (stateTime + idleTime <= now){
            // 如果半开状态已结束,失败次数是否超过了阀值
            int maxFailNum = cb.thresholdFailNumForHalfOpen;
            if (failNum.get() >= maxFailNum){
                // 失败超过阀值,认为服务没有恢复,重新进入熔断打开状态
                cb.setState(new OpenCBState());
            }else {
                // 没超过,认为服务恢复,进入熔断关闭状态
                cb.setState(new CloseCBState());
            }
        }
    }

    public boolean canPassCheck(AbstractCircuitBreaker cb) {
        // 检查是否切换状态
        checkAndSwitchState(cb);

        // 超过了阀值,不再放量
        int maxPassNum = Integer.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[0]);
        if (passNum.get() > maxPassNum){
            return false;
        }
        // 检测是否超过了阀值
        if (passNum.incrementAndGet() <= maxPassNum){
            return true;
        }
        return false;
    }

    public void countFailNum(AbstractCircuitBreaker cb) {
        // 失败计数
        failNum.incrementAndGet();

        // 检查是否切换状态
        checkAndSwitchState(cb);
    }
}

测试类:

package com.example.demo.circulbreaker;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

/**
 * User: Rudy Tan
 * Date: 2018/8/27
 */
public class App {

    public static void main(String[] args) throws InterruptedException {
        final int maxNum = 200;
        final CountDownLatch countDownLatch = new CountDownLatch(maxNum);

        final CircuitBreaker circuitBreaker = new LocalCircuitBreaker("5/20", 10, "5/10", 2);

        for (int i=0; i < maxNum; i++){
            new Thread(new Runnable() {
                public void run() {
                    // 模拟随机请求
                    try {
                        Thread.sleep(new Random().nextInt(20) * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    try{
                        // 过熔断器
                        if (circuitBreaker.canPassCheck()){
                            // do something
                            System.out.println("正常业务逻辑操作");

                            // 模拟后期的服务恢复状态
                            if (countDownLatch.getCount() >= maxNum/2){
                                // 模拟随机失败
                                if (new Random().nextInt(2) == 1){
                                    throw new Exception("mock error");
                                }
                            }
                        } else {
                            System.out.println("拦截业务逻辑操作");
                        }
                    }catch (Exception e){
                        System.out.println("业务执行失败了");
                        // 熔断器计数器
                        circuitBreaker.countFailNum();
                    }

                    countDownLatch.countDown();
                }
            }).start();

            // 模拟随机请求
            try {
                Thread.sleep(new Random().nextInt(5) * 100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        countDownLatch.await();
        System.out.println("end");
    }
}

3、参考资料

https://www.jianshu.com/p/fad81d945e2d

上一篇下一篇

猜你喜欢

热点阅读