92 - 实战之限流框架(实现篇)

2021-10-31  本文已影响0人  舍是境界

前面我们介绍了如何通过合理的设计,来实现功能性需求的同时,满足易用、易扩展、灵活、低延迟、高容错等非功能性需求。在设计的过程中,我们也借鉴了之前讲过的一些开源项目的设计思想。比如,我们借鉴了 Spring 的低侵入松耦合、约定优于配置等设计思想,还借鉴了 MyBatis 通过 MyBatis-Spring 类库将框架的易用性做到极致等设计思路。

现在我们来讲,针对限流框架的开发,如何做高质量的代码实现。说的具体点就是,如何利用之前讲过的设计思想、原则、模式、编码规范、重构技巧等,写出易读、易扩展、易维护、灵活、简洁、可复用、易测试的代码。

V1 版本功能需求

最小原型代码

com.xzg.ratelimiter
  --RateLimiter
com.xzg.ratelimiter.rule
  --ApiLimit
  --RuleConfig
  --RateLimitRule
com.xzg.ratelimiter.alg
  --RateLimitAlg
public class RateLimiter {
  private static final Logger log = LoggerFactory.getLogger(RateLimiter.class);
  // 为每个api在内存中存储限流计数器
  private ConcurrentHashMap<String, RateLimitAlg> counters = new ConcurrentHashMap<>();
  private RateLimitRule rule;
  public RateLimiter() {
    // 将限流规则配置文件ratelimiter-rule.yaml中的内容读取到RuleConfig中
    InputStream in = null;
    RuleConfig ruleConfig = null;
    try {
      in = this.getClass().getResourceAsStream("/ratelimiter-rule.yaml");
      if (in != null) {
        Yaml yaml = new Yaml();
        ruleConfig = yaml.loadAs(in, RuleConfig.class);
      }
    } finally {
      if (in != null) {
        try {
          in.close();
        } catch (IOException e) {
          log.error("close file error:{}", e);
        }
      }
    }
    // 将限流规则构建成支持快速查找的数据结构RateLimitRule
    this.rule = new RateLimitRule(ruleConfig);
  }
  public boolean limit(String appId, String url) throws InternalErrorException {
    ApiLimit apiLimit = rule.getLimit(appId, url);
    if (apiLimit == null) {
      return true;
    }
    // 获取api对应在内存中的限流计数器(rateLimitCounter)
    String counterKey = appId + ":" + apiLimit.getApi();
    RateLimitAlg rateLimitCounter = counters.get(counterKey);
    if (rateLimitCounter == null) {
      RateLimitAlg newRateLimitCounter = new RateLimitAlg(apiLimit.getLimit());
      rateLimitCounter = counters.putIfAbsent(counterKey, newRateLimitCounter);
      if (rateLimitCounter == null) {
        rateLimitCounter = newRateLimitCounter;
      }
    }
    // 判断是否限流
    return rateLimitCounter.tryAcquire();
  }
}
public class RuleConfig {
  private List<AppRuleConfig> configs;
  public List<AppRuleConfig> getConfigs() {
    return configs;
  }
  public void setConfigs(List<AppRuleConfig> configs) {
    this.configs = configs;
  }
  public static class AppRuleConfig {
    private String appId;
    private List<ApiLimit> limits;
    public AppRuleConfig() {}
    public AppRuleConfig(String appId, List<ApiLimit> limits) {
      this.appId = appId;
      this.limits = limits;
    }
    //...省略getter、setter方法...
  }
}
public class ApiLimit {
  private static final int DEFAULT_TIME_UNIT = 1; // 1 second
  private String api;
  private int limit;
  private int unit = DEFAULT_TIME_UNIT;
  public ApiLimit() {}
  public ApiLimit(String api, int limit) {
    this(api, limit, DEFAULT_TIME_UNIT);
  }
  public ApiLimit(String api, int limit, int unit) {
    this.api = api;
    this.limit = limit;
    this.unit = unit;
  }
  // ...省略getter、setter方法...
}
configs:          <!--对应RuleConfig-->
- appId: app-1    <!--对应AppRuleConfig-->
  limits:
  - api: /v1/user <!--对应ApiLimit-->
    limit: 100
    unit:60
  - api: /v1/order
    limit: 50
- appId: app-2
  limits:
  - api: /v1/user
    limit: 50
  - api: /v1/order
    limit: 50
前缀匹配示意图
ublic class RateLimitRule {
  public RateLimitRule(RuleConfig ruleConfig) {
    //...
  }
  public ApiLimit getLimit(String appId, String api) {
    //...
  }
}
public class RateLimitAlg {
  /* timeout for {@code Lock.tryLock() }. */
  private static final long TRY_LOCK_TIMEOUT = 200L;  // 200ms.
  private Stopwatch stopwatch;
  private AtomicInteger currentCount = new AtomicInteger(0);
  private final int limit;
  private Lock lock = new ReentrantLock();
  public RateLimitAlg(int limit) {
    this(limit, Stopwatch.createStarted());
  }
  @VisibleForTesting
  protected RateLimitAlg(int limit, Stopwatch stopwatch) {
    this.limit = limit;
    this.stopwatch = stopwatch;
  }
  public boolean tryAcquire() throws InternalErrorException {
    int updatedCount = currentCount.incrementAndGet();
    if (updatedCount <= limit) {
      return true;
    }
    try {
      if (lock.tryLock(TRY_LOCK_TIMEOUT, TimeUnit.MILLISECONDS)) {
        try {
          if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > TimeUnit.SECONDS.toMillis(1)) {
            currentCount.set(0);
            stopwatch.reset();
          }
          updatedCount = currentCount.incrementAndGet();
          return updatedCount <= limit;
        } finally {
          lock.unlock();
        }
      } else {
        throw new InternalErrorException("tryAcquire() wait lock too long:" + TRY_LOCK_TIMEOUT + "ms");
      }
    } catch (InterruptedException e) {
      throw new InternalErrorException("tryAcquire() is interrupted by lock-time-out.", e);
    }
  }
}

Review 最小原型代码

  1. 首先,我们来看下代码的可读性。
  1. 其次,我们再来看下代码的扩展性

重构最小原型代码

// 重构前:
com.xzg.ratelimiter
  --RateLimiter
com.xzg.ratelimiter.rule
  --ApiLimit
  --RuleConfig
  --RateLimitRule
com.xzg.ratelimiter.alg
  --RateLimitAlg
  
// 重构后:
com.xzg.ratelimiter
  --RateLimiter(有所修改)
com.xzg.ratelimiter.rule
  --ApiLimit(不变)
  --RuleConfig(不变)
  --RateLimitRule(抽象接口)
  --TrieRateLimitRule(实现类,就是重构前的RateLimitRule)
com.xzg.ratelimiter.rule.parser
  --RuleConfigParser(抽象接口)
  --YamlRuleConfigParser(Yaml格式配置文件解析类)
  --JsonRuleConfigParser(Json格式配置文件解析类)
com.xzg.ratelimiter.rule.datasource
  --RuleConfigSource(抽象接口)
  --FileRuleConfigSource(基于本地文件的配置类)
com.xzg.ratelimiter.alg
  --RateLimitAlg(抽象接口)
  --FixedTimeWinRateLimitAlg(实现类,就是重构前的RateLimitAlg)
public class RateLimiter {
  private static final Logger log = LoggerFactory.getLogger(RateLimiter.class);
  // 为每个api在内存中存储限流计数器
  private ConcurrentHashMap<String, RateLimitAlg> counters = new ConcurrentHashMap<>();
  private RateLimitRule rule;
  public RateLimiter() {
    //改动主要在这里:调用RuleConfigSource类来实现配置加载
    RuleConfigSource configSource = new FileRuleConfigSource();
    RuleConfig ruleConfig = configSource.load();
    this.rule = new TrieRateLimitRule(ruleConfig);
  }
  public boolean limit(String appId, String url) throws InternalErrorException, InvalidUrlException {
    //...代码不变...
  }
}
com.xzg.ratelimiter.rule.parser
  --RuleConfigParser(抽象接口)
  --YamlRuleConfigParser(Yaml格式配置文件解析类)
  --JsonRuleConfigParser(Json格式配置文件解析类)
com.xzg.ratelimiter.rule.datasource
  --RuleConfigSource(抽象接口)
  --FileRuleConfigSource(基于本地文件的配置类)
  
public interface RuleConfigParser {
  RuleConfig parse(String configText);
  RuleConfig parse(InputStream in);
}
public interface RuleConfigSource {
  RuleConfig load();
}
public class FileRuleConfigSource implements RuleConfigSource {
  private static final Logger log = LoggerFactory.getLogger(FileRuleConfigSource.class);
  public static final String API_LIMIT_CONFIG_NAME = "ratelimiter-rule";
  public static final String YAML_EXTENSION = "yaml";
  public static final String YML_EXTENSION = "yml";
  public static final String JSON_EXTENSION = "json";
  private static final String[] SUPPORT_EXTENSIONS =
      new String[] {YAML_EXTENSION, YML_EXTENSION, JSON_EXTENSION};
  private static final Map<String, RuleConfigParser> PARSER_MAP = new HashMap<>();
  static {
    PARSER_MAP.put(YAML_EXTENSION, new YamlRuleConfigParser());
    PARSER_MAP.put(YML_EXTENSION, new YamlRuleConfigParser());
    PARSER_MAP.put(JSON_EXTENSION, new JsonRuleConfigParser());
  }
  @Override
  public RuleConfig load() {
    for (String extension : SUPPORT_EXTENSIONS) {
      InputStream in = null;
      try {
        in = this.getClass().getResourceAsStream("/" + getFileNameByExt(extension));
        if (in != null) {
          RuleConfigParser parser = PARSER_MAP.get(extension);
          return parser.parse(in);
        }
      } finally {
        if (in != null) {
          try {
            in.close();
          } catch (IOException e) {
            log.error("close file error:{}", e);
          }
        }
      }
    }
    return null;
  }
  private String getFileNameByExt(String extension) {
    return API_LIMIT_CONFIG_NAME + "." + extension;
  }
}

小结

上一篇 下一篇

猜你喜欢

热点阅读