消息机制Redis分布式服务

阿里二面:基于Redis实现延时队列服务

2022-01-23  本文已影响0人  马小莫QAQ

一、背景

在业务发展过程中,会出现一些需要延时处理的场景,比如:

  1. 订单下单之后超过30分钟用户未支付,需要取消订单
  2. 订单一些评论,如果48h用户未对商家评论,系统会自动产生一条默认评论
  3. 点我达订单下单后,超过一定时间订单未派出,需要超时取消订单等。。

处理这类需求,比较直接简单的方式就是定时任务轮训扫表。这种处理方式在数据量不大的场景下是完全没问题,但是当数据量大的时候高频的轮训数据库就会比较的耗资源,导致数据库的慢查或者查询超时。所以在处理这类需求的时候,采用了延时队列来完成。

二、几种延时队列

延时队列就是一种带有延迟功能的消息队列。下面会介绍几种目前已有的延时队列: 1.Java中
java.util.concurrent.DelayQueue
优点:JDK自身实现,使用方便,量小适用
缺点:队列消息处于jvm内存,不支持分布式运行和消息持久化 2.Rocketmq延时队列 优点:消息持久化,分布式
缺点:不支持任意时间精度,只支持特定level的延时消息 3.Rabbitmq延时队列(TTL+DLX实现) 优点:消息持久化,分布式
缺点:延时相同的消息必须扔在同一个队列

根据自身业务和公司情况,如果实现一个自己的延时队列服务需要考虑一下几点:

三、 基于Redis实现

1.0版本

每个延时消息必须包括以下参数:

注:上图1、2、3或者2、3是一个实务操作
取出过期消息过程是通过一个外部定时任务每隔1min分钟去查询队列中过期的消息,然后发送mq && remove

2.0版本

1.0上有一个可改进的地方就是队列中过期的消息是通过定时任务触发查询。所有有了2.0
2.0版本在1.0上做了一个优化,废弃掉了1min定时任务触发过期消息发送,采用了java Lock await/singlal方式实现过期消息的实时发送低延时

服务启动会注册zk,获取分配处理的queues,启动后台线程监听zk 。
为每个分配queue创建一个pull job 。
pull job首先会去queue中查询是否有过期消息:
Y:将取出消息交给worker处理
N:查询queue中最后一个成员(zset结构默认按score递增排序),如果为空,则await;不为空则await(成员
score-System.currentTimeMillis())

由于过期消息发送成功才会从队列中remove,所以pull job会记录上一次查询队列的一个offset,每次获取到过期消息会将offset向前偏移,过期消息交给worker处理,当worker由于某些异常原因处理失败会重置pull job中offset,这样可以避免消息发送一次失败之后没办法再继续处理(除了新节点add || remove时候)。
当部署服务有新增,延时队列服务会重新计算得到当前处理队列,并将之前创建pull job cancel,为新处理队列重新创建pull job。删除同理。

作者:Simple
来源:www.cnblogs.com/lylife/p/7881950.html

上一篇下一篇

猜你喜欢

热点阅读