(一)redis限流
2019-06-24 本文已影响0人
无聊之园
来自于《Redis深度历险:核心原理和应用实践》,进行分析。
业务场景:一个id,限制每秒只能访问多少次。
下面的代码使用了redission工具类。
不合理的方式:
1、以下这种方式,没有考虑并发问题,一瞬间所有的请求,同时get,然后还没来得及increment,然后就判断错误了。
2、改善方式,先increment,再get。(不过这样的话,get会得到别人increment后的值,有些业务不合适)。或者,使用下面那样的自旋的方式。
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String ip = request.getRemoteHost();
String key = IP_PREFIX + ip;
Object value = redisService.get(BLACK_PREFIX + ip);
if(value != null){
// 在黑名单则不让访问
return false;
}
Object o = redisService.get(key);
if(o == null){
redisService.set(key, 1, 2000L);
} else if(o.equals(50)){
// 2秒超过50个请求就屏蔽掉,放入黑名单,1小时候之后不能访问
redisService.set(BLACK_PREFIX + ip, 1, 3600L);
}else{
redisService.increment(key, 1);
}
return true;
}
redis自增,必须要使用StringRedisTemplate,而且自定义好key策略。
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
//定义key序列化方式
//RedisSerializer<String> redisSerializer = new StringRedisSerializer();//Long类型会出现异常信息;需要我们上面的自定义key生成策略,一般没必要
//定义value的序列化方式
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// template.setKeySerializer(redisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
1、自旋的方式。
这不是书中的方式。
缺点:无法精确的限制,每秒10次,最大的可能会到18次。
优点:内存利用少。
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
String id = "123";
RBucket<Boolean> bucket = redisson.getBucket("blackId:" + id);
// 是否是黑名单得
if(bucket.get() == true){
// 或者返回最近的一个请求的缓存
return;
}
RAtomicLong idAccessNum = redisson.getAtomicLong(id);
while (true){
long num = idAccessNum.get();
// 没有初始化,则设置过期时间
// 这种方式,并非准确10秒10次就到了阈值,可能前10秒已经有9次,然后后10秒又请求了9次,
// 最长可能会有18次。但是我们不需要准确的10次,所以无影响。
if(num == 0){
idAccessNum.expire(10, TimeUnit.SECONDS);
}
// 超过了10次,则进入黑名单
if(num > 10){
// 加入黑名单,30秒之后不能再访问
bucket.set(true, 30, TimeUnit.SECONDS);
// 或者返回最近的一个请求的缓存
return;
}
// 这里自旋,只针对同一个用户id,因为理论上同一个用户的请求是线性的,所以这里自旋
// 不会对性能造成很大的影响
boolean updateSuccess = idAccessNum.compareAndSet(num, num + 1);
// 是否更新成功
if(updateSuccess){
break;
}
}
二、使用zset
这是书中的方式。
优点:能够精确的统计次数。
缺点:比较浪费内存。每请求一次,就ad一个数。
public class SimpleRateLimiter {
private Jedis jedis;
public SimpleRateLimiter(Jedis jedis) {
this.jedis = jedis;
}
public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
String key = String.format("hist:%s:%s", userId, actionKey);
long nowTs = System.currentTimeMillis();
Pipeline pipe = jedis.pipelined();
pipe.multi();
// value 没有实际的意义,保证唯一就可以
pipe.zadd(key, nowTs, "" + nowTs);
pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
Response<Long> count = pipe.zcard(key);
pipe.expire(key, period + 1);
pipe.exec();
pipe.close();
return count.get() <= maxCount;
}
public static void main(String[] args) {
Jedis jedis = new Jedis();
SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
for (int i = 0; i < 20; i++) {
System.out.println(limiter.isActionAllowed("laoqian", "reply", 60, 5));
}
}
}
如果使用redission。
则代码大概这么写
public static void main(String[] args) {
RedissonClient redisson = Redisson.create();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
String id = "123";
RBucket<Boolean> bucket = redisson.getBucket("blackId:" + id);
// 是否是黑名单得
if(bucket.get() == true){
// 或者返回最近的一个请求的缓存
return;
}
long nanoTime = System.nanoTime();
RScoredSortedSet<Object> zset = redisson.getScoredSortedSet(id);
zset.expire(10, TimeUnit.SECONDS);
zset.add(nanoTime, nanoTime);
zset.removeRangeByScore(0, true,
nanoTime - 10 * 1000 * 1000 * 1000, true);
int size = zset.size();
// 超过了10次,则进入黑名单
if(size > 10){
// 加入黑名单,30秒之后不能再访问
bucket.set(true, 30, TimeUnit.SECONDS);
// 或者返回最近的一个请求的缓存
return;
}
// 放行
}
如果没有redis,只是单机,用java可以大概这么写:
public class TestLimitFlow {
private Lock lock = new ReentrantLock();
private Map<String, Entity> map = new ConcurrentHashMap<>();
public TestLimitFlow() {
new Thread(() -> {
while (true) {
System.out.println(map.size());
map.forEach((key, value) -> {
long now = System.nanoTime();
if (value.expireTime < now) {
map.remove(key);
}
});
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
class Entity<T> {
public long expireTime;
T o;
}
public void interceptor(String ip) throws InterruptedException {
lock.lock();
Entity e = map.get(ip);
long nanoTime = System.nanoTime();
if (e == null) {
TreeSet<Long> treeSet = new TreeSet<>();
e = new Entity<TreeSet<Long>>();
e.expireTime = nanoTime + 10L * 1000 * 1000 * 1000;
e.o = treeSet;
map.put(ip, e);
}
lock.unlock();
synchronized (e) {
e.expireTime = nanoTime + 10L * 1000 * 1000 * 1000;
TreeSet<Long> zset = (TreeSet<Long>) e.o;
zset.add(nanoTime);
List<Long> deleteKey = Lists.newArrayList();
for(Long item : zset){
if(item < nanoTime - 10L * 1000 * 1000 * 1000){
System.out.println("true");
// zset.remove(item);
deleteKey.add(item);
}else {
break;
}
}
deleteKey.forEach(item -> {
zset.remove(item);
});
System.out.println(zset.size());
if (zset.size() > 10) {
// 加入黑名单
System.out.println("黑名单:" + ip + ":" + zset.size());
}
}
}
public static void main(String[] args) throws InterruptedException {
TestLimitFlow testXL = new TestLimitFlow();
for(int j = 0; j < 10;j++){
int finalJ = j;
new Thread(){
@Override
public void run() {
for (int i = 0; i < 12; i++) {
// TimeUnit.SECONDS.sleep(1);
try {
testXL.interceptor("localhost" + finalJ);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
}