RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,
1 RocketMQ使用相关问题
1.1 保证消息的可用性/可靠性/不丢失
消息可能在哪些阶段丢失呢?可能会在这三个阶段发生丢失:生产阶段
、存储阶段
、消费阶段
所以要从这三个阶段考虑:
image.png
1.1.1 生产
在生产阶段,主要通过请求确认机制,来保证消息的可靠传递。
- 同步发送的时候,要注意处理响应结果和异常。如果返回响应
OK
,表示消息成功发送到了Broker
,如果响应失败,或者发生其它异常,都应该重试。 - 异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。
- 如果发生超时的情况,也可以通过查询日志的API,来检查是否在
Broker
存储成功。
1.1.2 存储
存储阶段,可以通过配置可靠性优先的 Broker
参数来避免因为宕机丢消息,简单说就是可靠性优先的场景都应该使用同步
- 消息只要持久化到
CommitLog
(日志文件)中,即使Broker
宕机,未消费的消息也能重新恢复再消费。 -
Broker
的刷盘机制:同步刷盘和异步刷盘,不管哪种刷盘都可以保证消息一定存储在pagecache
中(内存中),但是同步刷盘更可靠,它是Producer
发送消息后等数据持久化到磁盘之后再返回响应给Producer
image.png
-
Broker
通过主从模式来保证高可用,Broker
支持Master
和Slave
同步复制、Master
和Slave
异步复制模式,生产者的消息都是发送给Master
,但是消费既可以从Master
消费,也可以从Slave
消费。同步复制模式可以保证即使Master
宕机,消息肯定在Slave
中有备份,保证了消息不会丢失。
1.1.3 消费
从Consumer
角度分析,如何保证消息被成功消费?
Consumer
保证消息成功消费的关键在于确认的时机,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。
1.2 如何处理消息重复消费
对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,就是所谓的有且仅有一次
。RocketMQ
择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
处理消息重复问题,主要有业务端自己保证,主要的方式有两种:业务幂等
和消息去重
-
业务幂等
:第一种是保证消费逻辑的幂等性,也就是多次调用和一次调用的效果是一样的。这样一来,不管消息消费多少次,对业务都没有影响。 -
消息去重
:第二种是业务端,对重复的消息就不再消费了。这种方法,需要保证每条消息都有一个惟一的编号,通常是业务相关的,比如订单号,消费的记录需要落库,而且需要保证和消息确认这一步的原子性
具体做法是可以建立一个消费记录表
,拿到这个消息做数据库的insert操作。给这个消息做一个唯一主键(primary key)或者唯一约束,那么就算出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。
1.3 怎么处理消息积压
发生了消息积压,这时候就得想办法赶紧把积压的消息消费完,就得考虑提高消费能力,一般有两种办法:
image.png
-
消费者扩容
:如果当前Topic
的Message Queue
的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。 -
消息迁移Queue扩容
:如果当前Topic的Message Queue
的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue
。可以新建一个临时的Topic
,临时的Topic
多设置一些Message Queue
,然后先用一些消费者把消费的数据丢到临时的Topic
,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic
里的数据,消费完了之后,恢复原状。
image.png
1.4 顺序消息如何实现
顺序消息是指消息的消费顺序
和产生顺序
相同,在有些业务逻辑下,必须保证顺序,比如订单的生成、付款、发货,这个消息必须按顺序处理才行。
顺序消息分为全局顺序消息
和部分顺序消息
:
- 全局顺序消息指某个
Topic
下的所有消息都要保证顺序; - 部分顺序消息只要保证每一组消息被顺序消费即可,比如订单消息,只要保证同一个订单 ID 个消息能按顺序消费即可。
1.4.1 部分顺序消息
部分顺序消息相对比较好实现,生产端需要做到把同 ID
的消息发送到同一个 Message Queue
;在消费过程中,要做到从同一个Message Queue
读取的消息顺序处理——消费端不能并发处理顺序消息,这样才能达到部分有序。
发送端使用 MessageQueueSelector
类来控制 把消息发往哪个 Message Queue
import lombok.Data;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags= new String[]{"TagA","TagC","TagD"};
//订单列表
List<OrderStep> orderList = new Producer().buildOrders();
Date date= new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for(int i=0;i<10;i++){
//加时间前缀
String body= dateStr+"Hello RocketMQ"+orderList.get(i);
Message msg = new Message("TopicTest",tags[i % tags.length],"KEY"+i,body.getBytes());
SendResult sendResult = producer.send(msg,new MessageQueueSelector(){
@Override
public MessageQueue select(List<MessageQueue> mqs,Message msg,Object arg){
Long id = (Long)arg;
long index = id%mqs.size();
return mqs.get((int)index);
}
},orderList.get(i).getOrderId());
System.out.println(String.format("SendResult status:%s,queueId:%d,body:%s",sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
@Data
private static class OrderStep{
private long orderId;
private String desc;
}
private List<OrderStep> buildOrders(){
List<OrderStep> orderList = new ArrayList<>();
OrderStep order = new OrderStep();
order.setOrderId(1);
order.setDesc("创建");
orderList.add(order);
order = new OrderStep();
order.setOrderId(2);
order.setDesc("创建");
orderList.add(order);
order = new OrderStep();
order.setOrderId(3);
order.setDesc("创建");
orderList.add(order);
order = new OrderStep();
order.setOrderId(4);
order.setDesc("创建");
orderList.add(order);
return orderList;
}
}
消费端通过使用 MessageListenerOrderly
来解决单 Message Queue
的消息被并发处理的问题
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class ConsumerInOrder {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/*
* 设置consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果不是第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest","TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for(MessageExt msg : list){
//可以看到每一个queue有一个consume线程来消费,订单对每个queue(分区)有序
System.out.println("consumeThread = "+ Thread.currentThread().getName()+"queueId="+
msg.getQueueId()+",content:"+new String(msg.getBody()));
}
try{
TimeUnit.SECONDS.sleep(random.nextInt(10));
}catch (Exception e){
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
1.4.2 全局顺序消息
RocketMQ
默认情况下不保证顺序,比如创建一个 Topic
,默认八个写队列,八个读队列,这时候一条消息可能被写入任意一个队列里;在数据的读取过程中,可能有多个 Consumer
,每个 Consumer
也可能启动多个线程并行处理,所以消息被哪个 Consumer
消费,被消费的顺序和写人的顺序是否一致是不确定的。
要保证全局顺序消息, 需要先把 Topic
的读写队列数设置为 一,然后Producer Consumer
的并发设置,也要是一。简单来说,为了保证整个 Topic
全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲RocketMQ
的高并发、高吞吐的特性了。
1.5 如何实现消息过滤
有两种方案:
- 在
Broker
端按照Consumer
的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到Consumer
端,缺点是加重了Broker
的负担,实现起来相对复杂。 - 在
Consumer
端过滤,比如按照消息设置的tag
去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了Consumer
端只能丢弃不处理。
一般采用Cosumer
端过滤,如果希望提高吞吐量,可以采用Broker
过滤。
对消息的过滤有三种方式:
- 根据
Tag
过滤
这是最常见的一种,用起来高效简单
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
-
SQL
表达式过滤
SQL
表达式过滤更加灵活
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
-
Filter Server
方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤
1.6 延时消息了解
电商的订单超时自动取消,就是一个典型的利用延时消息的例子,用户提交了一个订单,就可以发送一个延时消息,1h
后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
RocketMQ
是支持延时消息的,只需要在生产消息的时候设置消息的延时级别:
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 启动生产者
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
}
但是目前RocketMQ
支持的延时级别是有限的:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
那么RocketMQ
怎么实现延时消息的
简单,八个字:临时存储
+定时任务
Broker
收到延时消息了,会先发送到主题(SCHEDULE_TOPIC_XXXX
)的相应时间段的Message Queue
中,然后通过一个定时任务轮询这些队列,到期后,把消息投递到目标Topic
的队列中,然后消费者就可以正常消费这些消息。
1.7 怎么实现分布式消息事务或者半消息
半消息
:是指暂时还不能被Consumer
消费的消息,Producer
成功发送到 Broker
端的消息,但是此消息被标记为 暂不可投递
状态,只有等 Producer
端执行完本地事务后经过二次确认了之后,Consumer
才能消费此条消息。
依赖半消息,可以实现分布式消息事务,其中的关键在于二次确认以及消息回查:
image.png
RocketMQ
实现消息事务 :
-
Producer
向broker
发送半消息 -
Producer
端收到响应,消息发送成功,此时消息是半消息,标记为不可投递
状态,Consumer
消费不了。 -
Producer
端执行本地事务。 - 正常情况本地事务执行完成,
Producer
向Broker
发送Commit/Rollback
,如果是Commit
,Broker
端将半消息标记为正常消息,Consumer
可以消费,如果是Rollback
,Broker
丢弃此消息。 - 异常情况,
Broker
端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到Producer
端查询半消息的执行情况。 -
Producer
端查询本地事务的状态 - 根据事务的状态提交
commit/rollback
到broker
端。(5,6,7 是消息回查) - 消费者端消费到消息之后,执行本地事务
1.8 死信队列
死信队列
用于处理无法被正常消费的消息,即死信消息
当一条消息初次消费失败,消息队列 RocketMQ
会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ
不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中,该特殊队列称为死信队列
死信消息的特点:
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,需要在死信消息产生后的 3 天内及时处理。
死信队列的特点:
- 一个死信队列对应一个
Group ID
, 而不是对应单个消费者实例。 - 如果一个
Group ID
未产生死信消息,消息队列RocketMQ
不会为其创建相应的死信队列。 - 一个死信队列包含了对应
Group ID
产生的所有死信消息,不论该消息属于哪个Topic
RocketMQ
控制台提供对死信消息的查询、导出和重发的功能。
1.9 如何保证RocketMQ的高可用
NameServer
因为是无状态,且不相互通信的,所以只要集群部署就可以保证高可用。
RocketMQ
的高可用主要是在体现在Broker
的读和写的高可用,Broker
的高可用是通过集群和主从实现的。
Broker
可以配置两种角色:Master
和Slave
,Master
角色的Broker
支持读和写
,Slave
角色的Broker
只支持读
,Master
会向Slave
同步消息。
也就是说Producer
只能向Master
角色的Broker
写入消息,Cosumer
可以从Master
和Slave
角色的Broker
读取消息。
Consumer
的配置文件中,并不需要设置是从 Master
读还是从 Slave
读,当 Master
不可用或者繁忙的时候, Consumer
的读请求会被自动切换到从 Slave
。有了自动切换 Consumer
这种机制,当一个 Master
角色的机器出现故障后,Consumer
仍然可以从 Slave
读取消息,不影响 Consumer
读取消息,这就实现了读的高可用。
如何达到发送端写的高可用性
- 在创建
Topic
的时候,把Topic
的多个Message Queue
创建在多个Broker
组上(相同Broker
名称,不同brokerId
机器组成Broker
组),这样当Broker
组的Master
不可用后,其他组Master
仍然可用,Producer
仍然可以发送消息
注意
:RocketMQ
目前还不支持Broker
把Slave
自动转成Master
,如果机器资源不足,需要把 Slave
转成 Master
,则要手动停止 Slave
色的 Broker
,更改配置文件,用新的配置文件启动 Broker