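Spring Cloud Alibaba: Sentinel Circuit Breaking
Preface
Sentinel implements circuit breaking and degradation through circuit breakers. This article walks through how they work and how they are implemented: the definition of a circuit breaker, how breakers are built, the check logic, the state transitions, and how traffic is counted for exception-based and slow-call-based breaking.
I. Circuit Breaker Definition
Sentinel's circuit breaking and degradation is implemented with circuit breakers. First, the concept, as defined by Wikipedia:
Circuit breakers come in simple and more advanced versions. A simple circuit breaker only needs to know whether the service is available. The more advanced version is more efficient and has at least three states:
- Closed: the circuit breaker is closed by default. It keeps a counter: each error increments the count, and once the errors reach a set number the breaker is opened. At that point it also starts an internal timer, and when the timer expires the breaker switches to the half-open state.
- Open: every request is rejected immediately and an error is thrown.
- Half-Open: the breaker lets a limited number of requests through. If they all succeed, the fault is considered gone, so the breaker switches back to Closed and resets the counter. If any of them fails, the breaker returns to Open and restarts the timer, giving the system time to recover.
Note: as the definition shows, the key points of a circuit breaker are traffic statistics and the transitions among the three states.
Introducing circuit breaking into service governance makes a system more stable and resilient: it provides stability while the system recovers from errors, reduces the impact of errors on performance, and rejects calls that are likely to fail right away instead of waiting for the real error to come back.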
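To make the three states concrete, here is a minimal sketch of the generic state machine described above. It is not Sentinel's implementation: the class name, the `failureThreshold` and `openDurationMs` parameters, and the injectable clock are all assumptions made for illustration, and the counter tracks consecutive failures as a simplification.

```java
import java.util.function.LongSupplier;

/** A minimal three-state circuit breaker; an illustrative sketch, not Sentinel's code. */
class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // hypothetical: consecutive failures that trip the breaker
    private final long openDurationMs;  // hypothetical: how long to stay OPEN before probing
    private final LongSupplier clock;   // injectable clock so the timer can be driven by hand

    private State state = State.CLOSED;
    private int failures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openDurationMs, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openDurationMs = openDurationMs;
        this.clock = clock;
    }

    /** Returns true if the caller may go ahead with the request. */
    synchronized boolean allowRequest() {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openDurationMs) {
            state = State.HALF_OPEN; // timer expired: admit a single probe
            return true;
        }
        return false; // still OPEN, or a probe is already in flight
    }

    synchronized void recordSuccess() {
        if (state == State.HALF_OPEN) {
            state = State.CLOSED; // probe succeeded: resume normal traffic
        }
        failures = 0;
    }

    synchronized void recordFailure() {
        if (state == State.HALF_OPEN || (state == State.CLOSED && ++failures >= failureThreshold)) {
            state = State.OPEN; // trip (or re-trip) and restart the timer
            openedAt = clock.getAsLong();
            failures = 0;
        }
    }

    synchronized State getState() {
        return state;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock in milliseconds
        SimpleCircuitBreaker cb = new SimpleCircuitBreaker(2, 1000, () -> now[0]);
        cb.recordFailure();
        cb.recordFailure();
        System.out.println(cb.getState() + " allow=" + cb.allowRequest());
        now[0] = 1000; // the open duration has elapsed
        System.out.println("allow=" + cb.allowRequest() + " -> " + cb.getState());
        cb.recordSuccess();
        System.out.println(cb.getState());
    }
}
```

Passing the clock in as a `LongSupplier` keeps the timer deterministic, which is convenient when stepping through the transitions by hand.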
II. Building Circuit Breakers
When degrade rules take effect through DegradeRuleManager.loadRules or getProperty().updateValue, each DegradeRule is converted into a CircuitBreaker.
public final class DegradeRuleManager {

    private static volatile Map<String, List<CircuitBreaker>> circuitBreakers = new HashMap<>();
    private static volatile Map<String, Set<DegradeRule>> ruleMap = new HashMap<>();

    private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {

        private synchronized void reloadFrom(List<DegradeRule> list) {
            // Build the circuit breakers, keyed by resource name
            Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);
            Map<String, Set<DegradeRule>> rm = new HashMap<>(cbs.size());
            for (Map.Entry<String, List<CircuitBreaker>> e : cbs.entrySet()) {
                assert e.getValue() != null && !e.getValue().isEmpty();
                Set<DegradeRule> rules = new HashSet<>(e.getValue().size());
                for (CircuitBreaker cb : e.getValue()) {
                    rules.add(cb.getRule());
                }
                rm.put(e.getKey(), rules);
            }
            // Publish the new maps by replacing the volatile references
            DegradeRuleManager.circuitBreakers = cbs;
            DegradeRuleManager.ruleMap = rm;
        }
    }
}
There are two kinds of circuit breaker:
- 1. Slow calls use ResponseTimeCircuitBreaker.
- 2. Exception count and exception ratio use ExceptionCircuitBreaker.
public final class DegradeRuleManager {

    private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
        switch (rule.getGrade()) {
            // Slow-call strategy
            case RuleConstant.DEGRADE_GRADE_RT:
                return new ResponseTimeCircuitBreaker(rule);
            // Degrade by the business exception ratio within the statistics interval
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO:
            // Degrade by the business exception count within the statistics interval
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:
                return new ExceptionCircuitBreaker(rule);
            default:
                return null;
        }
    }
}
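The two excerpts above combine two ideas: choose a breaker type from the rule's grade, then publish a fully built map with a single volatile write. The sketch below shows that pattern in isolation. `Rule`, `Breaker`, `BreakerRegistry`, and the grade constants are hypothetical stand-ins rather than Sentinel types, and the way rules are grouped here is an assumption about what `buildCircuitBreakers` does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BreakerRegistry {
    // Hypothetical grade constants standing in for RuleConstant.DEGRADE_GRADE_*.
    static final int GRADE_RT = 0;
    static final int GRADE_EXCEPTION_RATIO = 1;
    static final int GRADE_EXCEPTION_COUNT = 2;

    /** Hypothetical stand-in for DegradeRule. */
    static final class Rule {
        final String resource;
        final int grade;

        Rule(String resource, int grade) {
            this.resource = resource;
            this.grade = grade;
        }
    }

    /** Hypothetical stand-in for CircuitBreaker; `kind` names the implementation chosen. */
    static final class Breaker {
        final String kind;
        final Rule rule;

        Breaker(String kind, Rule rule) {
            this.kind = kind;
            this.rule = rule;
        }
    }

    // Readers always see either the old map or the new one, never a half-built map.
    private static volatile Map<String, List<Breaker>> breakers = Collections.emptyMap();

    static Breaker newBreakerFrom(Rule rule) {
        switch (rule.grade) {
            case GRADE_RT:
                return new Breaker("response-time", rule);
            case GRADE_EXCEPTION_RATIO:
            case GRADE_EXCEPTION_COUNT:
                return new Breaker("exception", rule);
            default:
                return null; // unknown grade
        }
    }

    static synchronized void reloadFrom(List<Rule> rules) {
        Map<String, List<Breaker>> next = new HashMap<>();
        for (Rule rule : rules) {
            Breaker cb = newBreakerFrom(rule);
            if (cb == null) {
                continue; // skip rules with an unknown grade
            }
            next.computeIfAbsent(rule.resource, k -> new ArrayList<>()).add(cb);
        }
        breakers = next; // one volatile write publishes the whole map
    }

    static List<Breaker> getBreakers(String resource) {
        List<Breaker> list = breakers.get(resource);
        return list == null ? Collections.<Breaker>emptyList() : list;
    }

    public static void main(String[] args) {
        reloadFrom(Arrays.asList(
                new Rule("orderService", GRADE_RT),
                new Rule("orderService", GRADE_EXCEPTION_COUNT)));
        for (Breaker cb : getBreakers("orderService")) {
            System.out.println(cb.rule.resource + " -> " + cb.kind);
        }
    }
}
```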
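[Figure: circuit breaker class diagram]
III. Circuit Breaker Checking
DegradeSlot is responsible for checking the circuit breaking rules; the tryPass method makes the actual decision.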
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // Run the circuit breaker check before triggering the following slots
        performChecking(context, resourceWrapper);
        // Trigger the following slots
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        // Look up all circuit breakers for this resource name
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        for (CircuitBreaker cb : circuitBreakers) {
            // Try to pass; cb.tryPass only checks the breaker state (closed or open)
            if (!cb.tryPass(context)) {
                // DegradeException is a subclass of BlockException
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }
}
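A circuit breaker has three states: CLOSED (requests pass normally), HALF_OPEN (a probe request is allowed through), and OPEN (requests are rejected). Why does HALF_OPEN need special handling? Suppose two degrade rules are set on the same resource: R1 with a breaking duration of 100 ms and R2 with a breaking duration of 200 ms. When R1 has reached its recovery point but R2 has not, R1 moves from OPEN to HALF_OPEN and passes this check, while R2 fails it, so the request is still rejected. Because the probe was blocked rather than executed, the exit handler registered on the entry puts R1 back to OPEN. Once both R1 and R2 have reached their recovery points, the request goes through, and if it completes successfully entry.exit() sets the state to CLOSED so that later requests pass normally. That is the role HALF_OPEN plays.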
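The R1/R2 scenario can be replayed with a small model. This is a sketch under simplifying assumptions, not Sentinel code: `Entry` and `Breaker` are hypothetical stand-ins, both breakers start OPEN, time is passed in by hand, and a request that gets through is assumed to succeed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class TwoRuleProbeDemo {
    enum State { CLOSED, OPEN, HALF_OPEN }

    /** Hypothetical stand-in for Sentinel's Entry: a blocked flag plus exit handlers. */
    static final class Entry {
        boolean blocked; // plays the role of getBlockError() != null
        final List<Runnable> exitHandlers = new ArrayList<>();
    }

    /** Hypothetical simplified breaker that starts OPEN and may probe from retryAt onwards. */
    static final class Breaker {
        final long retryAt;
        final AtomicReference<State> state = new AtomicReference<>(State.OPEN);

        Breaker(long retryAt) {
            this.retryAt = retryAt;
        }

        boolean tryPass(long now, Entry entry) {
            if (state.get() == State.CLOSED) {
                return true;
            }
            if (state.get() == State.OPEN && now >= retryAt
                    && state.compareAndSet(State.OPEN, State.HALF_OPEN)) {
                // If the probe ends up blocked elsewhere, fall back to OPEN on exit.
                entry.exitHandlers.add(() -> {
                    if (entry.blocked) {
                        state.compareAndSet(State.HALF_OPEN, State.OPEN);
                    }
                });
                return true;
            }
            return false;
        }
    }

    /** Runs one request at time `now` through all breakers; returns true if it passed. */
    static boolean request(List<Breaker> breakers, long now) {
        Entry entry = new Entry();
        for (Breaker cb : breakers) {
            if (!cb.tryPass(now, entry)) {
                entry.blocked = true; // corresponds to throwing DegradeException
                break;
            }
        }
        if (!entry.blocked) {
            // The call ran and (in this sketch) always succeeds: close probing breakers.
            for (Breaker cb : breakers) {
                cb.state.compareAndSet(State.HALF_OPEN, State.CLOSED);
            }
        }
        for (Runnable handler : entry.exitHandlers) {
            handler.run(); // exit handlers run after the slot-chain exit
        }
        return !entry.blocked;
    }

    public static void main(String[] args) {
        Breaker r1 = new Breaker(100);
        Breaker r2 = new Breaker(200);
        List<Breaker> rules = Arrays.asList(r1, r2);
        // At t=150 only R1 is ready to probe; at t=250 both are.
        System.out.println(request(rules, 150) + " " + r1.state.get() + " " + r2.state.get());
        System.out.println(request(rules, 250) + " " + r1.state.get() + " " + r2.state.get());
    }
}
```

In the first call R1 briefly enters HALF_OPEN and is reverted to OPEN by its exit handler; in the second call both breakers probe, the request passes, and both end up CLOSED.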
public abstract class AbstractCircuitBreaker implements CircuitBreaker {

    protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);

    @Override
    public boolean tryPass(Context context) {
        // CLOSED: pass normally
        if (currentState.get() == State.CLOSED) {
            return true;
        }
        // OPEN: try to let a probe through
        if (currentState.get() == State.OPEN) {
            // For half-open state we allow a request for probing.
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }
}
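The decision logic:
- If the breaker is CLOSED, return true: the request is allowed through.
- If the breaker is OPEN, the breaking duration has elapsed, and the state is successfully switched from OPEN to HALF_OPEN (probing), return true: the request is allowed through.
- If the breaker is OPEN and still within the breaking duration, return false: the request is rejected.
- If the breaker is already HALF_OPEN (a probe is in flight), return false as well.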
public abstract class AbstractCircuitBreaker implements CircuitBreaker {

    protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);

    protected boolean fromOpenToHalfOpen(Context context) {
        // Try to switch the state from OPEN to HALF_OPEN
        if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
            // Notify observers of the state change
            notifyObservers(State.OPEN, State.HALF_OPEN, null);
            Entry entry = context.getCurEntry();
            // Register an exit handler on the entry; it is invoked on entry.exit()
            entry.whenTerminate(new BiConsumer<Context, Entry>() {
                @Override
                public void accept(Context context, Entry entry) {
                    // If the probe request was blocked, set the state back to OPEN
                    if (entry.getBlockError() != null) {
                        // Fallback to OPEN due to detecting request is blocked
                        currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                        notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                    }
                }
            });
            // The state is now HALF_OPEN; let this probe request through
            return true;
        }
        return false;
    }
}
class CtEntry extends Entry {

    /**
     * Note: the exit handlers are invoked here, in callExitHandlersAndCleanUp,
     * after onExit of the slot chain.
     */
    private void callExitHandlersAndCleanUp(Context ctx) {
        if (exitHandlers != null && !exitHandlers.isEmpty()) {
            for (BiConsumer<Context, Entry> handler : this.exitHandlers) {
                try {
                    handler.accept(ctx, this);
                } catch (Exception e) {
                    RecordLog.warn("Error occurred when invoking entry exit handler, current entry: "
                        + resourceWrapper.getName(), e);
                }
            }
            exitHandlers = null;
        }
    }
}
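So both the probing (HALF_OPEN) state and the closed state let requests through. From the earlier article on circuit breaking and degradation ("熔断降级说明") we know that the probing state lets only one request through, so where is that enforced? In tryPass: only the request that wins the compareAndSet from OPEN to HALF_OPEN passes, and while the state stays HALF_OPEN every other request gets false.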
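The single-probe guarantee comes from the atomic compare-and-set, which the following sketch isolates. It assumes the retry timeout has already arrived and leaves out everything else; `SingleProbeDemo` and its methods are illustrative names, not Sentinel API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class SingleProbeDemo {
    enum State { CLOSED, OPEN, HALF_OPEN }

    /** Same shape as tryPass, with the retry-timeout check assumed to be satisfied. */
    static boolean tryPass(AtomicReference<State> state) {
        State s = state.get();
        if (s == State.CLOSED) {
            return true;
        }
        if (s == State.OPEN) {
            // Only one thread can win this CAS; the losers are rejected.
            return state.compareAndSet(State.OPEN, State.HALF_OPEN);
        }
        return false; // HALF_OPEN: a probe is already in flight
    }

    /** Lets `threads` callers race on an OPEN breaker and counts how many got through. */
    static int countProbes(int threads) {
        AtomicReference<State> state = new AtomicReference<>(State.OPEN);
        AtomicInteger passed = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // line all threads up before racing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (tryPass(state)) {
                    passed.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("requests admitted as probes: " + countProbes(16));
    }
}
```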
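So far we have seen the state go from OPEN to HALF_OPEN and from HALF_OPEN back to OPEN, but not from HALF_OPEN to CLOSED. That transition happens after the request finishes normally: entry.exit() calls DegradeSlot.exit(), which changes the state:
IV. Circuit Breaker State Transitions
Calling Entry#exit() triggers the exit calls along the slot chain, which for circuit breaking means DegradeSlot#exit. It calls back circuitBreaker.onRequestComplete so that each breaker can perform its state transition.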
@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void exit(Context context, ResourceWrapper r, int count, Object... args) {
        Entry curEntry = context.getCurEntry();
        // If some slot has already raised a BlockException, call fireExit directly
        // and skip the circuit breaking logic. Note that this applies only to
        // BlockException; any other exception is an input to the breaking decision.
        if (curEntry.getBlockError() != null) {
            fireExit(context, r, count, args);
            return;
        }
        // Look up all circuit breakers for this resource name
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            fireExit(context, r, count, args);
            return;
        }
        if (curEntry.getBlockError() == null) {
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
                circuitBreaker.onRequestComplete(context);
            }
        }
        fireExit(context, r, count, args);
    }
}
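1. State transitions of the exception circuit breaker
ExceptionCircuitBreaker handles breaking by exception count and exception ratio. It uses a sliding window to count errors and total requests.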
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

    private final LeapArray<SimpleErrorCounter> stat;

    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        // Error counter of the current time window
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {
            // An exception occurred: increment the error count
            counter.getErrorCount().add(1);
        }
        // Increment the total count
        counter.getTotalCount().add(1);
        handleStateChangeWhenThresholdExceeded(error);
    }
}
The detailed breaking logic:
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        // Already OPEN: nothing to do, the breaker stays open
        if (currentState.get() == State.OPEN) {
            return;
        }
        if (currentState.get() == State.HALF_OPEN) {
            if (error == null) {
                // Probe succeeded: HALF_OPEN -> CLOSED, all requests are allowed again
                fromHalfOpenToClose();
            } else {
                // Probe failed: HALF_OPEN -> OPEN, all requests are rejected again
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
        // From here on the breaker is CLOSED
        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            // Sum the error count and the total request count
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        // Below the minimum request amount: never break
        if (totalCount < minRequestAmount) {
            return;
        }
        // Current error count
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            // Use the current error ratio instead
            curCount = errCount * 1.0d / totalCount;
        }
        // Has the error count or error ratio exceeded the configured threshold?
        if (curCount > threshold) {
            // Threshold exceeded: switch to OPEN
            transformToOpen(curCount);
        }
    }
}
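The CLOSED-state branch above boils down to a small pure function, which makes the roles of minRequestAmount and the strict `>` comparison easier to see. The helper below is an illustrative extraction with assumed names, not a method that exists in Sentinel.

```java
class ExceptionThresholdDemo {
    /**
     * Mirrors the CLOSED-state decision: returns true if the breaker should open.
     * ratioMode selects exception-ratio mode; otherwise the raw exception count is used.
     */
    static boolean shouldOpen(long errCount, long totalCount, int minRequestAmount,
                              double threshold, boolean ratioMode) {
        if (totalCount < minRequestAmount) {
            return false; // not enough traffic to judge
        }
        double current = ratioMode ? errCount * 1.0d / totalCount : errCount;
        return current > threshold; // strictly greater: equal to the threshold does not trip
    }

    public static void main(String[] args) {
        // Ratio mode, threshold 0.5, at least 5 requests required.
        System.out.println(shouldOpen(3, 4, 5, 0.5, true));   // too few requests
        System.out.println(shouldOpen(5, 10, 5, 0.5, true));  // ratio equals the threshold
        System.out.println(shouldOpen(6, 10, 5, 0.5, true));  // ratio above the threshold
        // Count mode, threshold 5 exceptions.
        System.out.println(shouldOpen(6, 10, 5, 5, false));
    }
}
```

Note that in count mode the threshold is compared against the raw number of exceptions, so a rule with a threshold of 5 trips on the sixth exception in the window, provided the minimum request amount has been reached.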
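2. State transitions of the slow-call circuit breaker
ResponseTimeCircuitBreaker handles breaking on slow calls. It uses a sliding window to count slow calls and total requests.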
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {

    @Override
    public void onRequestComplete(Context context) {
        // Get the counter of the current sliding window
        SlowRequestCounter counter = slidingCounter.currentWindow().value();
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        long completeTime = entry.getCompleteTimestamp();
        if (completeTime <= 0) {
            completeTime = TimeUtil.currentTimeMillis();
        }
        long rt = completeTime - entry.getCreateTimestamp();
        if (rt > maxAllowedRt) {
            // Response time exceeds the limit: increment the slow-call count
            counter.slowCount.add(1);
        }
        // Increment the total request count
        counter.totalCount.add(1);
        handleStateChangeWhenThresholdExceeded(rt);
    }
}
The detailed breaking logic:
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {

    private void handleStateChangeWhenThresholdExceeded(long rt) {
        // Already OPEN: all requests are being rejected, nothing to do
        if (currentState.get() == State.OPEN) {
            return;
        }
        // HALF_OPEN: decide based on this single probe request
        if (currentState.get() == State.HALF_OPEN) {
            if (rt > maxAllowedRt) {
                // Probe RT is above the limit: HALF_OPEN -> OPEN
                fromHalfOpenToOpen(1.0d);
            } else {
                // Probe RT is within the limit: HALF_OPEN -> CLOSED
                fromHalfOpenToClose();
            }
            return;
        }
        // From here on the breaker is CLOSED
        List<SlowRequestCounter> counters = slidingCounter.values();
        long slowCount = 0;
        long totalCount = 0;
        for (SlowRequestCounter counter : counters) {
            // Sum the slow-call count and the total call count
            slowCount += counter.slowCount.sum();
            totalCount += counter.totalCount.sum();
        }
        // Below the minimum request amount: never break
        if (totalCount < minRequestAmount) {
            return;
        }
        double currentRatio = slowCount * 1.0d / totalCount;
        if (currentRatio > maxSlowRequestRatio) {
            // Slow-call ratio is above the threshold: CLOSED -> OPEN
            transformToOpen(currentRatio);
        }
        if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 &&
                Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {
            // The ratio equals the threshold and the threshold is the maximum (1.0).
            // A ratio can never exceed 1.0, so this case must also go CLOSED -> OPEN.
            transformToOpen(currentRatio);
        }
    }
}
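The second `if` in the CLOSED branch exists for one edge case: a threshold of 1.0 could never be exceeded by a strict `>` comparison. The sketch below extracts that decision into a standalone function so the edge case can be tried directly; the class and method names are assumptions for illustration.

```java
class SlowRatioDemo {
    static final double SLOW_REQUEST_RATIO_MAX_VALUE = 1.0d;

    /** Mirrors the CLOSED-state decision of the slow-call breaker. */
    static boolean shouldOpen(long slowCount, long totalCount, int minRequestAmount,
                              double maxSlowRequestRatio) {
        if (totalCount < minRequestAmount) {
            return false; // not enough traffic to judge
        }
        double currentRatio = slowCount * 1.0d / totalCount;
        if (currentRatio > maxSlowRequestRatio) {
            return true;
        }
        // With a threshold of 1.0 the ratio can only ever equal it, never exceed it.
        return Double.compare(currentRatio, maxSlowRequestRatio) == 0
                && Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldOpen(10, 10, 5, 1.0)); // every call slow, threshold 1.0
        System.out.println(shouldOpen(9, 10, 5, 1.0));  // one fast call, threshold 1.0
        System.out.println(shouldOpen(5, 10, 5, 0.5));  // equal to a threshold below 1.0
        System.out.println(shouldOpen(6, 10, 5, 0.5));  // above the threshold
    }
}
```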
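V. Sliding-Window Traffic Statistics
1. Traffic statistics for exception-based breaking
ExceptionCircuitBreaker counts traffic with SimpleErrorCounterLeapArray. The public constructor fixes the sample count (sampleCount) at 1 and takes the statistics interval (intervalInMs) from rule.getStatIntervalMs(), which defaults to 1 second.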
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

    private final int strategy;
    private final int minRequestAmount;
    private final double threshold;
    private final LeapArray<SimpleErrorCounter> stat;

    public ExceptionCircuitBreaker(DegradeRule rule) {
        this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
    }

    ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {
        super(rule);
        this.strategy = rule.getGrade();
        boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
        AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.minRequestAmount = rule.getMinRequestAmount();
        this.threshold = rule.getCount();
        this.stat = stat;
    }

    static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {

        public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
        }

        @Override
        public SimpleErrorCounter newEmptyBucket(long timeMillis) {
            return new SimpleErrorCounter();
        }

        @Override
        protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
            // Update the start time and reset value.
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }
}
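2. Traffic statistics for slow-call breaking
ResponseTimeCircuitBreaker counts traffic with SlowRequestLeapArray. As above, the public constructor fixes the sample count (sampleCount) at 1 and takes the statistics interval (intervalInMs) from rule.getStatIntervalMs(), which defaults to 1 second.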
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {

    private static final double SLOW_REQUEST_RATIO_MAX_VALUE = 1.0d;

    private final long maxAllowedRt;
    private final double maxSlowRequestRatio;
    private final int minRequestAmount;
    private final LeapArray<SlowRequestCounter> slidingCounter;

    public ResponseTimeCircuitBreaker(DegradeRule rule) {
        this(rule, new SlowRequestLeapArray(1, rule.getStatIntervalMs()));
    }

    ResponseTimeCircuitBreaker(DegradeRule rule, LeapArray<SlowRequestCounter> stat) {
        super(rule);
        AssertUtil.isTrue(rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT, "rule metric type should be RT");
        AssertUtil.notNull(stat, "stat cannot be null");
        this.maxAllowedRt = Math.round(rule.getCount());
        this.maxSlowRequestRatio = rule.getSlowRatioThreshold();
        this.minRequestAmount = rule.getMinRequestAmount();
        this.slidingCounter = stat;
    }

    static class SlowRequestLeapArray extends LeapArray<SlowRequestCounter> {

        public SlowRequestLeapArray(int sampleCount, int intervalInMs) {
            super(sampleCount, intervalInMs);
        }

        @Override
        public SlowRequestCounter newEmptyBucket(long timeMillis) {
            return new SlowRequestCounter();
        }

        @Override
        protected WindowWrap<SlowRequestCounter> resetWindowTo(WindowWrap<SlowRequestCounter> w, long startTime) {
            w.resetTo(startTime);
            w.value().reset();
            return w;
        }
    }
}
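To show what sampleCount and intervalInMs mean for the counts, here is a simplified, single-threaded sketch of the bucket-array idea behind LeapArray. It is not Sentinel's LeapArray: the class name, the array-based storage, and the validity rule (a bucket counts while it started less than one interval ago) are assumptions made for illustration, and time is passed in explicitly.

```java
import java.util.Arrays;

class SimpleLeapArray {
    private final int sampleCount;
    private final long intervalMs;
    private final long windowLengthMs;
    private final long[] windowStart; // start time of each bucket, -1 if never used
    private final long[] total;
    private final long[] errors;

    SimpleLeapArray(int sampleCount, int intervalInMs) {
        this.sampleCount = sampleCount;
        this.intervalMs = intervalInMs;
        this.windowLengthMs = intervalInMs / sampleCount;
        this.windowStart = new long[sampleCount];
        this.total = new long[sampleCount];
        this.errors = new long[sampleCount];
        Arrays.fill(windowStart, -1L);
    }

    /** Finds the bucket for `now`, resetting it first if it still holds an older window. */
    private int currentBucket(long now) {
        int idx = (int) ((now / windowLengthMs) % sampleCount);
        long start = now - now % windowLengthMs;
        if (windowStart[idx] != start) {
            windowStart[idx] = start;
            total[idx] = 0;
            errors[idx] = 0;
        }
        return idx;
    }

    void record(long now, boolean error) {
        int idx = currentBucket(now);
        total[idx]++;
        if (error) {
            errors[idx]++;
        }
    }

    long totalCount(long now) {
        return sum(total, now);
    }

    long errorCount(long now) {
        return sum(errors, now);
    }

    /** Sums only buckets that started less than one interval ago. */
    private long sum(long[] counts, long now) {
        long result = 0;
        for (int i = 0; i < sampleCount; i++) {
            if (windowStart[i] >= 0 && now - windowStart[i] < intervalMs) {
                result += counts[i];
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SimpleLeapArray stat = new SimpleLeapArray(1, 1000); // one bucket, 1 s interval
        stat.record(100, true);
        stat.record(900, false);
        System.out.println(stat.errorCount(950) + "/" + stat.totalCount(950));
        stat.record(1000, false); // a new interval begins: the only bucket is reset
        System.out.println(stat.errorCount(1000) + "/" + stat.totalCount(1000));
    }
}
```

With a single bucket the window does not slide gradually: the counts start from zero at every interval boundary, so each decision is based only on the requests seen since the current interval began.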
Reference:
https://blog.csdn.net/gaoliang1719/article/details/108898677