Sentinel之Slots插槽源码分析(二)

2018-12-28  本文已影响0人  橘子_好多灰

一、概述

上篇文章介绍过了NodeSelectorSlot和ClusterBuilderSlot两种插槽,接下来我们沿着默认的插槽链继续分析LogSlot和StatisticSlot

二、LogSlot

我们看代码:

     @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        try {
            fireEntry(context, resourceWrapper, obj, count, prioritized, args);
        } catch (BlockException e) {
            EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
                context.getOrigin(), count);
            throw e;
        } catch (Throwable e) {
            RecordLog.info("Entry exception", e);
        }

    }

可以发现,如果在接下来的插槽链中发生BlockException异常的话,LogSlot会记录日志信息。
继续看EagleEyeLogUtil类:

public class EagleEyeLogUtil {

    public static final String FILE_NAME = "sentinel-block.log";

    private static StatLogger statLogger;

    static {
        String path = LogBase.getLogBaseDir() + FILE_NAME;

        statLogger = EagleEye.statLoggerBuilder("sentinel-block-log")
            .intervalSeconds(1)
            .entryDelimiter('|')
            .keyDelimiter(',')
            .valueDelimiter(',')
            .maxEntryCount(6000)
            .configLogFilePath(path)
            .maxFileSizeMB(300)
            .maxBackupIndex(3)
            .buildSingleton();
    }

    public static void log(String resource, String exceptionName, String ruleLimitApp, String origin, int count) {
        statLogger.stat(resource, exceptionName, ruleLimitApp, origin).count(count);
    }
}

可以发现block日志是记录在sentienl-block.log中,这里block日志的格式化及本地文件的写入功能在eagleeye包中,如图:

log

个人觉得这个异常日志记录相对于sentienl的record日志开发有点过于复杂,鉴于不是sentienl的核心功能,我也只大概看了里面的代码。
大致就是在StatEntry中调用count的方法,然后在StatRollingData中调用StatLogController中scheduleWriteTask方法进而创建一个定时任务线程池,任务是StatLogWriteTask,在这个任务中写入日志到本地文件中。

三、StatisticSlot

StatisticSlot是sentienl的指标数据统计插槽,也是sentienl种非常重要的一个模块,sentienl后续的限流,降级,熔断都是根据这一阶段的统计数据进行。

前面文章在介绍sentinel的滑动时间窗口时,已经知道sentienl的指标统计是基于滑动时间窗口的,可以看文章 Sentinel之滑动时间窗口设计

下面主要看在StatisticSlot插槽中,sentinel做了哪些数据的统计的。

public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest();

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest();
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps();
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected error, set error to current entry.
            context.getCurEntry().setError(e);

            // This should not happen.
            node.increaseExceptionQps();
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps();
            }
            throw e;
        }
    }

在方法entry中,这里是先调用fireEntry方法继续资源保护检查。

请求通过

增加node的线程数increaseThreadNum。
在DefaultNode类中increaseThreadNum方法中:

@Override
    public void increaseThreadNum() {
        super.increaseThreadNum();
        this.clusterNode.increaseThreadNum();
    }

然后调用StatisticNode中increaseThreadNum方法,在increaseThreadNum方法中执行curThreadNum.incrementAndGet()。
这里线程数增加是通过定义一个原子变量的,curThreadNum是一个AtomicInteger原子变量,初始值是0 ,执行incrementAndGet方法加1,并返回当前值。

增加node的通过请求数addPassRequest。
在DefaultNode类中addPassRequest方法中:

@Override
    public void addPassRequest() {
        super.addPassRequest();
        this.clusterNode.addPassRequest();
    }

在StatisticNode中的addPassRequest方法:

     @Override
    public void addPassRequest() {
        rollingCounterInSecond.addPass();
        rollingCounterInMinute.addPass();
    }

根据滑动时间窗口统计了两个时间窗口的数据。

源节点originNode
如果originNode存在,则也需要增加originNode的线程数和请求通过数。

ENTRY_NODE
如果资源包装类型是IN的话,则需要ENTRY_NODE的线程数和请求通过数。
ENTRY_NODE是sentinel全局的统计节点,用于后续系统规则检查。

ProcessorSlotEntryCallback
然后再循环处理注册了ProcessorSlotEntryCallback的StatisticSlot。

StatisticSlotCallbackRegistry是一个StatisticSlot回调注册器,目前只有两种回调支持,ProcessorSlotEntryCallback和ProcessorSlotExitCallback。

ProcessorSlotEntryCallback是在资源正常调用时处理,ProcessorSlotExitCallback是在发送BlockException异常或者资源退出时调用。

目前只有热点限流的统计把ParamFlowStatisticSlotCallbackInit注册到ProcessorSlotEntryCallback中,看下面代码。

public class ParamFlowStatisticSlotCallbackInit implements InitFunc {

    @Override
    public void init() {
        StatisticSlotCallbackRegistry.addEntryCallback(ParamFlowStatisticEntryCallback.class.getName(),
            new ParamFlowStatisticEntryCallback());
    }
}

这里系统若是引用了热点参数限流模块,则客户端系统会在启动时把ProcessorSlotEntryCallback注册到StatisticSlotCallbackRegistry中。

ProcessorSlotEntryCallback

public class ParamFlowStatisticEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> {

    @Override
    public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
        throws Exception {
        // The "hot spot" parameter metric is present only if parameter flow rules for the resource exist.
        ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper);

        if (parameterMetric != null) {
            parameterMetric.addPass(count, args);
        }
    }

    @Override
    public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param,
                          int count, Object... args) {
        // Here we don't add block count here because checking the type of block exception can affect performance.
        // We add the block count when throwing the ParamFlowException instead.
    }
}

onPass方法就是用来热点参数的指标的,热点参数限流可以专门一个模块来学习,这里后面会再专门讲解。

抛出BlockException

SetError
这里会设置Context的CurEntry的Error属性,error属性可以在资源退出的时候判断使用,若不存在error,会统计响应时间rt等。

增加block线程数
在DefaultNode类中increaseBlockQps方法中:

 @Override
    public void increaseBlockQps() {
        super.increaseBlockQps();
        this.clusterNode.increaseBlockQps();
    }

在StatisticNode中的increaseBlockQps方法:

 @Override
    public void increaseBlockQps() {
        rollingCounterInSecond.addBlock();
        rollingCounterInMinute.addBlock();
    }

根据滑动时间窗口统计了两个时间窗口的数据。

rollingCounterInSecond:时间窗口是1s
rollingCounterInMinute:时间窗口是1分钟

源节点originNode
如果originNode存在,则也需要增加originNode请求Block数。

ENTRY_NODE
如果资源包装类型是IN的话,则需要ENTRY_NODE的Block数。

然后再进行ProcessorSlotEntryCallback的onBlocke方法,这个也是热点参数限流才会有。

抛出Throwable

先设置curEntry的error

增加node的异常数
node.increaseExceptionQps();

DefaultNode中:

@Override
    public void increaseExceptionQps() {
        super.increaseExceptionQps();
        this.clusterNode.increaseExceptionQps();
    }

然后判断originNode和包装类型,增加对应的exception数。

在Exit中


    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        DefaultNode node = (DefaultNode)context.getCurNode();

        if (context.getCurEntry().getError() == null) {
            // Calculate response time (max RT is TIME_DROP_VALVE).
            long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
            if (rt > Constants.TIME_DROP_VALVE) {
                rt = Constants.TIME_DROP_VALVE;
            }

            // Record response time and success count.
            node.rt(rt);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().rt(rt);
            }

            node.decreaseThreadNum();

            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().decreaseThreadNum();
            }

            if (resourceWrapper.getType() == EntryType.IN) {
                Constants.ENTRY_NODE.rt(rt);
                Constants.ENTRY_NODE.decreaseThreadNum();
            }
        } else {
            // Error may happen.
        }

        // Handle exit event with registered exit callback handlers.
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {
            handler.onExit(context, resourceWrapper, count, args);
        }

        fireExit(context, resourceWrapper, count);
    }

这里主要统计这次请求的响应时间rt,

    public final static int TIME_DROP_VALVE = 4900;

如果rt大于TIME_DROP_VALVE,则设置rt为TIME_DROP_VALVE。

四、我的总结

1、介绍LogSlot和StatisticSlot插槽的代码设计,其中LogSlot没有详细深入。
2、LogSlot代码设计较为复杂,个人感觉功能可以重新设计。
3、StatisticSlot的统计是根据滑动时间窗口,但是线程数的统计是设置一个AtomicInteger原子变量。
3、如果StatisticSlotCallbackRegistry中注册ProcessorSlotEntryCallback和ProcessorSlotExitCallback回调器,则在回调器中也会统计数据,目前只有热点参数限流会使用。
4、Sentinel后续的插槽就是根据StatisticSlot统计的数据进行资源的保护的。


以上内容若有不当之处,请指正,谢谢!

上一篇 下一篇

猜你喜欢

热点阅读