[ Redis]基于Redis的zset结构实现限流
2020-12-22 本文已影响0人
AbstractCulture
限流的场景
平时项目开发中,限流的场景较为多见,这里举几个例子:
- IO较大的请求,如:文件上传、文件下载。
- SSO认证中心获取授权token,一般会分配有效期,如果对外开放,不适合多次请求。
- 调用第三方接口,会有频次限制。如:阿里云、腾讯云接口等
- 门户app的发帖、回复、点赞等行为,需要进行限流操作
限流策略
我们可以记录当前请求的次数,约定在指定的时间(period
)内,最多允许发生的请求次数(maxRequestCount
)。
此时,我们对外暴露的接口约定可以按以下的方式:
package com.xjm.spring.data.redis.core.limit.support;
/**
* @author jaymin<br>
* 限流器<br>
* 2020/12/21 23:30
*/
public interface RateLimiter {
/**
* 本次请求是否在限流次数内
* @param requestEvent 请求事件,作为Redis存储的key值
* @param period 时间窗,即需要在多少时间范围内限制该行为
* @param maxRequestCount 最大请求次数
* @return
*/
boolean isAllowed(String requestEvent,int period,int maxRequestCount);
}
如何限流
使用Redis的zset
结构可以帮助我们去实现一个简单的限流器。
将请求事件作为key
,当前的时间戳作为score
,同时填充一个唯一值(可以用UUID,但是会耗费多一点性能,这里使用timestamp)作为value
。
可以看到,每次请求进来,都会往zset中增加一个记录。针对不同的事件,采用不同的key值。 然后使用redis的zremrangebyscore key minScore maxScore
指令来对时间窗内的行为进行裁剪。然后通过zcard key
来统计当前时间窗内发生的事件数量进而做出判断即可。
使用pipeline来加快指令执行时间
由于一次限流用到的指令较多,如果你熟悉lua脚本,那么可以针对这个进行lua脚本的编写,这里使用的是redis的管道进行指令加速。
redisClient的技术选型
对于连接Redis,可以使用Jedis,也可以使用Spring的RedisTemplate
,这里使用的是RedisTemplate。
Code
- yml配置
spring:
redis:
lettuce:
pool:
max-wait: -1
min-idle: 0
max-idle: 200
max-active: 100
shutdown-timeout: 100
host: 192.168.xx.xxx
port: 6379
password: xxx
database: 0
timeout: 3000
- RedisTemplateConfig
package com.xjm.spring.data.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.time.Duration;
/**
* com.xjm.redis.template.config
*
* @author xiejiemin
* @create 2020/12/15
*/
@Configuration
public class RedisTemplateConfig {
@Value("${spring.redis.database}")
private int database;
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.timeout}")
private long timeout;
@Value("${spring.redis.lettuce.shutdown-timeout}")
private long shutDownTimeout;
@Value("${spring.redis.lettuce.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.lettuce.pool.min-idle}")
private int minIdle;
@Value("${spring.redis.lettuce.pool.max-active}")
private int maxActive;
@Value("${spring.redis.lettuce.pool.max-wait}")
private long maxWait;
@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setMaxIdle(maxIdle);
genericObjectPoolConfig.setMinIdle(minIdle);
genericObjectPoolConfig.setMaxTotal(maxActive);
genericObjectPoolConfig.setMaxWaitMillis(maxWait);
genericObjectPoolConfig.setTimeBetweenEvictionRunsMillis(100);
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
redisStandaloneConfiguration.setDatabase(database);
redisStandaloneConfiguration.setHostName(host);
redisStandaloneConfiguration.setPort(port);
redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.commandTimeout(Duration.ofMillis(timeout))
.shutdownTimeout(Duration.ofMillis(shutDownTimeout))
.poolConfig(genericObjectPoolConfig)
.build();
LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfig);
factory.setShareNativeConnection(true);
return factory;
}
/**
* 设置 redisTemplate 的序列化设置
* @param redisConnectionFactory
* @return
*/
@Bean("myRedisTemplateConfig")
public RedisTemplate<Object, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
// 1.创建 redisTemplate 模版
RedisTemplate<Object, Object> template = new RedisTemplate<>();
// 2.关联 redisConnectionFactory
template.setConnectionFactory(lettuceConnectionFactory);
// 3.创建 序列化类
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
// 4.设置可见度
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 5.启动默认的类型
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
// 6.序列化类,对象映射设置
jackson2JsonRedisSerializer.setObjectMapper(om);
// 7.设置 value 的转化格式和 key 的转化格式
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
template.setEnableTransactionSupport(false);
return template;
}
}
- RateLimiterLoader
package com.xjm.spring.data.redis.core.limit.support;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Instant;
import java.util.List;
/**
* @author jaymin
* 2020/12/20 20:28
*/
@Component
@Slf4j
public class RateLimiterLoader implements RateLimiter {
@Resource(name = "myRedisTemplateConfig")
private RedisTemplate redisTemplate;
/**
* 本次请求是否在限流次数内
*
* @param requestEvent 请求事件,作为Redis存储的key值
* @param period 时间窗,即需要在多少时间范围内限制该行为
* @param maxRequestCount 最大请求次数
* @return
*/
@Override
public boolean isAllowed(String requestEvent, int period, int maxRequestCount) {
if (StringUtils.isBlank(requestEvent)) {
throw new RuntimeException("Expect the input parameter to exist, the actual value is empty");
}
// 1. 获取当前的时间戳
long now = Instant.now().toEpochMilli();
log.info("current timestamp :{}", now);
RedisSerializer stringSerializer = redisTemplate.getStringSerializer();
byte[] redisKey = stringSerializer.serialize(requestEvent);
// 2. 建立管道
List<Object> list = redisTemplate.executePipelined((RedisCallback) redisConnection -> {
byte[] value = stringSerializer.serialize(String.valueOf(now));
// 3. 将当前的操作先存储下来
redisConnection.zAdd(redisKey, now, value);
double maxScope = now - period * 1000;
log.info("max scope:{}", maxScope);
// 4. 移除时间窗之外的数据
redisConnection.zRemRangeByScore(redisKey, 0, maxScope);
// 5. 统计剩下的key
redisConnection.zCard(redisKey);
// 6. 将当前key设置过期时间,过期时间为时间窗
redisConnection.expire(redisKey, period + 1);
return null;
});
Long currentRequestCount = (Long) list.get(2);
// 8. 比较时间窗内的操作数
log.info("current request count:{}", currentRequestCount);
return currentRequestCount <= maxRequestCount;
}
}
- TestController
package com.xjm.modules;
import com.xjm.spring.data.redis.core.limit.support.RateLimiterLoader;
import com.xjm.thread.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author jaymin
* 2020/12/21 1:15
*/
@RestController
@Slf4j
@RequestMapping(value = "/redis")
public class TestController {
@Autowired
private RateLimiterLoader rateLimiter;
private static volatile int allowedCount = 0;
@GetMapping("/test")
public String testRateLimiter(){
String key = "jaymin:limit:test";
for (int i = 0; i < 200; i++) {
if (rateLimiter.isAllowed(key,60,10)){
allowedCount++;
}
}
return String.format("The allowed count is %s", allowedCount);
}
}
- Result
总结
- 整个限流方案中,scope是最关键的,使用时间戳进行窗口滑动,同时注意保持value值唯一
- 高并发场景下,存在内存消耗的问题。因为需要记录每次的行为,所以不建议用在大并发(60S内请求次数100W)的场景下。
- pipeline是非原子的,如果有严格要求,可以采用lua脚本。