sentinel

sentinel-限流篇

2019-08-01  本文已影响0人  Audience0

官方文档:https://github.com/alibaba/Sentinel/wiki/%E4%B8%BB%E9%A1%B5
开发五步骤:
1.依赖

<!--sentinel核心包,从1.5.0开始仅支持JDK1.7以上版本,1.5.0之前的版本最低支持JDK1.6-->
       <dependency>
           <groupId>com.alibaba.csp</groupId>
           <artifactId>sentinel-core</artifactId>
           <version>1.6.2</version>
       </dependency>

       <!--sentinel Annotation AspectJ  扩展-->
       <dependency>
           <groupId>com.alibaba.csp</groupId>
           <artifactId>sentinel-annotation-aspectj</artifactId>
           <version>1.6.2</version>
       </dependency>
       <!--客户端需要引入 Transport 模块来与 Sentinel 控制台进行通信-->
       <dependency>
           <groupId>com.alibaba.csp</groupId>
           <artifactId>sentinel-transport-simple-http</artifactId>
           <version>1.6.2</version>
       </dependency>

2.XML 配置

<!--sentinel 切面-->
<bean id="sentinelResourceAspect" class="com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect"></bean>
<!--使sentinel中的 aspectJ注解生效-->
<aop:aspectj-autoproxy proxy-target-class="false"/><!--声明自动为spring容器中那些配置@aspectJ切面的bean创建代理,织入切面,true使用CGLib动态代理技术织入增强,false表示使用jdk动态代理织入增强-->

3.定义资源

@SentinelResource(value = "annotationHandlerMethod" , blockHandler = "annotationHandlerMethod",blockHandlerClass = HandlerFallUtils.class)

4.定义规则

FlowRuleManager.loadRules(Arrays.asList(new FlowRule("annotationHandlerMethod")));

5.定义降级方法

/**
     * 需要public static 参数需要多一个BlockException
     * @param i
     * @param d
     * @return
     */
    public static String annotationHandlerMethod(int i,BlockException d){
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i + "=execute annotationHandlerMethod--fall:"+d.getMessage();
    

限流方式(流控控制效果)
1.直接拒绝
默然是直接拒绝,当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,抛出FlowException.
2.Warm up

FlowRule rule = new FlowRule();
rule.setResource(resourceName);
rule.setCount(20);
rule.setGrade(RuleConstant.GRADE_QPS);
rule.setLimitApp("default");
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
//warm up方式需要制定,表示经过WarmUpPeriodSec 秒的时间将使QPS达到设定的count值
rule.setWarmUpPeriodSec(10);
图片.png

3.均匀排队

rule.setGrade(RuleConstant.GRADE_QPS);
/ /流控效果:匀速排队模式
rule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER); 
//表示一秒内均匀通过10个请求
rule.setCount(10);
rule.setMaxQueueingTimeMs(20 * 1000); // 最长排队等待时间:20s

以固定的间隔时间让请求通过。当请求到来的时候,如果当前请求距离上个通过的请求通过的时间间隔不小于预设值,则让当前请求通过;否则,计算当前请求的预期通过时间,如果该请求的预期通过时间小于规则预设的 timeout 时间,则该请求会等待直到预设时间到来通过;反之,则马上抛出阻塞异常。


限流过程
1.规则刷新 --> FlowRuleManager.loadRules
涉及FlowRuleManager,FlowRuleUtil,DynamicSentinelProperty

public class FlowRuleManager {
    //规则存放Map
   private static final Map<String, List<FlowRule>> flowRules = new ConcurrentHashMap<String, List<FlowRule>>();

    private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
    private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();
  
    static {
        currentProperty.addListener(LISTENER);
        SCHEDULER.scheduleAtFixedRate(new MetricTimerListener(), 0, 1, TimeUnit.SECONDS);
    }
  //更新规则
  public static void loadRules(List<FlowRule> rules) {
        currentProperty.updateValue(rules);
    }
  //获取限流规则
  static Map<String, List<FlowRule>> getFlowRuleMap() {
        return flowRules;
    }
  //规则加载主要逻辑
  private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

        @Override
        //更新规则
        public void configUpdate(List<FlowRule> value) {
            //校验FlowRule
            //赋值rule.setRater(TrafficShapingController) 限流处理类
            //按key 资源名称,value 规则列表封装Map返回
            Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
            if (rules != null) {
                //更新规则列表前,清楚原列表所有规则
                flowRules.clear();
                flowRules.putAll(rules);
            }
            RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
        }

        @Override
        //初始化时执行
        public void configLoad(List<FlowRule> conf) {
            Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
            if (rules != null) {
                flowRules.clear();
                flowRules.putAll(rules);
            }
            RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules);
        }
    }
}
  

2.处理资源请求 -->SentinelResourceAspect

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        //获取当前资源方法
        Method originMethod = resolveMethod(pjp);
        //获取资源注解配置
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        //获取资源名
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        Entry entry = null;
        try {
            //如果限流则抛出 FlowException 异常
            entry = SphU.entry(resourceName, entryType, 1, pjp.getArgs());
            //调用资源方法
            Object result = pjp.proceed();
            return result;
        } catch (BlockException ex) {
            //降级的handlerBlock方法
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            //获取配置的忽略异常,即改异常不会被降级,会直接抛出
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            //判断当前异常是否为exceptionsToIgnore中的异常或exceptionsToIgnore异常的子类,是的话直接抛出
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            // annotation.exceptionsToTrace()  默认值Throwable.class
            // 非exceptionsToIgnore中,但在exceptionsToTrace中的异常才会进行fallback降级
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                //统计异常信息
                traceException(ex, annotation);
                //fallback方法
                return handleFallback(pjp, annotation, ex);
            }

            // No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}

handlerblock方法 和 fallback方法调用实现如下:

public abstract class AbstractSentinelAspectSupport {

  /**
     * 1.blockHandlerMethod 方法 必须更加一个 BlockException类型的参数
     * 2.根据是否为静态方法来区分调用的是本类的降级方法,还是指定类的,所以指定类的降级方法必须要是static类型
     * 3.如果未配置blockHandlerMethod方法,兜底fallback方法
     */
    protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
        throws Throwable {

        // Execute block handler if configured.
        //获取降级方法
        Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),
            annotation.blockHandlerClass());
        if (blockHandlerMethod != null) {
            Object[] originArgs = pjp.getArgs();
            // Construct args.
            Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
            //所以handler方法必须加上BlockException 参数
            args[args.length - 1] = ex;
            //是否是静态的
            if (isStatic(blockHandlerMethod)) {
                return blockHandlerMethod.invoke(null, args);
            }
            return blockHandlerMethod.invoke(pjp.getTarget(), args);
        }

        // If no block handler is present, then go to fallback.
        return handleFallback(pjp, annotation, ex);
    }
       /**
     * 1.Fallback 方法 可以增加一个 Throwable类型的参数,也可以不加
     * 2.根据是否为静态方法来区分调用的是本类的降级方法,还是指定类的,所以指定类的降级方法必须要是static类型
     * 3.如果未配置fallback方法,兜底defaultFallback方法
     */
    protected Object handleFallback(ProceedingJoinPoint pjp, String fallback, String defaultFallback,
                                    Class<?>[] fallbackClass, Throwable ex) throws Throwable {
        //资源方法参数
        Object[] originArgs = pjp.getArgs();

        // 获取配置的fallback方法
        Method fallbackMethod = extractFallbackMethod(pjp, fallback, fallbackClass);
        if (fallbackMethod != null) {
            // 降级方法参数
            int paramCount = fallbackMethod.getParameterTypes().length;
            Object[] args;

            if (paramCount == originArgs.length) {
                //降级方法参数 和 资源方法参数长度一致
                args = originArgs;
            } else {
                //降级方法参数 和 资源方法参数长度不一致,则降级方法增加一个Throwable 类型的参数
                args = Arrays.copyOf(originArgs, originArgs.length + 1);
                args[args.length - 1] = ex;
            }
            //是否是静态方法
            if (isStatic(fallbackMethod)) {
                return fallbackMethod.invoke(null, args);
            }
            return fallbackMethod.invoke(pjp.getTarget(), args);
        }
        // 如果fallback方法未配置,则尝试调用defaultFallback方法,如果配置了的话
        return handleDefaultFallback(pjp, defaultFallback, fallbackClass, ex);
    }
    /**
     * 1.defaultFallback 方法  参数为  无参数 或一个Throwable的参数
     * 2.根据是否为静态方法来区分调用的是本类的降级方法,还是指定类的,所以指定类的降级方法必须要是static类型
     * 3.如果未配置defaultFallback方法,则抛出异常
     */
    protected Object handleDefaultFallback(ProceedingJoinPoint pjp, String defaultFallback,
                                           Class<?>[] fallbackClass, Throwable ex) throws Throwable {
        // Execute the default fallback function if configured.
        Method fallbackMethod = extractDefaultFallbackMethod(pjp, defaultFallback, fallbackClass);
        if (fallbackMethod != null) {
            // Construct args.
            Object[] args = fallbackMethod.getParameterTypes().length == 0 ? new Object[0] : new Object[] {ex};
            if (isStatic(fallbackMethod)) {
                return fallbackMethod.invoke(null, args);
            }
            return fallbackMethod.invoke(pjp.getTarget(), args);
        }

        // If no any fallback is present, then directly throw the exception.
        throw ex;
    }
}

3.限流过程 StatisticSlot
StatisticSlot 系统流程--判断是否降级,数据统计

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // 去校验限流,系统保护,黑白名单,熔断
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // 校验通过 统计数据  DefaultNode
            node.increaseThreadNum();
            node.addPassRequest(count);
            //只有配置了指定context的来源时才会StatisticNode不为null
            if (context.getCurEntry().getOriginNode() != null) {
                // 统计数据  StatisticNode
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }
            //EntryType.IN 入口流量
            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected error, set error to current entry.
            context.getCurEntry().setError(e);

            // This should not happen.
            node.increaseExceptionQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps(count);
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps(count);
            }
            throw e;
        }
    }
}

4.限流判断 FlowSlot ,FlowRuleChecker
校验过程
1)获取该资源配置的规则

public class FlowRuleChecker {
  //判断是否限流。如果限流抛出FlowException异常
  public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        //获取对应资源的规则,通过 FlowRuleManager.getFlowRuleMap()获取
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            //遍历每条规则,所以,每个资源可以配置多个规则,进行多次校验,有一个规则不通过,则进行限流
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    //限流抛出的异常未FlowException
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
}

2)未做集群配置,则进行本地限流校验

public class FlowRuleChecker {
  //本地限流校验,prioritized :false ,acquireCount:1
  private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {

        //选取节点
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
        //FlowRuleUtil.buildFlowRuleMap   rule.getRater() 赋值   DefaultController
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
}

3)进行限流校验

public class DefaultController implements TrafficShapingController {
    
     public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        //QPS  or ThreadNum
        //获取当前时间窗体调用次数
        int curCount = avgUsedTokens(node);
        //count  规则配置QPS,acquireCount 本次调用量 1次,curCount当前时间窗体里已经有的调用次数
        if (curCount + acquireCount > count) {
            //prioritized  写死为false
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                 ......
                }
            }
            return false;
        }
        return true;
    }
    //获取当前时间窗体调用次数
    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }
}

4)获取当前秒的调用次数 StatisticNode,ArrayMetric,LeapArray

public class StatisticNode implements Node {
      //SAMPLE_COUNT 2   INTERVAL 1000MS
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    @Override
    //获取当前秒调用次数
    public double passQps() {
        return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
    }
}
public class ArrayMetric implements Metric {
    //统计数据
    private final LeapArray<MetricBucket> data;
    //创建ArrayMetric 赋值data   OccupiableBucketLeapArray类型
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    //当前秒 调用次数
    public long pass() {
        //获取当前时间段
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();
        for (MetricBucket window : list) {
            //计算总调用次数
            pass += window.pass();
        }
        return pass;
    }

    //1000ms 转换为1s
    public double getWindowIntervalInSec() {
        return data.getIntervalInSecond();
    }
}
public abstract class LeapArray<T> {
     //时间段长度
    protected int windowLengthInMs; 
    //intervalInMs 时间段个数
    protected int sampleCount;
    protected int intervalInMs;
    //存储窗体数据
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    
    public LeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
    //根据当前时间获取当前秒的时间窗体数据
    public WindowWrap<T> currentWindow(long timeMillis) {//timeMillis 当前时间
        if (timeMillis < 0) {
            return null;
        }
        //计算时间段数组的下标
        int idx = calculateTimeIdx(timeMillis);
        //获取当前时间段的开始时间
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            //从数组中获取时间窗体
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                //当前时间窗不存在,根据时间长度,开始时间初始化窗体,并赋值到数据idx处
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // 更新窗体并返回
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                //当前时间窗体开始时间和当前时间计算的开始时间一致,则返回该窗体
                return old;
            } else if (windowStart > old.windowStart()) {
                //当当前时间计算的窗体大于该窗体,则
                if (updateLock.tryLock()) {
                    try {
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {//不存在的情况
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
}

Context 上线文对象的初始化

1.Context的创建

// 自动创建
context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
 //手动创建
ContextUtil.enter(String name , String origin)

实际是调用以下方法创建context

public class ContextUtil {
    protected static Context trueEnter(String name, String origin) {
        Context context = contextHolder.get();//contextHolder  ThreadLocal
        if (context == null) {
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    try {
                        LOCK.lock();
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                Constants.ROOT.addChild(node);

                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            //conext 创建,初始化context APPName, EntranceNode入口节点
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }
        return context;
    }
}

2.context 的Entry初始化

   Entry e = new CtEntry(resourceWrapper, chain, context);

context 的entry初始化

class CtEntry extends Entry {
     CtEntry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) {
        super(resourceWrapper);
        this.chain = chain;
        this.context = context;

        setUpEntryFor(context);
    }
    //context 的entry初始化
    private void setUpEntryFor(Context context) {
        // The entry should not be associated to NullContext.
        if (context instanceof NullContext) {
            return;
        }
        this.parent = context.getCurEntry();
        if (parent != null) {
            ((CtEntry)parent).child = this;
        }
        context.setCurEntry(this);
    }
}

ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);初始化

public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //Context的DefaultNode初始化
        chain.addLast(new NodeSelectorSlot());
        //context中的Entry的CurNode中挂的clusterNode 赋值,originNode赋值
        chain.addLast(new ClusterBuilderSlot());
        //BlockException异常日志
        chain.addLast(new LogSlot());
        //流程
        chain.addLast(new StatisticSlot());
        //系统保护
        chain.addLast(new SystemSlot());
        //黑白名单
        chain.addLast(new AuthoritySlot());
        //限流
        chain.addLast(new FlowSlot());
        //熔断
        chain.addLast(new DegradeSlot());
        return chain;
    }

chain初始化结构如下: 每一个资源对应一个chainSlot


image.png

接下来,会依次执行next节点类的entry方法.
1)NodeSelectorSlot : Context的DefaultNode初始化
以context name 为key 保存DefaultNode,即DefaultNode保存着某个resource在某个context中的实时指标,每个DefaultNode都指向一个ClusterNode

public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
       
        //以context name 为key  保存DefaultNode
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                }
                // Build invocation tree
                ((DefaultNode)context.getLastNode()).addChild(node);
            }
        }
        //初始化context 中Entry 的curNode值 DefaultNode
        context.setCurNode(node);
        //下一个entry
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

2)ClusterBuilderSlot :context中的Entry的CurNode中挂的clusterNode 赋值,originNode赋值
ClusterNode 以资源来key保存在map中.
ClusterNode:保存着某个resource在所有的context中实时指标的总和,同样的resource会共享同一个ClusterNode,不管他在哪个context中

public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    //同样的资源获取同样的clusterNode  ResourceWrapper 重写hashCode equal方法,所以资源名一致则表示同一个资源
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode();
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    // DefaultNode的资源为map的key
                    newMap.put(node.getId(), clusterNode);
                    clusterNodeMap = newMap;
                }
            }
        }
        //context中的Entry的CurNode中挂的clusterNode 赋值
        node.setClusterNode(clusterNode);
        if (!"".equals(context.getOrigin())) {
            //通过origin创建 OriginNode即StatisticNode,以origin为key存储在map中
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }
        //下一个entry方法
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

至此Context初始化完成

上一篇 下一篇

猜你喜欢

热点阅读