devops

(一)redis限流

2019-06-24  本文已影响0人  无聊之园

来自于《Redis深度历险:核心原理和应用实践》,进行分析。

业务场景:一个id,限制每秒只能访问多少次。

下面的代码使用了redission工具类。

不合理的方式:
1、以下这种方式,没有考虑并发问题,一瞬间所有的请求,同时get,然后还没来得及increment,然后就判断错误了。
2、改善方式,先increment,再get。(不过这样的话,get会得到别人increment后的值,有些业务不合适)。或者,使用下面那样的自旋的方式。

@Override
   public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
       String ip = request.getRemoteHost();
       String key = IP_PREFIX + ip;
       Object value = redisService.get(BLACK_PREFIX + ip);
       if(value != null){
           // 在黑名单则不让访问
           return false;
       }
       Object o = redisService.get(key);
       if(o == null){
           redisService.set(key, 1, 2000L);
       } else if(o.equals(50)){
           // 2秒超过50个请求就屏蔽掉,放入黑名单,1小时候之后不能访问
           redisService.set(BLACK_PREFIX + ip, 1, 3600L);
       }else{
           redisService.increment(key, 1);
       }
       return true;
   }

redis自增,必须要使用StringRedisTemplate,而且自定义好key策略。

@Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        //定义key序列化方式
        //RedisSerializer<String> redisSerializer = new StringRedisSerializer();//Long类型会出现异常信息;需要我们上面的自定义key生成策略,一般没必要
        //定义value的序列化方式
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        // template.setKeySerializer(redisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

1、自旋的方式。

这不是书中的方式。

缺点:无法精确的限制,每秒10次,最大的可能会到18次。
优点:内存利用少。

 RedissonClient redisson = Redisson.create();
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");

        String id = "123";
        RBucket<Boolean> bucket = redisson.getBucket("blackId:" + id);
        // 是否是黑名单得
        if(bucket.get() == true){
            // 或者返回最近的一个请求的缓存
            return;
        }
        RAtomicLong idAccessNum = redisson.getAtomicLong(id);
        while (true){
            long num = idAccessNum.get();
            // 没有初始化,则设置过期时间
            // 这种方式,并非准确10秒10次就到了阈值,可能前10秒已经有9次,然后后10秒又请求了9次,
            // 最长可能会有18次。但是我们不需要准确的10次,所以无影响。
            if(num == 0){
                idAccessNum.expire(10, TimeUnit.SECONDS);
            }
            // 超过了10次,则进入黑名单
            if(num > 10){
                // 加入黑名单,30秒之后不能再访问
                bucket.set(true, 30, TimeUnit.SECONDS);
                // 或者返回最近的一个请求的缓存
                return;
            }
            // 这里自旋,只针对同一个用户id,因为理论上同一个用户的请求是线性的,所以这里自旋
            // 不会对性能造成很大的影响
            boolean updateSuccess = idAccessNum.compareAndSet(num, num + 1);
            // 是否更新成功
            if(updateSuccess){
                break;
            }
        }

二、使用zset

这是书中的方式。
优点:能够精确的统计次数。
缺点:比较浪费内存。每请求一次,就ad一个数。

public class SimpleRateLimiter {
    private Jedis jedis;

    public SimpleRateLimiter(Jedis jedis) {
        this.jedis = jedis;
    }

    public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
        String key = String.format("hist:%s:%s", userId, actionKey);
        long nowTs = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        pipe.multi();
        // value 没有实际的意义,保证唯一就可以
        pipe.zadd(key, nowTs, "" + nowTs);
        pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
        Response<Long> count = pipe.zcard(key);
        pipe.expire(key, period + 1);
        pipe.exec();
        pipe.close();
        return count.get() <= maxCount;
    }

    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
        for (int i = 0; i < 20; i++) {
            System.out.println(limiter.isActionAllowed("laoqian", "reply", 60, 5));
        }
    }
}

如果使用redission。
则代码大概这么写

public static void main(String[] args) {
        RedissonClient redisson = Redisson.create();

        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");

        String id = "123";
        RBucket<Boolean> bucket = redisson.getBucket("blackId:" + id);
        // 是否是黑名单得
        if(bucket.get() == true){
            // 或者返回最近的一个请求的缓存
            return;
        }

        long nanoTime = System.nanoTime();
        RScoredSortedSet<Object> zset = redisson.getScoredSortedSet(id);
        zset.expire(10, TimeUnit.SECONDS);
        zset.add(nanoTime, nanoTime);
        zset.removeRangeByScore(0, true,
                nanoTime - 10 * 1000 * 1000 * 1000, true);
        int size = zset.size();
        // 超过了10次,则进入黑名单
        if(size > 10){
            // 加入黑名单,30秒之后不能再访问
            bucket.set(true, 30, TimeUnit.SECONDS);
            // 或者返回最近的一个请求的缓存
            return;
        }
        // 放行
    }

如果没有redis,只是单机,用java可以大概这么写:

public class TestLimitFlow {

   private Lock lock = new ReentrantLock();

   private Map<String, Entity> map = new ConcurrentHashMap<>();

   public TestLimitFlow() {
       new Thread(() -> {
           while (true) {
               System.out.println(map.size());
               map.forEach((key, value) -> {
                   long now = System.nanoTime();
                   if (value.expireTime < now) {
                       map.remove(key);
                   }
               });
               try {
                   TimeUnit.SECONDS.sleep(1);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }

       }).start();
   }


   class Entity<T> {
       public long expireTime;
       T o;
   }

   public void interceptor(String ip) throws InterruptedException {
       lock.lock();
       Entity e = map.get(ip);
       long nanoTime = System.nanoTime();
       if (e == null) {
           TreeSet<Long> treeSet = new TreeSet<>();
           e = new Entity<TreeSet<Long>>();
           e.expireTime = nanoTime + 10L * 1000 * 1000 * 1000;
           e.o = treeSet;
           map.put(ip, e);
       }
       lock.unlock();
       synchronized (e) {
           e.expireTime = nanoTime + 10L * 1000 * 1000 * 1000;
           TreeSet<Long> zset = (TreeSet<Long>) e.o;
           zset.add(nanoTime);
           List<Long> deleteKey = Lists.newArrayList();
           for(Long item : zset){
               if(item < nanoTime - 10L * 1000 * 1000 * 1000){
                   System.out.println("true");
//                    zset.remove(item);
                   deleteKey.add(item);
               }else {
                   break;
               }
           }
           deleteKey.forEach(item -> {
               zset.remove(item);
           });
           System.out.println(zset.size());
           if (zset.size() > 10) {
               // 加入黑名单
               System.out.println("黑名单:" + ip + ":" + zset.size());
           }
       }

   }

   public static void main(String[] args) throws InterruptedException {
       TestLimitFlow testXL = new TestLimitFlow();
       for(int j = 0; j < 10;j++){
           int finalJ = j;
           new Thread(){
               @Override
               public void run() {
                   for (int i = 0; i < 12; i++) {
//            TimeUnit.SECONDS.sleep(1);
                       try {
                           testXL.interceptor("localhost" + finalJ);
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                   }
               }
           }.start();
       }


   }
}
上一篇下一篇

猜你喜欢

热点阅读