Sentinel源码-入口类和SlotChain构建过程

2023-03-01  本文已影响0人  分布式与微服务

1. 测试用例

我们以sentinel-demo中的sentinel-annotation-spring-aop为例,分析sentinel的源码。核心代码如下:

DemoController:

/**
 * Demo REST controller exposing two endpoints that call into {@link TestService}.
 */
@RestController
public class DemoController {

    @Autowired
    private TestService service;

    /**
     * Handles {@code GET /foo}.
     *
     * @param t optional request parameter; when absent the current time in
     *          milliseconds is used instead
     * @return whatever {@code service.hello} returns for the resolved value
     */
    @GetMapping("/foo")
    public String apiFoo(@RequestParam(required = false) Long t) throws Exception {
        // Resolve the effective value up front so the null case is handled in one place.
        long timestamp = (t != null) ? t : System.currentTimeMillis();
        service.test();
        return service.hello(timestamp);
    }

    /**
     * Handles {@code GET /baz/{name}} by delegating to {@code service.helloAnother}.
     *
     * @param name the path segment bound to {@code name}
     */
    @GetMapping("/baz/{name}")
    public String apiBaz(@PathVariable("name") String name) {
        String reply = service.helloAnother(name);
        return reply;
    }
}

TestServiceImpl:

/**
 * Demo service whose methods are protected by {@code @SentinelResource}.
 * The two fallback methods at the bottom are referenced by name from the annotations.
 */
@Service
public class TestServiceImpl implements TestService {

    /** Prints a marker line; block handling is delegated to {@code ExceptionUtil.handleException}. */
    @Override
    @SentinelResource(value = "test", blockHandler = "handleException", blockHandlerClass = {ExceptionUtil.class})
    public void test() {
        System.out.println("Test");
    }

    /**
     * Formats a greeting for a non-negative value.
     *
     * @throws IllegalArgumentException if {@code s} is negative
     */
    @Override
    @SentinelResource(value = "hello", fallback = "helloFallback")
    public String hello(long s) {
        if (s >= 0) {
            return String.format("Hello at %d", s);
        }
        throw new IllegalArgumentException("invalid arg");
    }

    /**
     * Greets {@code name}, rejecting a few special values.
     *
     * @throws IllegalArgumentException if {@code name} is {@code null} or {@code "bad"}
     * @throws IllegalStateException    if {@code name} is {@code "foo"}
     */
    @Override
    @SentinelResource(value = "helloAnother", defaultFallback = "defaultFallback",
        exceptionsToIgnore = {IllegalStateException.class})
    public String helloAnother(String name) {
        // null must be rejected before the switch, which would otherwise throw a NullPointerException.
        if (name == null) {
            throw new IllegalArgumentException("oops");
        }
        switch (name) {
            case "bad":
                throw new IllegalArgumentException("oops");
            case "foo":
                throw new IllegalStateException("oops");
            default:
                return "Hello, " + name;
        }
    }

    /** Fallback named by {@code hello}: dumps the stack trace and returns an error message. */
    public String helloFallback(long s, Throwable ex) {
        // Do some log here.
        ex.printStackTrace();
        String message = "Oops, error occurred at " + s;
        return message;
    }

    /** Default fallback named by {@code helloAnother}: logs a line and returns a fixed string. */
    public String defaultFallback() {
        System.out.println("Go to default fallback");
        return "default_fallback";
    }
}

启动类DemoApplication

/**
 * Spring Boot entry point for the demo application.
 */
@SpringBootApplication
public class DemoApplication {

    /**
     * Starts the Spring application, forwarding the command-line arguments.
     *
     * @param args command-line arguments passed through to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

启动这个工程时,需要增加如下 JVM 启动参数:

-Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=annotation-aspectj

如图:

打开http://localhost:8081/#/dashboard 地址,可以看到应用已经注册到sentinel管理后台:

1.1 流控测试

访问 http://localhost:19966/foo?t=188 这个链接,多访问几次,在实时监控页面可以看到:

然后,我们先简单配置一个流控规则,如下:

然后我们再快速刷新 http://localhost:19966/foo?t=188 接口,会出现限流的情况,返回如下:

Oops, error occurred at 188

实时监控为:


2. 注解版源码分析

使用注解@SentinelResource 核心原理就是 利用AOP切入到方法中,我们直接看SentinelResourceAspect类,这是一个切面类:

/**
 * Aspect that wraps every method annotated with {@code @SentinelResource} in a
 * Sentinel entry and routes block exceptions and business exceptions to the
 * handlers configured on the annotation.
 */
@Aspect // marks this class as an aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    // Pointcut: any method carrying the @SentinelResource annotation.
    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

    /**
     * Around advice for the pointcut above.
     *
     * <p>Acquires an entry via {@code SphU.entry} before calling the target method.
     * A {@code BlockException} is passed to {@code handleBlockException}. Any other
     * throwable is rethrown if it matches {@code exceptionsToIgnore}; otherwise, if it
     * matches {@code exceptionsToTrace}, it is traced and passed to {@code handleFallback};
     * otherwise it is rethrown. The entry, if one was obtained, is always exited.
     *
     * @param pjp the join point for the intercepted method
     * @return the target method's result, or the result of a block handler / fallback
     * @throws Throwable anything the target method throws that is not handled above
     */
    // Around advice
    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        Method originMethod = resolveMethod(pjp);

        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        if (annotation == null) {
            // Should not go through here.
            throw new IllegalStateException("Wrong state for SentinelResource annotation");
        }
        String resourceName = getResourceName(annotation.value(), originMethod);
        EntryType entryType = annotation.entryType();
        int resourceType = annotation.resourceType();
        Entry entry = null;
        try {
            // The behavior woven in by this aspect: acquire a Sentinel entry for the resource.
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            // Invoke the target method.
            return pjp.proceed();
        } catch (BlockException ex) {
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
            // The ignore list will be checked first.
            if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
                throw ex;
            }
            if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
                traceException(ex);
                return handleFallback(pjp, annotation, ex);
            }

            // No fallback function can handle the exception, so throw it out.
            throw ex;
        } finally {
            // entry stays null when SphU.entry itself threw, so there is nothing to exit in that case.
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}

核心方法SphU.entry():

/**
 * Acquires an entry for the named resource by delegating to
 * {@code Env.sph.entryWithType} with a fixed count of 1.
 *
 * @param name         resource name
 * @param resourceType resource type code, passed through unchanged
 * @param trafficType  entry type, passed through unchanged
 * @param args         arguments forwarded to the callee
 * @return the entry returned by {@code entryWithType}
 * @throws BlockException propagated from {@code entryWithType}
 */
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
    throws BlockException {
    // Note: the fourth argument (count) is hard-coded to 1.
    return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}
/**
 * Convenience overload that forwards to the six-argument {@code entryWithType}
 * with {@code prioritized} set to {@code false}.
 */
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
    throws BlockException {
    // count: per the original author's note, how much this request adds to the counter
    // (NOTE(review): semantics are not visible in this snippet, confirm against the slots).
    // Note: the fifth argument (prioritized) is false.
    return entryWithType(name, resourceType, entryType, count, false, args);
}
/**
 * Wraps the resource description in a {@code StringResourceWrapper} and hands it
 * to {@code entryWithPriority}, which returns the entry for the resource.
 *
 * <p>NOTE(review): according to the original author's note, {@code prioritized == true}
 * means the request must wait for a time computed from its priority before passing,
 * while {@code false} means no waiting. That behavior is not visible in this snippet.
 */
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
                           Object[] args) throws BlockException {
    return entryWithPriority(
        new StringResourceWrapper(name, entryType, resourceType), count, prioritized, args);
}

我们重点看一下CtSph#entryWithPriority

/**
 * Creates the entry for a resource and runs it through the resource's slot chain.
 *
 * <p>Returns an entry with no chain attached (so no rule checking happens) when the
 * current context is a {@code NullContext}, when {@code Constants.ON} is false, or
 * when {@code lookProcessChain} returns {@code null}.
 *
 * @param resourceWrapper the resource being entered
 * @param count passed to the chain and to {@code exit}; the original note says it defaults to 1
 * @param prioritized passed to the chain; the original note says it defaults to false
 * @param args arguments forwarded to the chain and to {@code exit}
 * @return the entry for the resource
 * @throws BlockException rethrown from the chain after the entry has been exited
 */
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    // Fetch the current Context via ContextUtil.
    // (Original note: it is held in a ThreadLocal; one request occupies one thread and
    // each thread is bound to one context.)
    Context context = ContextUtil.getContext();
    // A NullContext means the number of contexts has exceeded the threshold,
    // so return an entry that performs no rule checking.
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }

    // No context is bound yet: create one under the default context name.
    if (context == null) {
        // todo Using default context.
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }

    // Global switch is close, no rule checking will do.
    // With the global switch off, return an entry that performs no rule checking.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }

    // todo look up the SlotChain
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    // If no chain was found, the number of chains has exceeded the limit.
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    // Create the entry that is bound to the chain.
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        // todo run the resource through the chain
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        // Blocked: exit the entry just created before propagating the exception.
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        // The error is only logged; the entry is still returned to the caller.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

2.1 默认Context创建

当前线程没有绑定Context时,会创建一个context并将其放入到ThreadLocal。入口是 InternalContextUtil.internalEnter,它内部直接调用下面的 ContextUtil.trueEnter(不经过 enter 中对默认 context 名称的校验)。相关代码如下:

/**
 * Enters a context with the given name and origin.
 *
 * @param name   context name; must not equal {@code Constants.CONTEXT_DEFAULT_NAME}
 * @param origin origin forwarded to {@code trueEnter}
 * @return the context returned by {@code trueEnter}
 * @throws ContextNameDefineException if {@code name} is the reserved default context name
 */
public static Context enter(String name, String origin) {
    // The default context name is reserved, so callers may not use it here.
    if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
        throw new ContextNameDefineException(
            "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
    }
    return trueEnter(name, origin);
}

/**
 * Returns the context held in {@code contextHolder}, creating and storing one if
 * none is present.
 *
 * <p>When a context has to be created, its node is looked up by name in
 * {@code contextNameNodeMap}; a missing node is created under {@code LOCK}, added to
 * {@code Constants.ROOT} and published through a freshly built map. If the map already
 * holds more than {@code Constants.MAX_CONTEXT_NAME_SIZE} names, {@code NULL_CONTEXT}
 * is returned instead.
 *
 * @param name   context name, also the key into {@code contextNameNodeMap}
 * @param origin origin set on a newly created context
 * @return the existing or newly created context, or {@code NULL_CONTEXT}
 */
protected static Context trueEnter(String name, String origin) {
    // Try the holder first (a ThreadLocal according to the original note).
    Context context = contextHolder.get();
    // Nothing held yet: look the node up in the cache map.
    if (context == null) {
        // Cache map: key is the context name, value is the node (an EntranceNode is stored below).
        Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
        // First lookup without the lock; it is repeated under LOCK below so that
        // concurrent callers do not create the node twice.
        DefaultNode node = localCacheNameMap.get(name);
        if (node == null) {
            // If the cache already holds more names than the maximum, return NULL_CONTEXT.
            if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                setNullContext();
                return NULL_CONTEXT;
            } else {
                LOCK.lock();
                try {

                    // Second lookup, now under the lock, against the current map reference.
                    node = contextNameNodeMap.get(name);
                    if (node == null) {
                        if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                            setNullContext();
                            return NULL_CONTEXT;
                        } else {
                            // Create an EntranceNode.
                            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                            // Add entrance node.
                            // Attach the new node to ROOT.
                            Constants.ROOT.addChild(node);

                            // Publish the new node by building a new map and swapping the reference,
                            // rather than writing into the shared map in place
                            // (original note: avoids iteration-stability problems on a shared collection).
                            Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                            newMap.putAll(contextNameNodeMap);
                            newMap.put(name, node);
                            contextNameNodeMap = newMap;
                        }
                    }
                } finally {
                    LOCK.unlock();
                }
            }
        }
        // Build the context from the node and the context name.
        context = new Context(node, name);
        // Record the origin on the context.
        context.setOrigin(origin);
        // Store the context in the holder.
        contextHolder.set(context);
    }

    return context;
}

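注意:contextNameNodeMap 的声明为 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();。HashMap 本身不是线程安全的,所以代码没有直接在原 map 上执行 put,而是在持有锁的情况下先复制出一个新的 HashMap,写入新节点后再把 volatile 引用整体替换为新 map(即"写时复制")。这样其他线程无锁读取时,看到的始终是一个不会再被修改的 map。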

2.2 查找并创建SlotChain

构建调用链lookProcessChain(resourceWrapper)

/**
 * Returns the slot chain cached for the given resource, creating and caching one
 * on first use.
 *
 * @param resourceWrapper the resource, used as the key into {@code chainMap}
 * @return the chain for the resource, or {@code null} when a new chain would be needed
 *         but {@code chainMap} already holds {@code Constants.MAX_SLOT_CHAIN_SIZE} or more entries
 */
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    // Fast path: the chain is already cached.
    ProcessorSlotChain cached = chainMap.get(resourceWrapper);
    if (cached != null) {
        return cached;
    }

    synchronized (LOCK) {
        // Look again under the lock: another thread may have created the chain meanwhile.
        ProcessorSlotChain existing = chainMap.get(resourceWrapper);
        if (existing != null) {
            return existing;
        }

        // Entry size limit.
        // The cache has reached the maximum number of chains, so no new chain is created.
        if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
            return null;
        }

        // todo create the new chain
        ProcessorSlotChain created = SlotChainProvider.newSlotChain();

        // Publish through a freshly built map instead of writing into the shared one in place.
        Map<ResourceWrapper, ProcessorSlotChain> snapshot =
            new HashMap<ResourceWrapper, ProcessorSlotChain>(chainMap.size() + 1);
        snapshot.putAll(chainMap);
        snapshot.put(resourceWrapper, created);
        chainMap = snapshot;
        return created;
    }
}

我们直接看核心方法SlotChainProvider.newSlotChain();

/**
 * Builds a new slot chain using the shared {@code slotChainBuilder}, resolving the
 * builder first if it has not been set yet.
 *
 * @return the chain produced by {@code slotChainBuilder.build()}
 */
public static ProcessorSlotChain newSlotChain() {
    if (slotChainBuilder == null) {
        // Resolve the slot chain builder SPI.
        slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

        if (slotChainBuilder != null) {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                slotChainBuilder.getClass().getCanonicalName());
        } else {
            // Should not go through here.
            // The SPI lookup produced nothing, so fall back to the default builder.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        }
    }
    // todo build a chain
    return slotChainBuilder.build();
}

    private SlotChainProvider() {}
}

2.2.1 创建slotChainBuilder

// 通过SPI方式创建builder
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

通过SPI机制创建slotChainBuilder:SpiLoader 会到 classpath 下的 META-INF/services 目录中,查找以 SlotChainBuilder 接口全限定名命名的配置文件,并加载其中声明的实现类:

2.2.2 slotChainBuilder.build()

/**
 * Default {@link SlotChainBuilder}: assembles a {@code DefaultProcessorSlotChain}
 * from the {@code ProcessorSlot} instances returned by the SPI loader.
 */
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    /**
     * Builds a chain by appending each loaded slot in the order the loader returns them.
     * Slots that are not {@code AbstractLinkedProcessorSlot} instances cannot be linked,
     * so they are skipped with a warning.
     *
     * @return the assembled chain
     */
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // Load the slots through SPI.
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        for (ProcessorSlot slot : sortedSlotList) {
            if (slot instanceof AbstractLinkedProcessorSlot) {
                chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
            } else {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            }
        }

        return chain;
    }
}

这里同样通过SPI机制,从 META-INF/services 目录下的配置文件中加载 ProcessorSlot 的实现类。sentinel-core 项目中的配置如下:

还有一个ParamFlowSlot,在sentinel-extension/sentinel-parameter-flow-control下:

我们点开 NodeSelectorSlot,可以看到类上的 @Spi 注解通过 order 属性指定了优先级:数字越小,优先级越高,在链中的位置也越靠前。

@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

优先级常量为:

// Ordering constants for the built-in slots, listed from smallest to largest value.
// NOTE(review): presumably each one is used as the `order` of the matching slot's @Spi
// annotation, with smaller values placed earlier in the chain. Confirm against SpiLoader.
public static final int ORDER_NODE_SELECTOR_SLOT = -10000;
public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000;
public static final int ORDER_LOG_SLOT = -8000;
public static final int ORDER_STATISTIC_SLOT = -7000;
public static final int ORDER_AUTHORITY_SLOT = -6000;
public static final int ORDER_SYSTEM_SLOT = -5000;
public static final int ORDER_FLOW_SLOT = -2000;
public static final int ORDER_DEGRADE_SLOT = -1000;

我们看代码中的变量sortedSlotList,已经按照优先级排序好了:

我们看一下构建的ProcessorSlotChain,类似一个单链表结构,如下:

我们看一下相关的类结构:DefaultProcessorSlotChain:

/**
 * Singly linked chain of processor slots. It starts with one placeholder node,
 * and both {@code first} and {@code end} initially point at that node.
 */
public class DefaultProcessorSlotChain extends ProcessorSlotChain {

    // Placeholder head node: it does no work of its own and only forwards to the next slot.
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }

    };

    // Tail pointer; equal to the head node while no slot has been added.
    AbstractLinkedProcessorSlot<?> end = first;

    /**
     * Inserts a slot directly after the placeholder head node.
     */
    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        // If the tail still points at the head, the inserted slot also becomes the new tail.
        final boolean tailIsHead = (end == first);
        AbstractLinkedProcessorSlot<?> formerSuccessor = first.getNext();
        protocolProcessor.setNext(formerSuccessor);
        first.setNext(protocolProcessor);
        if (tailIsHead) {
            end = protocolProcessor;
        }
    }

    /**
     * Appends a slot after the current tail and makes it the new tail.
     */
    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        AbstractLinkedProcessorSlot<?> formerTail = end;
        formerTail.setNext(protocolProcessor);
        end = protocolProcessor;
    }
}

AbstractLinkedProcessorSlot:

/**
 * Base class for slots that can be linked into a chain: each instance holds a
 * reference to the slot that follows it.
 *
 * @param <T> the parameter type this slot's {@code entry} method accepts
 */
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

    // Link to the following slot; null when this slot is the last one.
    private AbstractLinkedProcessorSlot<?> next = null;

    /**
     * Passes the entry call on to the next slot, if there is one.
     */
    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next == null) {
            return;
        }
        next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
    }

    /**
     * Casts the incoming object to this slot's parameter type and invokes {@code entry}.
     */
    @SuppressWarnings("unchecked")
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        throws Throwable {
        entry(context, resourceWrapper, (T) o, count, prioritized, args);
    }

    /**
     * Passes the exit call on to the next slot, if there is one.
     */
    @Override
    public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        if (next == null) {
            return;
        }
        next.exit(context, resourceWrapper, count, args);
    }

    /** Returns the slot that follows this one, or {@code null} if none is set. */
    public AbstractLinkedProcessorSlot<?> getNext() {
        return this.next;
    }

    /** Sets the slot that follows this one. */
    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        this.next = next;
    }

}

构建完成后的SlotChain和工作原理图一样:

上一篇下一篇

猜你喜欢

热点阅读