redis

redisson延迟队列丢失消息排查

2021-02-03  本文已影响0人  Java及SpringBoot

背景

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.12.5</version>
</dependency>
//生产端
RBlockingQueue<Message> destinationQueue = redissonClient.getBlockingQueue("delay_queue_name");
RDelayedQueue<Message> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.offer(message, n2, TimeUnit.SECONDS);

//消费端 阻塞方式
Message ms = blockingQueue.poll("timeout", "TimeUnit");

使用了两个queue,对delayedQueue的offer操作是直接进入delayedQueue,但是delay是作用在目标队列RBlockingQueue上面

初步排查分析

Redisson延时队列底层实现分析

@Log4j2
@ExtendWith(SpringExtension.class)
@SpringBootTest
@ActiveProfiles(value = "dev")
public class RedisDelayedQueueTest {

    @Autowired
    private ApplicationContext ctx;

    private RedissonClient redissonClient;

    @Autowired
    public void setRedissonTemplate(RedissonTemplate redissonTemplate) {
        this.redissonClient = redissonTemplate.getRedissonClient();
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    static class Message {
        private String id;

        private String data;
    }

    @Test
    public void testRedisDelayedQueue() {
        assertThat(this.ctx).isNotNull();
        assertThat(this.ctx.containsBean("application")).isTrue();
        int maxRoundCount = 3;
        int roundMessageCount = 1000;
        final CountDownLatch downLatch = new CountDownLatch(maxRoundCount);
        Map<Integer, RBlockingQueue<Message>> rBlockingQueueHashMap = Maps.newHashMap();
        IntStream.range(0, maxRoundCount).forEach(roundId -> {
            RBlockingQueue<Message> destinationQueue = redissonClient.getBlockingQueue("delay_shard_queue_" + roundId);
            rBlockingQueueHashMap.put(roundId, destinationQueue);
        });
        IntStream.range(0, maxRoundCount).forEach(roundId -> {
            RBlockingQueue<Message> blockingQueue = rBlockingQueueHashMap.get(roundId);
            try {
                new Thread(() -> {
                    long maxDiff = 0L;
                    final AtomicInteger messageCount = new AtomicInteger(0);
                    while (messageCount.get() < roundMessageCount) {
                        try {
                            Message ms = blockingQueue.poll(1, TimeUnit.MILLISECONDS);
                            if (ms == null) {
                                continue;
                            }
                            Long actualDelayMs = (System.currentTimeMillis() - Long.parseLong(ms.getId()));
                            Long expectDelayMs = Integer.parseInt(ms.getData()) * 1000L;
                            long diffDelayMs = Math.abs(actualDelayMs - expectDelayMs);
                            maxDiff = Math.max(maxDiff, diffDelayMs);
                            log.info("expectDelayMs: {}, actualDelayMs: {}, diffDelayMs: {}", expectDelayMs, actualDelayMs, diffDelayMs);
                            messageCount.incrementAndGet();
                        } catch (Throwable t) {
                            t.printStackTrace();
                        }
                    }
                    log.info("## round: {}, message count: {}, maxDiff: {}", roundId, messageCount.get(), maxDiff);
                    downLatch.countDown();
                }).start();
            } catch (Throwable t) {
                t.printStackTrace();
            }
            new Thread(() -> IntStream.range(0, roundMessageCount).mapToLong(i -> System.currentTimeMillis()).forEach(currentTime -> {
                int n2 = RandomUtils.nextInt(1, 5);
                Message message = new Message(currentTime + "", n2 + "");
                RDelayedQueue<Message> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
                delayedQueue.offer(message, n2, TimeUnit.SECONDS);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            })).start();
        });
        try {
            downLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        redissonClient.shutdown(3000, 6000, TimeUnit.MILLISECONDS);
    }
}

自己测试可以在本地搞个redis,然后打开redis monitor进行分析底层执行的是哪些命令

redis-cli -c -p 6379 -h 127.0.0.1 --raw
monitor 
命令可以监控redis执行了哪些命令,线上不要整很耗性能
用poll带超时时间的阻塞或者take阻塞等待程序启动,offer方法执行之前,redis服务会执行如下redis命令:

SUBSCRIBE
这里订阅了一个固定的队列 redisson_delay_queue_channel:{delay_shard_queue_0}, 就是为了开启进程里面的延时任务,redisson延时取数据就是利用这个队列实现。
1612328453.261918 [0 127.0.0.1:49559] "SUBSCRIBE" "redisson_delay_queue_channel:{delay_shard_queue_0}"


这是一段lua脚本,核心是调用了zrangebyscore,是将到期的元素从元素队列移到目标队列。
zrangebyscore,对存储超时时间的zset使用timeout参数进行排序,取得分介于0和当前时间戳的元素,取前100条,如果有值表示该元素需要移交到目标队列,然后调用rpush移交到目标队列,再调用lrem从元素队列移除,最后在从zset中删除掉已经处理的这些元素。
处理完过元素转移之后,再取zset的第一个元素的得分返回,如果没有返回nil
KEY[1]:redisson内部的队列名称:redisson_delay_queue
KEY[2]:存储超时时间的zset即为:redisson_delay_queue_timeout
KEY[3]:延迟队列名称即为:delay_shard_queue_0
ARGV[1]:当前时间戳
ARGV[2]:限制取的数量为100
1612328453.280168 [0 127.0.0.1:49551] "EVAL" "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); if #expiredValues > 0 then for i, v in ipairs(expiredValues) do local randomId, value = struct.unpack('dLc0', v);redis.call('rpush', KEYS[1], value);redis.call('lrem', KEYS[3], 1, v);end; redis.call('zrem', KEYS[2], unpack(expiredValues));end; local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); if v[1] ~= nil then return v[2]; end return nil;" "3" "delay_shard_queue_0" "redisson_delay_queue_timeout:{delay_shard_queue_0}" "redisson_delay_queue:{delay_shard_queue_0}" "1612328453270" "100"

zrangebyscore >> zrangebyscore key min max [WITHSCORES] [LIMIT offset count]
(分页获取指定区间内(min - max),带有分数值(可选)的有序集成员的列表。)
redisson_delay_queue_timeout:{delay_shard_queue_0} 是一个zset,如果有延时数据存入该队列时,就会在此队列中插入,排序分数为延时的时间戳。
zrangebyscore就是取出前100条过了当前时间的数据。如果取的是0的话就执行后面的zrange, 这里程序刚启动肯定是0(除非有之前的队列数据没有取完)。这样取数据就是为了把上次进程宕机后没发完的数据发完。
1612328453.280612 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{delay_shard_queue_0}" "0" "1612328453270" "limit" "0" "100"

ZANGE:取出第1个数,判断上面的是否还有下一页数据
1612328453.280704 [0 lua] "zrange" "redisson_delay_queue_timeout:{delay_shard_queue_0}" "0" "0" "WITHSCORES"

BLPOP
移出并获取delay_shard_queue_0列表的第一个元素, 如果没有元素会阻塞列表直到等待超时或发现可弹出元素为止
1612328453.289340 [0 127.0.0.1:49530] "BLPOP" "delay_shard_queue_0" "1000000"
offer写数据主要命令分析:

lua脚本参数说明:
KEYS[4]:channelName
ARGV[1]:timeout
ARGV[2]:randomId
ARGV[3]:encode(e)
对redisson_delay_queue_timeout的zset添加一个结构体,其score为timeout值;对delay_shard_queue_0的list的表尾添加结构体;然后判断zset的第一个元素是否是当前的结构体,如果是则对channel发布timeout消息
1612332536.515379 [0 127.0.0.1:55705] "EVAL" "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);redis.call('zadd', KEYS[2], ARGV[1], value);redis.call('rpush', KEYS[3], value);local v = redis.call('zrange', KEYS[2], 0, 0); if v[1] == value then redis.call('publish', KEYS[4], ARGV[1]); end;" "4" "delay_shard_queue_0" "redisson_delay_queue_timeout:{delay_shard_queue_0}" "redisson_delay_queue:{delay_shard_queue_0}" "redisson_delay_queue_channel:{delay_shard_queue_0}" "1612332539440" "4436596809899226408" "{\"@class\":\"com.ksyun.RedisDelayedQueueTest$Message\",\"data\":\"3\",\"id\":\"1612332536439\"}"

ZADD:zset里面设置数据截止的时间戳(当前执行的时间戳+延时的时间毫秒值)
1612332536.515509 [0 lua] "zadd" "redisson_delay_queue_timeout:{delay_shard_queue_0}" "1612332539440" "[\xd6p\xa5\xfa\xc8\xceCT\x00\x00\x00{\"@class\":\"com.ksyun.RedisDelayedQueueTest$Message\",\"data\":\"3\",\"id\":\"1612332536439\"}"

rpush:写数据到redisson对应的延迟目标队列
1612332536.515564 [0 lua] "rpush" "redisson_delay_queue:{delay_shard_queue_0}" "[\xd6p\xa5\xfa\xc8\xceCT\x00\x00\x00{\"@class\":\"com.ksyun.RedisDelayedQueueTest$Message\",\"data\":\"3\",\"id\":\"1612332536439\"}"

zrange:取出排序好的第一个数据,也就是最临近要触发的数据
1612332536.515597 [0 lua] "zrange" "redisson_delay_queue_timeout:{delay_shard_queue_0}" "0" "0"

publish:,发送通知给所有订阅了的客户端,内容为将要触发的时间。客户端收到通知后,在自己进程里面开启延时任务(HashedWheelTimer(netty的一个时间轮算法实现)),到时间后就可以从redis取数据发送
1612332536.515612 [0 lua] "publish" "redisson_delay_queue_channel:{delay_shard_queue_0}" "1612332539440"
由客户端进程里面的延时任务执行的,延时任务上一步构建的(原理就是通过redis的pub和sub模式)

zrangebyscore:取出redisson_delay_queue_timeout前100条延时到期的数据
1612333362.030608 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{delay_shard_queue_0}" "0" "1612333362021" "limit" "0" "100"

rpush:到期后同步一份数据到delay_shard_queue_0阻塞队列,因为我们程序已经监听了这个阻塞队列
1612332539.519497 [0 lua] "rpush" "delay_shard_queue_0" "{\"@class\":\"com.ksyun.RedisDelayedQueueTest$Message\",\"data\":\"3\",\"id\":\"1612332536439\"}"


lrem + zrem:删除掉取到的数据
1612332539.519534 [0 lua] "lrem" "redisson_delay_queue:{delay_shard_queue_0}" "1" "[\xd6p\xa5\xfa\xc8\xceCT\x00\x00\x00{\"@class\":\"com.ksyun.RedisDelayedQueueTest$Message\",\"data\":\"3\",\"id\":\"1612332536439\"}"

1612332539.519580 [0 lua] "zrem" "redisson_delay_queue_timeout:{delay_shard_queue_0}" "[\xd6p\xa5\xfa\xc8\xceCT\x00\x00\x00{\"@class\":\"com.ksyun.RedisDelayedQueueTest$Message\",\"data\":\"3\",\"id\":\"1612332536439\"}"

zrange取zset第一个数据,如果有数据重复上面的流程
1612332539.519628 [0 lua] "zrange" "redisson_delay_queue_timeout:{delay_shard_queue_0}" "0" "0" "WITHSCORES"

BLPOP:监听延迟队列消息
1612333362.038837 [0 127.0.0.1:63762] "BLPOP" "delay_shard_queue_0" "1"

深入分析结果

官方修复相关issues:https://github.com/redisson/redisson/issues/3302

新版本修改了org.redisson.command.RedisExecutor这个类

image-20210203143359923.png
前通过升级 redisson->3.15.0的最新版本,解决了此问题,通过分析,最终原因如下
1. blpop等待超时或者take都有风险,如果不升级版本建议用poll(),然后起个定时任务去poll
2. 官方的解决方法分析说明:
那个加一秒那个类 做了个临界条件的规避防止出现了延时时间和阻塞超时时间刚好撞到了一起,等待超时断开连接的那一刻有可能服务端的延迟消息到达了造成数据丢失,这个解决也是很巧秒

新旧版本对比

image-20210203145024733.png

官方版本修复变更说明:

https://github.com/redisson/redisson/releases

image-20210203143611938.png

redisson队列原理总结

上面总共使用了三个结构来存储,一个是目标队列list;一个是原生队列list,添加的是带有延时信息的结构体;一个是timeoutSetName的zset,元素是结构体,其score为timeout值

上一篇下一篇

猜你喜欢

热点阅读