redis + lua 限流 算法 滑动窗口 漏桶 令牌桶

2023-10-30  本文已影响0人  川流不息attitude

redis + lua 限流 算法 滑动窗口 漏桶 令牌桶

// Catalogue of the rate-limiting algorithms used in this article. Each constant
// pairs a key name with the file name of the Lua script implementing it.
// The accessors getKeyName()/getScriptName() used by the calling code are
// generated by Lombok's @Getter; the two-argument constructor comes from
// @AllArgsConstructor.
@Getter
@AllArgsConstructor
public enum RateLimitEnum {

    /**
     * Sliding window.
     */
    SLIDING_WINDOW("sliding_window_request_rate_limiter", "sliding_window_request_rate_limiter.lua"),

    /**
     * Leaky bucket algorithm.
     */
    LEAKY_BUCKET("request_leaky_rate_limiter", "request_leaky_rate_limiter.lua"),

    /**
     * Counter (concurrent-request limiter).
     */
    CONCURRENT("concurrent_request_rate_limiter", "concurrent_request_rate_limiter.lua"),

    /**
     * Token bucket.
     */
    TOKEN_BUCKET("request_rate_limiter", "request_rate_limiter.lua");

    // Key name associated with the algorithm.
    private final String keyName;

    // File name of the Lua script; the calling code loads it as a classpath resource.
    private final String scriptName;


}

sliding_window_request_rate_limiter.lua

-- Sliding-window rate limiter (Redis EVAL script).
--
-- KEYS[1]  sorted set with one member per admitted request, scored by the
--          timestamp of that request
-- KEYS[2]  member name recorded for this request. ZADD on a member that is
--          already present only updates its score, so the caller should pass
--          a value that is unique per request.
-- ARGV[1]  rate      admitted requests per time unit
-- ARGV[2]  capacity  maximum number of admitted requests inside one window
-- ARGV[3]  now       current timestamp, in the same time unit as the rate
--
-- Returns { allowed, remaining }: allowed is 1 when the request was admitted
-- and 0 otherwise; remaining is the number of free slots in the window as
-- counted before this request was recorded.
local tokens_key = KEYS[1]
local request_member = KEYS[2]

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])

-- Length of the window, in time units.
local window_size = capacity / rate

-- Evict entries that have left the window BEFORE counting. Counting first
-- would let stale entries occupy slots and wrongly reject the first request
-- that arrives after the window has slid.
redis.call('zremrangebyscore', tokens_key, 0, now - window_size)

-- ZCARD replies 0 for a missing key, so no separate EXISTS check is needed.
local last_requested = redis.call('zcard', tokens_key)

local remain_request = capacity - last_requested
local allowed_num = 0
if last_requested < capacity then
    allowed_num = 1
    redis.call('zadd', tokens_key, now, request_member)
end

-- EXPIRE only accepts an integer TTL; capacity / rate may be fractional, so
-- round up to keep the key alive for at least one full window.
redis.call('expire', tokens_key, math.ceil(window_size))

return { allowed_num, remain_request }

request_rate_limiter.lua

-- Token-bucket rate limiter (Redis EVAL script).
--
-- KEYS[1]  key holding the number of tokens left in the bucket
-- KEYS[2]  key holding the timestamp of the last refresh
-- ARGV[1]  rate       tokens added per time unit
-- ARGV[2]  capacity   maximum number of tokens the bucket can hold
-- ARGV[3]  now        current timestamp, in the same time unit as the rate
-- ARGV[4]  requested  tokens this request wants to take
--
-- Returns { allowed, tokens_left }: allowed is 1 when the request was
-- admitted and 0 otherwise; tokens_left is the bucket content after the call.
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- Time needed to refill an empty bucket. State older than twice that can be
-- dropped, because the bucket would be full again anyway.
local fill_time = capacity / rate
-- SETEX rejects a TTL of 0, which math.floor yields whenever
-- rate > 2 * capacity; clamp to 1 so the script neither errors nor loses the
-- state it needs to limit requests that share the same timestamp.
local ttl = math.max(1, math.floor(fill_time * 2))

-- A missing key makes GET reply false, tonumber() then yields nil and the
-- default applies: a new bucket starts full and has never been refreshed.
local last_tokens = tonumber(redis.call("get", tokens_key)) or capacity
local last_refreshed = tonumber(redis.call("get", timestamp_key)) or 0

-- Refill for the elapsed time; a timestamp older than the stored one adds
-- nothing, and the bucket never holds more than its capacity.
local delta = math.max(0, now - last_refreshed)
local filled_tokens = math.min(capacity, last_tokens + (delta * rate))

local allowed_num = 0
local new_tokens = filled_tokens
if filled_tokens >= requested then
  new_tokens = filled_tokens - requested
  allowed_num = 1
end

-- State is persisted for rejected requests as well, so the refill computed
-- above is not applied a second time on the next call.
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed_num, new_tokens }

request_leaky_rate_limiter.lua

-- Leaky-bucket rate limiter (Redis EVAL script).
--
-- KEYS[1]  key holding the current amount of water in the bucket
-- KEYS[2]  key holding the timestamp of the last leak computation
-- ARGV[1]  rate       amount of water that leaks out per time unit
-- ARGV[2]  capacity   maximum amount of water the bucket can hold
-- ARGV[3]  now        current timestamp, in the same time unit as the rate
-- ARGV[4]  requested  amount of water this request adds
--
-- Returns { allowed, level }: allowed is 1 when the request fits and 0
-- otherwise; level is the bucket content including this request (for a
-- rejected request it is the level that would have been reached).
local leaky_bucket_key = KEYS[1]
local last_bucket_key = KEYS[2]

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

-- A full bucket drains in capacity / rate time units; keep the keys a little
-- longer than that.
local key_lifetime = math.ceil((capacity / rate) + 1)

-- Missing keys mean an empty bucket whose last leak happened right now.
local key_bucket_count = tonumber(redis.call("GET", leaky_bucket_key)) or 0
local last_time = tonumber(redis.call("GET", last_bucket_key)) or now

-- Water that has leaked out since the last recorded computation.
local leaks = (now - last_time) * rate
if leaks > 0 then
    -- The bucket cannot hold less than nothing.
    key_bucket_count = math.max(0, key_bucket_count - leaks)
    last_time = now
end

local new_bucket_count = key_bucket_count + requested
if new_bucket_count > capacity then
    -- Rejected: nothing is written, so the stored level and timestamp remain
    -- valid and the leak is recomputed from them on the next call.
    return { 0, new_bucket_count }
end

redis.call("SETEX", leaky_bucket_key, key_lifetime, new_bucket_count)

-- Persist last_time rather than now: when a caller's clock is behind the
-- stored timestamp, writing now would move the timestamp backwards and the
-- next request would leak the same interval a second time.
redis.call("SETEX", last_bucket_key, key_lifetime, last_time)

return { 1, new_bucket_count }

/**
 * @Author liyichuan
 * @Date 2023/10/31 9:50
 */
@Data
public class RateLimiterHandle {

    /**
     * 补给率
     */
    private double replenishRate;

    /**
     * 容量大小.
     */
    private double burstCapacity;

    /**
     * 请求数量
     */
    private double requestCount = 1.0;


    /**
     * 唯一key
     */
    private String keyResolverName;

    /**
     * 单位时间 默认秒
     */
    private Long second;

    /**
     *
     * 默认 令牌桶
     * @return the rate limiter handle
     */
    public static RateLimiterHandle newDefaultInstance() {
        RateLimiterHandle rateLimiterHandle = new RateLimiterHandle();
        rateLimiterHandle.setReplenishRate(1.0);
        rateLimiterHandle.setBurstCapacity(100.0);
        rateLimiterHandle.setRequestCount(1.0);
        rateLimiterHandle.setSecond(Instant.now().getEpochSecond());
        return rateLimiterHandle;
    }


}

调用

   // Load the token-bucket script from the classpath; it replies with a two-element list.
   DefaultRedisScript<List> redisScript = new DefaultRedisScript<>();
   redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(RateLimitEnum.TOKEN_BUCKET.getScriptName())));
   redisScript.setResultType(List.class);
   // KEYS[1] stores the remaining tokens, KEYS[2] the last refresh timestamp.
   List<String> keys = Lists.newArrayList("1.tokens", "1.timestamp");
   RateLimiterHandle limiterHandle = new RateLimiterHandle();
   limiterHandle.setReplenishRate(1);
   limiterHandle.setBurstCapacity(5);
   limiterHandle.setRequestCount(1);
   limiterHandle.setKeyResolverName(RateLimitEnum.TOKEN_BUCKET.getKeyName());
   // Token bucket
   MultiThreadConcurrentTestUtils.multiThreadTest(2, () -> {
       // The script's "now" argument must be a monotonically increasing timestamp in the
       // unit of the rate (seconds), sampled for every call. The minute of the hour
       // (0-59) wraps around and only changes once a minute, which breaks the refill maths.
       long nowSeconds = java.time.Instant.now().getEpochSecond();
       List<Long> execute = (List<Long>) stringRedisTemplate.execute(redisScript, keys,
               String.valueOf(limiterHandle.getReplenishRate()),
               String.valueOf(limiterHandle.getBurstCapacity()),
               String.valueOf(nowSeconds),
               String.valueOf(limiterHandle.getRequestCount()));
       log.info("{}", execute);
   });

参考脚本:ShenYu 网关 [github](https://github.com/lyc88/shenyu/blob/master/shenyu-plugin/shenyu-plugin-fault-tolerance/shenyu-plugin-ratelimiter/src/main/resources/META-INF/scripts/concurrent_request_rate_limiter.lua)

[序列化 数字问题](https://blog.51cto.com/u_16099164/6504657)

上一篇下一篇

猜你喜欢

热点阅读