Sentinel

Sentinel流量控制

2021-06-20  本文已影响0人  晴天哥_王志

系列

开篇

流控规则介绍

流控规则配置

    /**
     * Builds the QPS flow rules for the demo resources and loads them into {@code FlowRuleManager}.
     *
     * @param interfaceFlowLimit QPS threshold applied to the {@code INTERFACE_RES_KEY} resource
     * @param method             when {@code true}, an extra rule limiting {@code RES_KEY} to 5 QPS
     *                           is placed ahead of the interface-level rule
     */
    private static void initFlowRule(int interfaceFlowLimit, boolean method) {
        List<FlowRule> rules = new ArrayList<>();

        // Optional method-level rule goes first so the loaded order stays: method rule, interface rule.
        if (method) {
            rules.add(new FlowRule(RES_KEY)
                    .setCount(5)
                    .setGrade(RuleConstant.FLOW_GRADE_QPS));
        }

        // The interface-level rule is always present.
        rules.add(new FlowRule(INTERFACE_RES_KEY)
                .setCount(interfaceFlowLimit)
                .setGrade(RuleConstant.FLOW_GRADE_QPS));

        FlowRuleManager.loadRules(rules);
    }

流控规则定义

/**
 * Base contract shared by all rules: a rule is always bound to a resource.
 */
public interface Rule {
    /**
     * Returns the resource this rule is bound to (presumably the resource name; confirm against
     * implementations).
     */
    String getResource();
}

/**
 * Common state shared by rule implementations: the target resource and the call origin it applies to.
 */
public abstract class AbstractRule implements Rule {
    // Resource name.
    private String resource;
    // Call origin (caller application) that the flow control applies to.
    private String limitApp;
}

/**
 * A flow control rule: the threshold, how it is measured, which calls it applies to and what
 * happens once the threshold is exceeded.
 */
public class FlowRule extends AbstractRule {

    // Threshold type: 0 = thread count, 1 = QPS (FLOW_GRADE_THREAD = 0, FLOW_GRADE_QPS = 1).
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    // Threshold value: the QPS limit when grade is QPS, the thread count limit when grade is thread.
    private double count;
    // Flow control strategy (defaults to direct).
    private int strategy = RuleConstant.STRATEGY_DIRECT;
    // Resource referenced when the rule is tied to another (related) resource.
    private String refResource;
    // Control behavior: 0. default (reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter.
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    // Warm-up duration, in seconds, used when the control behavior is warm up.
    private int warmUpPeriodSec = 10;
    // Maximum queueing time, in milliseconds, used when the control behavior is queueing (rate limiter).
    private int maxQueueingTimeMs = 500;
    // Corresponds to the "cluster mode" option on the dashboard's add-flow-rule page.
    private boolean clusterMode;
    // Configuration for cluster flow control.
    private ClusterFlowConfig clusterConfig;
    // Traffic shaping implementation; each control behavior uses a different algorithm.
    private TrafficShapingController controller;
}

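一条限流规则主要由下面几个因素组成,我们可以组合这些元素来实现不同的限流效果:

- resource:资源名,即限流规则的作用对象
- count:限流阈值
- grade:限流阈值类型(QPS 或并发线程数)
- limitApp:流控针对的调用来源
- strategy:流控策略(直接、关联等,关联时配合 refResource 使用)
- controlBehavior:流控效果(直接拒绝、Warm Up、匀速排队、Warm Up + 匀速排队)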

流控流程

/**
 * Processor slot that applies flow control by delegating to a {@code FlowRuleChecker}.
 */
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    // Performs the actual rule evaluation.
    private final FlowRuleChecker checker;

    /**
     * Runs the flow rule check for the given resource.
     *
     * @param resource    resource being entered
     * @param context     current invocation context
     * @param node        statistics node passed on to the checker
     * @param count       number of tokens requested
     * @param prioritized whether the request is prioritized
     * @throws BlockException propagated from the checker when a rule rejects the request
     */
    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        // Check whether the request has to be flow-limited.
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }
}


public class FlowRuleChecker {

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {

        // 获取匹配的规则
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                // 检查规则能否通过
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

    public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, 
                               int acquireCount, boolean prioritized) {
        String limitApp = rule.getLimitApp();
        // 集群模式下的规则检测
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        // 单机模式下的规则检测
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        // 选择流量统计的节点进行限流计算
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        // rule.getRater返回TrafficShapingController对象,
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

流控效果策略

/**
 * Default traffic shaping controller: a request passes while the tokens already in use plus the
 * requested tokens stay within the threshold, and is rejected otherwise. A prioritized request
 * under a QPS rule may instead wait and then be signalled through a {@code PriorityWaitException}.
 */
public class DefaultController implements TrafficShapingController {

    private static final int DEFAULT_AVG_USED_TOKENS = 0;
    // Threshold configured on the rule.
    private final double count;
    // Threshold type (thread count or QPS).
    private final int grade;

    /**
     * @param count threshold value
     * @param grade threshold type, compared against the {@code RuleConstant.FLOW_GRADE_*} constants
     */
    public DefaultController(double count, int grade) {
        this.count = count;
        this.grade = grade;
    }

    /** Non-prioritized variant of {@link #canPass(Node, int, boolean)}. */
    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    /**
     * Checks whether {@code acquireCount} more tokens fit under the threshold.
     *
     * @param node         statistics node to read the current usage from; may be {@code null}
     * @param acquireCount number of tokens requested
     * @param prioritized  whether the request is prioritized
     * @return {@code true} if the request may pass, {@code false} if it is rejected
     * @throws PriorityWaitException for a prioritized QPS request that waited for its slot
     */
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Tokens currently in use.
        int curCount = avgUsedTokens(node);
        // Used tokens plus requested tokens exceed the threshold.
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                // Presumably the wait needed to occupy tokens of an upcoming window; confirm against Node.
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for waitInMs.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

    /**
     * Returns the current usage: the thread count for a thread-graded rule, otherwise the pass QPS
     * truncated to an {@code int}. A {@code null} node counts as no usage.
     */
    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }

    /** Blocks the current thread for {@code timeMs} milliseconds. */
    private void sleep(long timeMs) {
        try {
            Thread.sleep(timeMs);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}

流控策略

com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController

/**
 * Helper that maps a flow rule to the traffic shaping controller implementing its control behavior.
 */
public final class FlowRuleUtil {

    /**
     * Creates the controller for a rule. Only QPS-graded rules take the control behavior into
     * account; every other grade, and any default or unknown behavior, gets the fast-reject
     * {@code DefaultController}.
     *
     * @param rule rule to build the controller for
     * @return the traffic shaping controller matching the rule
     */
    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        // Non-QPS grades never use a shaping behavior: reject directly.
        if (rule.getGrade() != RuleConstant.FLOW_GRADE_QPS) {
            return new DefaultController(rule.getCount(), rule.getGrade());
        }

        final int behavior = rule.getControlBehavior();

        // Warm up / cold start.
        if (behavior == RuleConstant.CONTROL_BEHAVIOR_WARM_UP) {
            return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    ColdFactorProperty.coldFactor);
        }

        // Uniform-rate queueing.
        if (behavior == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
            return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
        }

        // Cold start combined with uniform-rate queueing.
        if (behavior == RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER) {
            return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
        }

        // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        return new DefaultController(rule.getCount(), rule.getGrade());
    }
}

流控节点选择

/**
 * Node selection part of the flow rule checker.
 */
public class FlowRuleChecker {

    /**
     * Chooses the statistics node a rule is evaluated against, based on the rule's limit app,
     * the caller origin of the context and the rule's strategy.
     *
     * @param rule    rule being evaluated; its limit app should not be empty
     * @param context current invocation context, supplying the origin and the origin node
     * @param node    statistics node of the resource in this context
     * @return the node to check, or {@code null} when the rule matches none of the cases
     */
    static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        // The limit app should not be empty.
        final String limitApp = rule.getLimitApp();
        final boolean direct = rule.getStrategy() == RuleConstant.STRATEGY_DIRECT;
        final String origin = context.getOrigin();

        // Rule targets exactly this caller.
        if (limitApp.equals(origin) && filterOrigin(origin)) {
            // Matches limit origin, return origin statistic node.
            return direct ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        }

        // Rule targets the default limit app (the original author marks this as the branch actually taken).
        if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            // Return the cluster node.
            return direct ? node.getClusterNode() : selectReferenceNode(rule, context, node);
        }

        // Rule targets "other" callers and this origin counts as one for the resource.
        if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            return direct ? context.getOriginNode() : selectReferenceNode(rule, context, node);
        }

        // No case applies.
        return null;
    }
}
/**
 * Slot that looks up, or creates on first use, the {@code DefaultNode} for the current context
 * name and makes it the context's current node before passing the call on.
 */
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    // Context name -> DefaultNode. The field is reassigned to a fresh map on every insertion.
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    /**
     * Resolves the node for {@code context.getName()}, creating and registering it if absent,
     * sets it as the current node of the context and fires the next slot with that node.
     */
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        // Responsible for creating the DefaultNode; per the original author, the slot chain is per
        // resource and each slot is per chain.
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                // Look up again under the lock in case another thread created the node meanwhile.
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    // Build a new map with the old entries plus the new node, then publish it by
                    // reassigning the field; the previous map is never mutated.
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

/**
 * Slot that creates the {@code ClusterNode} of a resource on first entry, attaches it to the
 * incoming {@code DefaultNode} and sets up the origin node for the caller.
 */
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    // Global cluster nodes, keyed by resource. The field is reassigned to a fresh map on insertion.
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
    // Guards creation of the cluster node and the replacement of clusterNodeMap.
    private static final Object lock = new Object();
    // Cluster node held by this slot instance; created on the first entry.
    private volatile ClusterNode clusterNode = null;

    /**
     * Ensures the cluster node exists, attaches it to {@code node}, registers an origin node on
     * the current entry when the context origin is not the empty string, then fires the next slot.
     */
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                // Check again under the lock in case another thread created it meanwhile.
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    // Copy the existing entries into a new map, add this node, then publish the new map.
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        // Attach the cluster node to the DefaultNode.
        node.setClusterNode(clusterNode);

        // Any origin other than the empty string gets its own origin node on the current entry.
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}
上一篇 下一篇

猜你喜欢

热点阅读