Sentinel之实时数据分析
Sentinel有一个重要的功能,即实时数据统计分析,我们可以获得在每1秒或者每1分钟下的每个上下文调用链路中的某一资源的请求数、阻塞数或响应时间;也可以获得某一资源全局的请求数、阻塞数或者响应时间。 主要实现逻辑是在StatisticSlot
中。
Statisticslot
处于调用链slotchain中的第三个,负责统计资源的实时状态,调用到slotchain中的任意一个slot时,都会触发该slot的entry方法。
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable {
try {
// 触发下一个Slot的entry方法
fireEntry(context, resourceWrapper, node, count, args);
// 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
// 统计信息
node.increaseThreadNum();
node.addPassRequest();
// 省略部分代码
} catch (BlockException e) {
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockedQps();
// 省略部分代码
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
// Should not happen
node.increaseExceptionQps();
// 省略部分代码
throw e;
}
}
entry()
主要有三个部分:
1) 首先会触发后续slot的entry方法,如SystemSlot、FlowSlot、DegradeSlot等的规则。
2)当后续的slot通过,没有抛出BlockException异常,说明该资源被成功调用,则增加执行线程数和通过的请求数。
3)当后续的slot中某一没有通过,则会抛出BlockException等异常,如果捕获的是BlockException异常,则主要是增加阻塞的数量;如果是系统异常,则增加异常数量。
当退出的时候会执行exit()
方法:
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
//计算响应时间,通过当前时间-CurEntry的创建时间取毫秒值
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
//新增响应时间和成功数
node.addRtAndSuccess(rt, count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().addRtAndSuccess(rt, count);
}
//线程数减1
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
//全局线程数-1
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.addRtAndSuccess(rt, count);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
***其他逻辑***
fireExit(context, resourceWrapper, count);
}
当退出时,重点关注响应时间,将本次响应时间收集到Node中,并将当前活跃线程数减1。
整体流程如上所述,但是具体的操作我们还不清楚,接下来我将分析其中的Qps数是如何统计的。
在上述的entry()
方法中在统计Qps数量时会调用node.addPassRequest();
方法。
@Override
public void addPassRequest(int count) {
# DefaultNode类型
# 统计某个resource在某个context中的实时指标
super.addPassRequest(count);
# ClusterNode类型
# 统计某个resource在所有的context中实时指标总和
this.clusterNode.addPassRequest(count);
}
这两个Node都是StatisticNode
的子类,最终会调用StatisticNode
中的方法。
@Override
public void addPassRequest(int count) {
# 秒级统计
rollingCounterInSecond.addPass(count);
# 分钟统计
rollingCounterInMinute.addPass(count);
}
秒级统计和分钟统计的底层原理都是一样的,下面将对秒级统计进行分析。
更正,在秒级统计中实际是OccupiableBucketLeapArray而不是BucketLeapArray
为了好理解,假设秒级用BucketLeapArray,而实际上不是
对于分钟级来说,一种有60个窗口,每个窗口是1s
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
@Override
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
在上面代码中,有几个重要的类。ArrayMetric
、BucketLeapArray
、MetricBucket
、WindowWrap
。
WindowWrap
每一个滑动窗口的包装类,其内部的数据结构T是用MetricBucket表示的。
public class WindowWrap<T> {
//一个窗口时段的时间长度(以毫秒为单位)
private final long windowLengthInMs;
//窗口的开始时间戳(以毫秒为单位)
private long windowStart;
//统计数据,MetricBucket
private T value;
MetricBucket
表示一段时间内的指标数据,存放在LongAdder
类型的数组里。有通过数量、阻塞数量、异常数量、成功数量、响应时间、已通过未来配额。相对于AtomicLong
,LongAddr
在高并发下有更好的吞吐量,代价是花费了更多的空间。
public class MetricBucket {
private final LongAdder[] counters;
private volatile long minRt;
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
}
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
LeapArray
Sentinel中统计指标的基本数据结构。
public LeapArray(int sampleCount, int intervalInMs) {
# 时间窗口的长度
this.windowLengthInMs = intervalInMs / sampleCount;
# 以毫秒为单位的时间间隔,
this.intervalInMs = intervalInMs;
# 采样窗口的个数,即数组长度
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
在按秒统计时,默认的时间窗口数组长度为2,每个时间窗口的长度为500ms。
在统计QPS时,第一步是调用data.currentWindow()
,获取当前时间窗口。
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
Qps添加第一大步
下面对currentTimeMills()
方法进行拆开分析。
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
# 计算给定的时间映射在数组中的下标(默认数组长度为2)
# 则idx可以是0或者1
int idx = calculateTimeIdx(timeMillis);
# 根据当前时间计算出所在窗口应该对用的开始时间
long windowStart = calculateWindowStart(timeMillis);
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
为什么默认要用两个采样窗口,因为sentinel设定的是比较轻量的框架。时间窗口保存着很多统计数据,如果时间窗口过多的话,一方面会占用过多的内存,另一方面时间窗口过多意味着时间窗口的长度会变小,如果时间窗口长度变小,就会导致时间窗口过于频繁的滑动。
while (true) {
# 获取存储的该索引位置下的旧的时间窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
# 没有则创建一个
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
# 通过CAS进行设置
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
//否则当前线程让出时间片,再进行线程竞争
Thread.yield();
}
# 如果实际应当的开始时间和原来的窗口的开始时间相等,则说明没有失效,直接返回
} else if (windowStart == old.windowStart()) {
return old;
# 让应当的开始时间大于原来old窗口的开始时间,则说明该窗口失效
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
# 将旧的时间窗口的开始时间设置为实际应该的开始时间,
# 并重置该窗口的统计数据为0
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
# 这种情况不可能存在,会抛出异常
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
# w.value() 即 MetricBucket
w.value().reset();
return w;
}
#重新设置它的开始时间
public WindowWrap<T> resetTo(long startTime) {
this.windowStart = startTime;
return this;
}
# 将MetricBucket的统计数据都重置为0
public void reset() {
internalReset(0L);
}
Qps添加第二大步
至此,第一大步已经介绍完了,下面是第二大步wrap.value().addPass(count)
。
这一步很简单,就是在第一步后会获得所处的时间窗口WindowWrap
,然后得到该类里面的MetricBucket
,它统计了该事件窗口下的数据统计,最后进行原子增加操作。
private T value;
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
public T value() {
return value;
}
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
以上就是增加Qps的整体流程。
Qps数据获取
那我们将数据添加上了,那怎么查询获得呢?
经过学习了解后,我们可以知道资源的数据统计存放在DefaultNode
和ClsterNode
中,它们都是StatisticNode
的子类,StatisticNode
实现了NOde
接口的很多关于统计数据的方法,其中有统计Qps的方法。
@Override
public double passQps() {
# 先获取现在的时间窗口数组的Qps总量 @(1)
# 然后获取时间 @(2)
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
代码@(1)解析
@Override
public long pass() {
# 与前面方法一致,过滤掉过期窗口
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
public List<T> values() {
return values(TimeUtil.currentTimeMillis());
}
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
# 即 MetricBucket
result.add(windowWrap.value());
}
return result;
}
当前时间减去某一窗口的开始时间,超过了事件间隔(按秒统计的话,就是1s),就说明该窗口过期,不添加。
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
代码@(2)解析
因为之前的时间单位是毫秒,现在计算的是每秒,所以转化为秒。
@Override
public double getWindowIntervalInSec() {
return data.getIntervalInSecond();
}
public double getIntervalInSecond() {
return intervalInMs / 1000.0;
}
至此,关于实时统计的模块就讲完了,大部分是参考几个大神的文章,图文并茂,很好理解,大家可以阅读如下:
Sentinel 原理-滑动窗口
Alibaba Seninel 滑动窗口实现原理(文末附原理图)
源码分析 Sentinel 实时数据采集实现原理