Spring Cloud Alibaba——Sentinel S
slot概述
在 Sentinel 里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个 Entry 对象。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 SphU API 显式创建。Entry 创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:
-
NodeSelectorSlot:负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级。
-
ClusterBuilderSlot:则用于存储资源的统计信息以及调用者信息,例如该资源的 RT、QPS、thread count 等等,这些信息将用作为多维度限流,降级的依据。
-
LogSlot:则用于记录用于记录块异常,为故障排除提供具体的日志。
-
StatisticSlot:则用于记录、统计不同纬度的 runtime 指标监控信息。
-
AuthoritySlot:则根据配置的黑白名单和调用来源信息,来做黑白名单控制。
-
SystemSlot:则通过系统的状态,例如 load1 等,来控制总的入口流量。
-
FlowSlot:则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制。
-
DegradeSlot:则通过统计信息以及预设的规则,来做熔断降级。
下面是关系结构图:

solt的基本逻辑及代码演示
每个Slot执行完业务逻辑处理后,会调用fireEntry()方法,该方法将会触发下一个节点的entry方法,下一个节点又会调用他的fireEntry,以此类推直到最后一个Slot,由此就形成了sentinel的责任链。
工作流概述:

根据slot 的基本实现processorSlot的实现类讲一下slot 的基本结构
先看看顶层接口ProcessorSlot
public interface ProcessorSlot<T> {
//开始入口
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;
//finish意味着结束
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;
//退出插槽
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
//退出插槽结束
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
这个接口有4个方法,entry,fireEntry,exit,fireExit
ProcessorSlot 的抽象实现 AbstractLinkedProcessorSlot
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
//当业务执行完毕后,如果还有下一个slot
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
//指向下一个slot的entry,每一个slot根据自己的职责不同,有自己的实现
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
//当一个slot的exit执行完毕后,如果还有下一个未关闭slot
if (next != null) {
//指向下一个slot的exit
next.exit(context, resourceWrapper, count, args);
}
}
public AbstractLinkedProcessorSlot<?> getNext() {
return next;
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
}
DefaultProcessorSlotChain实现了上述的chain(setNext和getNext)
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
//直接实现了AbstractLinkedProcessorSlot的实例并作为first,可以理解为当前slot
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
//默认的end(可以理解为当前的后一个slot)
AbstractLinkedProcessorSlot<?> end;
public DefaultProcessorSlotChain() {
this.end = this.first;
}
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(this.first.getNext());
this.first.setNext(protocolProcessor);
//如果当前为最后一个
if (this.end == this.first) {
this.end = protocolProcessor;
}
}
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
//将后一个slot放进当前slot的next
this.end.setNext(protocolProcessor);
//将end指向后一个slot
this.end = protocolProcessor;
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.addLast(next);
}
public AbstractLinkedProcessorSlot<?> getNext() {
return this.first.getNext();
}
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
this.first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
this.first.exit(context, resourceWrapper, count, args);
}
}
NodeSelectorSlot
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
/**
* 相同的资源但是Context不同,分别新建 DefaultNode,并以ContextName为key
*/
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 根据ContextName尝试获取DefaultNode
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 初始化resource对应的Node 类型为DefaultNode,每个Context对应一个
node = new DefaultNode(resourceWrapper, null);
//保存Node 一个resource对应一个Node
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// 构建 Node tree
// 添加到Context的Node节点,这里构造了一棵节点树
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
//设置当前Context的当前节点为node
context.setCurNode(node);
// 唤醒执行下一个插槽,调用下一个Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
NodeSelectorSlot顾名思义是用来构建Node的。
我们可以看到NodeSelectorSlot对于不同的上下文都会生成一个DefaultNode。这里还有一个要注意的点:相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中,因此不同的上下文可以进入到同一个对象的NodeSelectorSlot.entry方法中,那么这里要怎么区分不同的上下文所创建的资源Node呢?显然可以使用上下文名称作为映射键以区分相同的资源Node。
然后这里要考虑另一个问题。一个资源有可能创建多个DefaultNode(有多个上下文时),那么我们应该如何快速的获取总的统计数据呢?
答案就在下一个Slot(ClusterBuilderSlot)中被解决了。
ClusterBuilderSlot
上面有提到一个问题,我们要如何统计不同上下文相同资源的总量数据。ClusterBuilderSlot给了很好的解决方案:具有相同资源名称的共享一个ClusterNode。
@Spi(isSingleton = false, order = Constants.ORDER_CLUSTER_BUILDER_SLOT)
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// 相同的资源共享一个 ClusterNode
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object();
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
// 判断本资源是否已经初始化过clusterNode
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// 没有初始化则初始化clusterNode
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// 给相同资源的DefaultNode设置一样的ClusterNode
node.setClusterNode(clusterNode);
/*
* 如果有来源则新建一个来源Node
*/
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
上面的代码其实就是做了一件事情,为资源创建CluserNode,相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。也就是说,能进入到同一个ClusterBuilderSlot对象的entry方法的请求都是来自同一个资源的,所以这些相同资源需要初始化一个统一的CluserNode用来做流量的汇总统计。
LogSlot
代码比较简单,逻辑就是打印异常日志,就不分析了。
StatisticSlot
StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据。
@Spi(order = Constants.ORDER_STATISTIC_SLOT)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
//next(下一个)节点调用Entry方法
//先进行后续的check,包括规则的check,黑白名单check
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级
// 统计默认qps 线程数, 当前线程数加1
node.increaseThreadNum();
//通过的请求加上count
node.addPassRequest(count);
// 元节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1
if (context.getCurEntry().getOriginNode() != null) {
// 根据来源统计qps 线程数
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
// 入口节点通过请求数和当前线程(LongAdder curThreadNum)计数器加1
if (resourceWrapper.getEntryType() == EntryType.IN) {
// 统计入口 qps 线程数
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
// 注册的扩展点的数据统计
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
}
StatisticSlot主要做了4种不同维度的流量统计
- 1、资源在上下文维度(DefaultNode)的统计。
- 2、clusterNode 维度的统计。
- 3、Origin 来源维度的统计。
- 4、入口全局流量的统计。
SystemSlot
SystemSlot比较简单,其实就是根据StatisticSlot所统计的全局入口流量进行限流。
AuthoritySlot
@Spi(order = Constants.ORDER_AUTHORITY_SLOT)
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
// 根据资源获取黑白名单规则
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
// 对规则进行校验,只要有一条不通过 就抛异常
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
}
AuthoritySlot会对资源的黑白名单做检查,并且只要有一条不通过就抛异常。
FlowSlot
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
// 检查限流规则
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
}
这个slot主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止,并且同样也会根据三种不同的维度来进行限流:
- 1、资源在上下文维度(DefaultNode)的统计。
- 2、clusterNode 维度的统计。
- 3、Origin 来源维度的统计。
DegradeSlot
这个slot主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。
总结
-
1、相同的资源({@link ResourceWrapper#equals(Object)})将全局共享相同的{@link ProcessorSlotChain},无论在哪个上下文中。
-
2、流控有多个维度,分别包括:
- 1、不同上下文中的资源。
- 2、相同资源。
- 3、入口流量。
- 4、相同的来源。
Sentinel中预设的SlotChain执行的完整流程:

参考:
https://www.cnblogs.com/taromilk/p/11751000.html