【sentinel】深入浅出之原理篇StatisticSlot&
2019-03-18 本文已影响0人
一滴水的坚持
StatisticSlot
则用于记录,统计不同纬度的 runtime 信息,在这里记录线程数变化,请求数量,计算RT时间,代码比较简单:
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
//请求通过,增加线程数
node.increaseThreadNum();
//增加请求通过数
node.addPassRequest(count);
//如果原始节点存在,则新增线程数和通过的请求总数
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
//如果是IN,则在Cluster节点上新增线程数和通过请求数,这个是全局的ClusterNode,和ClusterBuilderSlot的ClusterNode不一样,此处所有请求共享同一个Cluster
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
//钩子函数
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
//增加线程数
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
}
//增加线程数 共享全局Cluster
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
}
//钩子函数
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
context.getCurEntry().setError(e);
//节点Block数量加一
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockQps(count);
}
//钩子,扩展
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
node.increaseExceptionQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw e;
}
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
//计算响应时间,通过当前时间-CurEntry的创建时间取毫秒值
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
//新增响应时间和成功数
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
//线程数减1
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
//全局线程数-1
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
//回调钩子
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
fireExit(context, resourceWrapper, count);
}
}
逻辑简单,但实现并不简单,先了解一下DefaultNode
的Api:
public class DefaultNode extends StatisticNode {
private ResourceWrapper id;
private volatile Set<Node> childList = new HashSet<>();
private ClusterNode clusterNode;
@Override
public void increaseBlockQps(int count) {
super.increaseBlockQps(count);
this.clusterNode.increaseBlockQps(count);
}
@Override
public void increaseExceptionQps(int count) {
super.increaseExceptionQps(count);
this.clusterNode.increaseExceptionQps(count);
}
@Override
public void addRtAndSuccess(long rt, int successCount) {
super.addRtAndSuccess(rt, successCount);
this.clusterNode.addRtAndSuccess(rt, successCount);
}
@Override
public void increaseThreadNum() {
super.increaseThreadNum();
this.clusterNode.increaseThreadNum();
}
@Override
public void decreaseThreadNum() {
super.decreaseThreadNum();
this.clusterNode.decreaseThreadNum();
}
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
private void visitTree(int level, DefaultNode node) {
for (int i = 0; i < level; ++i) {
System.out.print("-");
}
if (!(node instanceof EntranceNode)) {
System.out.println(
String.format("%s(thread:%s pq:%s bq:%s tq:%s rt:%s 1mp:%s 1mb:%s 1mt:%s)", node.id.getShowName(),
node.curThreadNum(), node.passQps(), node.blockQps(), node.totalQps(), node.avgRt(),
node.totalRequest() - node.blockRequest(), node.blockRequest(), node.totalRequest()));
} else {
System.out.println(
String.format("Entry-%s(t:%s pq:%s bq:%s tq:%s rt:%s 1mp:%s 1mb:%s 1mt:%s)", node.id.getShowName(),
node.curThreadNum(), node.passQps(), node.blockQps(), node.totalQps(), node.avgRt(),
node.totalRequest() - node.blockRequest(), node.blockRequest(), node.totalRequest()));
}
for (Node n : node.getChildList()) {
DefaultNode dn = (DefaultNode)n;
visitTree(level + 1, dn);
}
}
}
上文链接 ClusterBuilderSlot
原理介绍已经提到过,一个ContextName对应的同一个Resource对应ClusterNode为同一个,所以这里同步新增,或减少记录数,都是基于当前节点和对应的ClusterNode一起统计的。
不管是ClusterNode,或者DefaultNode节点,对其添加,或记录Qps,rt都是基于父类去实现,这样来讲,所有Sentinel最核心的代码就在StatisticNode
中。
在StatisticNode
中,是这样注释的:
Sentinel使用滑动窗口来记录和统计实时调用数据。
- 当第一个请求到来,Sentinel会创建一个特殊的时间片(time-span)去保存运行时的数据,比如:响应时间(rt),QPS, block request,在这里叫做滑动窗口(window bucket),这个滑动窗口通过sample count定义。Sentinel通过滑动窗口有效的数据来决定当前请求是否通过,滑动窗口将记录所有的qps,将其与规则中定义的阈值进行比较。
- 不同的请求进来,根据不同的时间存放在不同滑动窗口中。
- 请求不断的进入系统,先前的滑动窗口将会过期无效。
理解StatisticNode
节点之前,先了解几个数据结构:
-
LeapArray
Sentinel中的metrics的基本数据结构- LeapArray使用滑动窗口算法统计数据,每一个桶覆盖windowLengthInMs的时间长数据,总的时间长度是intervalInMs,所以,sampleCount = intervalInMs / windowLengthInMs。
public abstract class LeapArray<T> {
//单位时间窗口长度
protected int windowLengthInMs;
//总的桶个数
protected int sampleCount;
//总的时间长度
protected int intervalInMs;
//记录的窗口数,长度与sampleCount一样
protected final AtomicReferenceArray<WindowWrap<T>> array;
}
构造方法如下:
public LeapArray(int sampleCount, int intervalInMs) {
//每ms的窗口长度为总的时间长度/桶的总数
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
//记录每个windowLengthInMs的滑动窗口信息
this.array = new AtomicReferenceArray<>(sampleCount);
}
而在WindowWrap
中,则记录了该窗口的开始时间,和时长,和该时间窗口的数据信息。
public class WindowWrap<T> {
//窗口长度
private final long windowLengthInMs;
//窗口开始时间 long类型,
private long windowStart;
//data数据
private T value;
//复位该时间窗口
public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}
//判断是否该时间在该窗口内
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}
}
继续回到 LeapArray
,看看如何根据时间找到该窗口:
- 根据当前时间,算出该时间的timeId,并根据timeId算出当前窗口在采样窗口数组中的索引idx
- 根据当前时间算出当前窗口的应该对应的开始时间time,以毫秒为单位,时间窗口开始时间为
windowLengthInMs的整数倍
(取该时间单位整数开始时间,比如1000501,则从1000500开始)。 - 获取idx位置的窗口
- 当窗口为null,则创建一个新的时间窗口
- 当窗口开始时间和计算出来该时间窗口时间一样,则直接返回该窗口
- 当该时间的窗口时间大于获取出来下标的时间,则重置该窗口。
- 如果当前时间小于该时间,则返回一个新的,(这段代码永远跑不到,是否跑一场更优雅,提了一个issue给sentinelissue:LeapArray.currentWindow is less elegant than returning a new WindowWrap when the time is less than the time window obtained.
代码如下:
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
//计算当前时间的时间窗口的位置
int idx = calculateTimeIdx(timeMillis);
//计算当前时间窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//取该下表对应的时间窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
//不存在,则创建一个新的
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
//如果失败,则代表有其他的线程再创建,放弃时间片
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
如果是这个窗口的开始时间,则直接返回
return old;
} else if (windowStart > old.windowStart()) {
//如果当前时间的窗口开始时间>老的时间窗口,则重置该时间窗口时间
// 防止并发,加重入锁
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
//失败则代表锁已经被其他线程占用
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket());
}
}
}
而在StatisticNode
节点中,实质也是使用LeapArray
来存储,从LeapArray中获取MetricBucket
,对QPS,请求线程数,rt时间等坐记录。
再来看一下StatisticNode
的定义:
public class StatisticNode implements Node {
//每秒的滚动计数器 SAMPLE_COUNT为2对应LeapArray中的sample count,IntervalProperty.INTERVAL为1000代表1s,1s分为两个桶,保存数据。
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
//每分钟的滚动计数器1分钟分为60个记录,1分钟一个。
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
//当前线程数
private AtomicInteger curThreadNum = new AtomicInteger(0);
//最后一次metrics被获取的时间
private long lastFetchTime = -1;
}
所以,在添加rt时间,qps,BlockQps等实质都是使用LeapArray的当前窗口去做添加
。
//StatisticNode.java
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
@Override
public void addRtAndSuccess(long rt, int successCount) {
rollingCounterInSecond.addSuccess(successCount);
rollingCounterInSecond.addRT(rt);
rollingCounterInMinute.addSuccess(successCount);
rollingCounterInMinute.addRT(rt);
}
@Override
public void increaseBlockQps(int count) {
rollingCounterInSecond.addBlock(count);
rollingCounterInMinute.addBlock(count);
}
@Override
public void increaseExceptionQps(int count) {
rollingCounterInSecond.addException(count);
rollingCounterInMinute.addException(count);
}
@Override
public void addBlock(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addBlock(count);
}
@Override
public void addSuccess(int count) {
//当前窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addSuccess(count);
}
@Override
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
@Override
public void addRT(long rt) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addRT(rt);
}
https://www.jianshu.com/p/6ee4b7bdb844 这篇博客对滑动窗口讲的比较细,可以看看。