sentinel-限流篇
官方文档:https://github.com/alibaba/Sentinel/wiki/%E4%B8%BB%E9%A1%B5
开发五步骤:
1.依赖
<!--sentinel核心包,从1.5.0开始仅支持JDK1.7以上版本,1.5.0之前的版本最低支持JDK1.6-->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.6.2</version>
</dependency>
<!--sentinel Annotation AspectJ 扩展-->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>1.6.2</version>
</dependency>
<!--客户端需要引入 Transport 模块来与 Sentinel 控制台进行通信-->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
<version>1.6.2</version>
</dependency>
2.XML 配置
<!--sentinel 切面-->
<bean id="sentinelResourceAspect" class="com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect"></bean>
<!--使sentinel中的 aspectJ注解生效-->
<aop:aspectj-autoproxy proxy-target-class="false"/><!--声明自动为spring容器中那些配置@aspectJ切面的bean创建代理,织入切面,true使用CGLib动态代理技术织入增强,false表示使用jdk动态代理织入增强-->
3.定义资源
@SentinelResource(value = "annotationHandlerMethod" , blockHandler = "annotationHandlerMethod",blockHandlerClass = HandlerFallUtils.class)
4.定义规则
FlowRuleManager.loadRules(Arrays.asList(new FlowRule("annotationHandlerMethod")));
5.定义降级方法
/**
* 需要public static 参数需要多一个BlockException
* @param i
* @param d
* @return
*/
public static String annotationHandlerMethod(int i,BlockException d){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i + "=execute annotationHandlerMethod--fall:"+d.getMessage();
限流方式(流控控制效果)
1.直接拒绝
默然是直接拒绝,当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,抛出FlowException.
2.Warm up
FlowRule rule = new FlowRule();
rule.setResource(resourceName);
rule.setCount(20);
rule.setGrade(RuleConstant.GRADE_QPS);
rule.setLimitApp("default");
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
//warm up方式需要制定,表示经过WarmUpPeriodSec 秒的时间将使QPS达到设定的count值
rule.setWarmUpPeriodSec(10);
图片.png
3.均匀排队
rule.setGrade(RuleConstant.GRADE_QPS);
/ /流控效果:匀速排队模式
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
//表示一秒内均匀通过10个请求
rule.setCount(10);
rule.setMaxQueueingTimeMs(20 * 1000); // 最长排队等待时间:20s
以固定的间隔时间让请求通过。当请求到来的时候,如果当前请求距离上个通过的请求通过的时间间隔不小于预设值,则让当前请求通过;否则,计算当前请求的预期通过时间,如果该请求的预期通过时间小于规则预设的 timeout 时间,则该请求会等待直到预设时间到来通过;反之,则马上抛出阻塞异常。
限流过程
1.规则刷新 --> FlowRuleManager.loadRules
涉及FlowRuleManager,FlowRuleUtil,DynamicSentinelProperty
public class FlowRuleManager {
//规则存放Map
private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
static {
currentProperty.addListener(LISTENER);
SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
}
//更新规则
public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}
//获取限流规则
static Map<String, List<FlowRule>> getFlowRuleMap() {
return flowRules;
}
//规则加载主要逻辑
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
//更新规则
public void configUpdate(List<FlowRule> value) {
//校验FlowRule
//赋值rule.setRater(TrafficShapingController) 限流处理类
//按key 资源名称,value 规则列表封装Map返回
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
//更新规则列表前,清楚原列表所有规则
flowRules.clear();
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
@Override
//初始化时执行
public void configLoad(List<FlowRule> conf) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules);
}
}
}
2.处理资源请求 -->SentinelResourceAspect
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
//获取当前资源方法
Method originMethod = resolveMethod(pjp);
//获取资源注解配置
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
if (annotation == null) {
// Should not go through here.
throw new IllegalStateException("Wrong state for SentinelResource annotation");
}
//获取资源名
String resourceName = getResourceName(annotation.value(), originMethod);
EntryType entryType = annotation.entryType();
Entry entry = null;
try {
//如果限流则抛出 FlowException 异常
entry = SphU.entry(resourceName, entryType, 1, pjp.getArgs());
//调用资源方法
Object result = pjp.proceed();
return result;
} catch (BlockException ex) {
//降级的handlerBlock方法
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
//获取配置的忽略异常,即改异常不会被降级,会直接抛出
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
//判断当前异常是否为exceptionsToIgnore中的异常或exceptionsToIgnore异常的子类,是的话直接抛出
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
throw ex;
}
// annotation.exceptionsToTrace() 默认值Throwable.class
// 非exceptionsToIgnore中,但在exceptionsToTrace中的异常才会进行fallback降级
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
//统计异常信息
traceException(ex, annotation);
//fallback方法
return handleFallback(pjp, annotation, ex);
}
// No fallback function can handle the exception, so throw it out.
throw ex;
} finally {
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
}
}
}
handlerblock方法 和 fallback方法调用实现如下:
public abstract class AbstractSentinelAspectSupport {
/**
* 1.blockHandlerMethod 方法 必须更加一个 BlockException类型的参数
* 2.根据是否为静态方法来区分调用的是本类的降级方法,还是指定类的,所以指定类的降级方法必须要是static类型
* 3.如果未配置blockHandlerMethod方法,兜底fallback方法
*/
protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
throws Throwable {
// Execute block handler if configured.
//获取降级方法
Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
annotation.blockHandlerClass());
if (blockHandlerMethod != null) {
Object[] originArgs = pjp.getArgs();
// Construct args.
Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
//所以handler方法必须加上BlockException 参数
args[args.length - 1] = ex;
//是否是静态的
if (isStatic(blockHandlerMethod)) {
return blockHandlerMethod.invoke(null, args);
}
return blockHandlerMethod.invoke(pjp.getTarget(), args);
}
// If no block handler is present, then go to fallback.
return handleFallback(pjp, annotation, ex);
}
/**
* 1.Fallback 方法 可以增加一个 Throwable类型的参数,也可以不加
* 2.根据是否为静态方法来区分调用的是本类的降级方法,还是指定类的,所以指定类的降级方法必须要是static类型
* 3.如果未配置fallback方法,兜底defaultFallback方法
*/
protected Object handleFallback(ProceedingJoinPoint pjp, String fallback, String defaultFallback,
Class<?>[] fallbackClass, Throwable ex) throws Throwable {
//资源方法参数
Object[] originArgs = pjp.getArgs();
// 获取配置的fallback方法
Method fallbackMethod = extractFallbackMethod(pjp, fallback, fallbackClass);
if (fallbackMethod != null) {
// 降级方法参数
int paramCount = fallbackMethod.getParameterTypes().length;
Object[] args;
if (paramCount == originArgs.length) {
//降级方法参数 和 资源方法参数长度一致
args = originArgs;
} else {
//降级方法参数 和 资源方法参数长度不一致,则降级方法增加一个Throwable 类型的参数
args = Arrays.copyOf(originArgs, originArgs.length + 1);
args[args.length - 1] = ex;
}
//是否是静态方法
if (isStatic(fallbackMethod)) {
return fallbackMethod.invoke(null, args);
}
return fallbackMethod.invoke(pjp.getTarget(), args);
}
// 如果fallback方法未配置,则尝试调用defaultFallback方法,如果配置了的话
return handleDefaultFallback(pjp, defaultFallback, fallbackClass, ex);
}
/**
* 1.defaultFallback 方法 参数为 无参数 或一个Throwable的参数
* 2.根据是否为静态方法来区分调用的是本类的降级方法,还是指定类的,所以指定类的降级方法必须要是static类型
* 3.如果未配置defaultFallback方法,则抛出异常
*/
protected Object handleDefaultFallback(ProceedingJoinPoint pjp, String defaultFallback,
Class<?>[] fallbackClass, Throwable ex) throws Throwable {
// Execute the default fallback function if configured.
Method fallbackMethod = extractDefaultFallbackMethod(pjp, defaultFallback, fallbackClass);
if (fallbackMethod != null) {
// Construct args.
Object[] args = fallbackMethod.getParameterTypes().length == 0 ? new Object[0] : new Object[] {ex};
if (isStatic(fallbackMethod)) {
return fallbackMethod.invoke(null, args);
}
return fallbackMethod.invoke(pjp.getTarget(), args);
}
// If no any fallback is present, then directly throw the exception.
throw ex;
}
}
3.限流过程 StatisticSlot
StatisticSlot 系统流程--判断是否降级,数据统计
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 去校验限流,系统保护,黑白名单,熔断
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 校验通过 统计数据 DefaultNode
node.increaseThreadNum();
node.addPassRequest(count);
//只有配置了指定context的来源时才会StatisticNode不为null
if (context.getCurEntry().getOriginNode() != null) {
// 统计数据 StatisticNode
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
//EntryType.IN 入口流量
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
// This should not happen.
node.increaseExceptionQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw e;
}
}
}
4.限流判断 FlowSlot ,FlowRuleChecker
校验过程
1)获取该资源配置的规则
public class FlowRuleChecker {
//判断是否限流。如果限流抛出FlowException异常
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//获取对应资源的规则,通过 FlowRuleManager.getFlowRuleMap()获取
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
//遍历每条规则,所以,每个资源可以配置多个规则,进行多次校验,有一个规则不通过,则进行限流
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
//限流抛出的异常未FlowException
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
}
2)未做集群配置,则进行本地限流校验
public class FlowRuleChecker {
//本地限流校验,prioritized :false ,acquireCount:1
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {
//选取节点
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//FlowRuleUtil.buildFlowRuleMap rule.getRater() 赋值 DefaultController
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
}
3)进行限流校验
public class DefaultController implements TrafficShapingController {
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//QPS or ThreadNum
//获取当前时间窗体调用次数
int curCount = avgUsedTokens(node);
//count 规则配置QPS,acquireCount 本次调用量 1次,curCount当前时间窗体里已经有的调用次数
if (curCount + acquireCount > count) {
//prioritized 写死为false
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
......
}
}
return false;
}
return true;
}
//获取当前时间窗体调用次数
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
}
4)获取当前秒的调用次数 StatisticNode,ArrayMetric,LeapArray
public class StatisticNode implements Node {
//SAMPLE_COUNT 2 INTERVAL 1000MS
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
@Override
//获取当前秒调用次数
public double passQps() {
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
}
public class ArrayMetric implements Metric {
//统计数据
private final LeapArray<MetricBucket> data;
//创建ArrayMetric 赋值data OccupiableBucketLeapArray类型
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
//当前秒 调用次数
public long pass() {
//获取当前时间段
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
//计算总调用次数
pass += window.pass();
}
return pass;
}
//1000ms 转换为1s
public double getWindowIntervalInSec() {
return data.getIntervalInSecond();
}
}
public abstract class LeapArray<T> {
//时间段长度
protected int windowLengthInMs;
//intervalInMs 时间段个数
protected int sampleCount;
protected int intervalInMs;
//存储窗体数据
protected final AtomicReferenceArray<WindowWrap<T>> array;
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
//根据当前时间获取当前秒的时间窗体数据
public WindowWrap<T> currentWindow(long timeMillis) {//timeMillis 当前时间
if (timeMillis < 0) {
return null;
}
//计算时间段数组的下标
int idx = calculateTimeIdx(timeMillis);
//获取当前时间段的开始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
//从数组中获取时间窗体
WindowWrap<T> old = array.get(idx);
if (old == null) {
//当前时间窗不存在,根据时间长度,开始时间初始化窗体,并赋值到数据idx处
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// 更新窗体并返回
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
//当前时间窗体开始时间和当前时间计算的开始时间一致,则返回该窗体
return old;
} else if (windowStart > old.windowStart()) {
//当当前时间计算的窗体大于该窗体,则
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {//不存在的情况
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
}
Context 上线文对象的初始化
1.Context的创建
// 自动创建
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
//手动创建
ContextUtil.enter(String name , String origin)
实际是调用以下方法创建context
public class ContextUtil {
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();//contextHolder ThreadLocal
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
try {
LOCK.lock();
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
Constants.ROOT.addChild(node);
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
//conext 创建,初始化context APPName, EntranceNode入口节点
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
}
2.context 的Entry初始化
Entry e = new CtEntry(resourceWrapper, chain, context);
context 的entry初始化
class CtEntry extends Entry {
CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
super(resourceWrapper);
this.chain = chain;
this.context = context;
setUpEntryFor(context);
}
//context 的entry初始化
private void setUpEntryFor(Context context) {
// The entry should not be associated to NullContext.
if (context instanceof NullContext) {
return;
}
this.parent = context.getCurEntry();
if (parent != null) {
((CtEntry)parent).child = this;
}
context.setCurEntry(this);
}
}
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);初始化
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
//Context的DefaultNode初始化
chain.addLast(new NodeSelectorSlot());
//context中的Entry的CurNode中挂的clusterNode 赋值,originNode赋值
chain.addLast(new ClusterBuilderSlot());
//BlockException异常日志
chain.addLast(new LogSlot());
//流程
chain.addLast(new StatisticSlot());
//系统保护
chain.addLast(new SystemSlot());
//黑白名单
chain.addLast(new AuthoritySlot());
//限流
chain.addLast(new FlowSlot());
//熔断
chain.addLast(new DegradeSlot());
return chain;
}
chain初始化结构如下: 每一个资源对应一个chainSlot
image.png
接下来,会依次执行next节点类的entry方法.
1)NodeSelectorSlot : Context的DefaultNode初始化
以context name 为key 保存DefaultNode,即DefaultNode保存着某个resource在某个context中的实时指标,每个DefaultNode都指向一个ClusterNode
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
//以context name 为key 保存DefaultNode
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
}
// Build invocation tree
((DefaultNode)context.getLastNode()).addChild(node);
}
}
//初始化context 中Entry 的curNode值 DefaultNode
context.setCurNode(node);
//下一个entry
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
2)ClusterBuilderSlot :context中的Entry的CurNode中挂的clusterNode 赋值,originNode赋值
ClusterNode 以资源来key保存在map中.
ClusterNode:保存着某个resource在所有的context中实时指标的总和,同样的resource会共享同一个ClusterNode,不管他在哪个context中
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
//同样的资源获取同样的clusterNode ResourceWrapper 重写hashCode equal方法,所以资源名一致则表示同一个资源
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode();
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
// DefaultNode的资源为map的key
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
//context中的Entry的CurNode中挂的clusterNode 赋值
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
//通过origin创建 OriginNode即StatisticNode,以origin为key存储在map中
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
//下一个entry方法
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
至此Context初始化完成