对RocketMQ的见解以及与SpringBoot集成和配置可视
RocketMQ
RocketMQ是一个分布式消息传递和流媒体平台,是一款分布式、队列模型的消息中间件。具有低延迟,高性能和可靠性,万亿级容量和灵活的可扩展性。阿里开源的消息中间件,已经捐献给了 Apache 。
Github地址:https://github.com/apache/rocketmq/
官方样例代码:https://github.com/apache/rocketmq/tree/master/example
官方地址:https://rocketmq.apache.org/
使用文档:https://rocketmq.apache.org/docs/quick-start/
具体特点:
1.能够保证严格的消息顺序
2.提供丰富的消息拉取模式
3.高效的订阅者水平扩展能力
4.实时的消息订阅机制
5.亿级消息堆积能力
部署RocketMQ
1.下载源码
官方下载地址:https://www.apache.org/dyn/closer.cgi?path=rocketmq
官方推荐使用的镜像地址:http://mirror.bit.edu.cn/apache/rocketmq
这里选择下载了最新的RocketMQ4.3.2
的源码
2.1解压并用maven构建二进制文件
构建命令:mvn -Prelease-all -DskipTests clean install -U
RocketMQ中需要打包构建的模块已经全部完成,对应的target文件夹在distribution
模块中
2.2或者可以直接下载bin-release
解压得到二进制文件
image.png
这种方式不需要下载源码和用maven构建
3.启动
1.启动 NameServer
进入target/apache-rocketmq/bin
目录
window中命令:start mqnamesrv.cmd
linux中命令nohup sh bin/mqnamesrv &
注:出现以下提示,也就是没有在环境变量中配置ROCKETMQ_HOME
的值
解决方法:配置环境变量
变量名:ROCKETMQ_HOME
变量值:E:\rocketMQ\rocketmq-all-4.3.2\distribution\target\apache-rocketmq
(变量值不一定要跟我的完全一样,distribution\target\apache-rocketmq
这个基本是一样的)
再次执行启动命令
成功后会弹出提示框,此框勿关闭
启动BROKER
window中命令:start mqbroker.cmd -n localhost:9876
linux中命令nohup sh bin/mqbroker -n localhost:9876 &
成功后会弹出提示框,此框勿关闭
RocketMQ的核心概念图
image.pngProducer消息生产者
将业务应用程序系统生成的消息发送给Broker
,RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)
三种发送消息的方式:同步,异步和单向传输
源码解析
public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* 以同步方式发送消息。仅当发送过程完全完成时,此方法才会返回。
*/
@Override
public SendResult send(Message msg) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg);
}
/**
* 以同步方式发送消息。可指定发送超时时间
*/
@Override
public SendResult send(Message msg,long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, timeout);
}
/**
* 以异步方式发送消息。此方法立即返回。发送完成后,将执行sendCallback。
*/
@Override
public void send(Message msg,SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback);
}
/**
* 以异步方式发送消息。可指定发送超时时间
*/
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}
/**
* 以单向传输的方式发送消息。具有最大的吞吐量但存在消息丢失的风险。不等待服务器回应且
没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。
*/
@Override
public void sendOneway(Message msg) throws MQClientException,
RemotingException, InterruptedException {
this.defaultMQProducerImpl.sendOneway(msg);
}
}
关于异步发送,需要实现异步发送回调接口SendCallback
,在执行消息的异步发送时,应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
public interface SendCallback {
void onSuccess(final SendResult sendResult);
void onException(final Throwable e);
}
详细写法推荐参考:https://help.aliyun.com/document_detail/29547.html
Producer Group生产者组
如果其中一个生产者在事务之后崩溃,则代理可以联系同一生产者组的不同生产者实例以提交或回滚事务。一个应用只有一个生产者组,避免不必要的实例初始化,每个生产者发送逻辑一致,producerGroup
可以共用一个队列
生产者端的负载均衡
生产者发送时,会自动轮询当前所有可发送的broker
,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker
上
Consumer消息消费者
消费者从Broker那里获取消息并将其提供给应用程序。
从用户应用的角度来看,提供了两种类型的消费者:
1.PullConsumer(拉模式)
概念:consumer
主动去向broker
拉取消息。
取消息的过程需要用户自己写,首先通过打算消费的Topic
拿到MessageQueue
的集合,遍历MessageQueue
集合,然后针对每个MessageQueue
批量取消息,一次取完后,记录该队列下一次要取的开始offset
,直到取完了,再换另一个MessageQueue
PullConsumer
具体使用可以参考
https://blog.csdn.net/gwd1154978352/article/details/80884741
2.PushConsumer(推模式)
概念:broker
主动去向consumer推送消息。
Push
方式里,consumer把轮询过程封装了,并注册MessageListener
监听器,取到消息后,唤醒MessageListener
的consumeMessage()
来消费,对用户而言,感觉消息是被推送过来的。
关系:PUSH模式实际上在内部还是使用的PULL
方式实现的,通过PULL
不断地轮询Broker
获取消息,当不存在新消息时,Broker
会挂起PULL
请求,直到有新消息产生才取消挂起,返回新消息
PushConsumer具体使用主要有三步
1.订阅主题以消费订阅,主要用到subscribe()
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}
2.设置Push的消费策略,用到setConsumeFromWhere()
public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}
ConsumeFromWhere
是一个枚举类
public enum ConsumeFromWhere {
//默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_LAST_OFFSET,
//从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_FIRST_OFFSET,
//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
CONSUME_FROM_TIMESTAMP,
}
3.进行消息的逻辑处理(消费)
3.1注册MessageListener
监听器,监听器有两种,分别是
有序消费MessageListenerOrderly
并发消费MessageListenerConcurrently
MessageListenerConcurrently
和MessageListenerOrderly
均继承于 MessageListener
public interface MessageListenerConcurrently extends MessageListener {
ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context);
}
public interface MessageListenerOrderly extends MessageListener {
ConsumeOrderlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeOrderlyContext context);
}
区别是:
1.并发消费的消费速度要比有序消费更快。
2.并发消费模式不会无限消费,而且消费失败后不会马上再消费。
有序消费模式要慎重地处理异常,只要消费次数达到一定次数,那么就直接返回ConsumeOrderlyStatus.SUCCESS
(推荐使用并发消费)
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
@Override
public void registerMessageListener(MessageListenerOrderly messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
注:被transient
修饰的成员属性变量不被序列化
DefaultMQPushConsumerImpl
是PushConsumer
消费者具体实现类,用于设置拉取消息后的回调类
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private MessageListener messageListenerInner;
public void registerMessageListener(MessageListener messageListener) {
this.messageListenerInner = messageListener;
}
//省略部分源码
}
3.2在对应的回调类中写消息处理逻辑
简单来说就是实现MessageListenerConcurrently
或者MessageListenerOrderly
回调接口的方法
处理模板
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//具体逻辑
}
});
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
//具体逻辑
}
});
consumeMessage()
的参数说明
参数List<MessageExt> msgs
是一个消息列表
参数ConsumeConcurrentlyContext context
用于消费的事务控制,可以用于设置是否自动提交,消息队列和当前队列的暂停时间
ConsumeOrderlyContext
源码
public class ConsumeOrderlyContext {
private final MessageQueue messageQueue;
private boolean autoCommit = true;
private long suspendCurrentQueueTimeMillis = -1;
public ConsumeOrderlyContext(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
public boolean isAutoCommit() {
return autoCommit;
}
public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
}
public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
}
}
这里重点说明下返回值,consumeMessage
有两种返回值:成功消费和失败稍后重试
枚举类ConsumeConcurrentlyStatus
源码,并发消费返回值
public enum ConsumeConcurrentlyStatus {
//成功消费
CONSUME_SUCCESS,
//失败稍后重试
RECONSUME_LATER;
}
枚举类ConsumeOrderlyStatus
源码,顺序消费返回值
public enum ConsumeOrderlyStatus {
//成功消费
SUCCESS,
//失败稍后重试
SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
两者具体的实现方法可以参考
https://blog.csdn.net/qq_36804701/article/details/81481343
4.调用start()
方法启动consumer
Consumer Group消费者组
与之前提到的生产者组类似,完全相同角色的消费者被组合在一起并命名为消费者组。
Message Model消息模型
1.集群消费方式 (聚类模型)
一个ConsumerGroup中的Consumer实例平均分摊消费生产者发送的消息
例如某个Topic
有九条消息,其中一个Consumer Group
有三个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息,Consumer
不指定消费方式的话默认是集群消费的,适用于大部分消息的业务
2.广播消费方式 (广播模型)
一条消息被多个Consumer
消费,几十这些Consumer
属于同一个ConsumerGroup
,消息也会被ConsumerGroup
中的每个Consumer
消费一次,广播消费中的ConsumerGroup
概念可以认为在消息划分层面没有意义,适用于一些分发消息的场景,比如我订单下单成功了,需要通知财务系统,客服系统等等这种分发的场景,可以通过修改Consumer
中的MessageModel
来设置消费方式为广播消费
原文:https://blog.csdn.net/weixin_41098980/article/details/79880957
Broker
消息中转角色,负责存储消息,转发消息。是RocketMQ
系统的主要组成部分。接收从生产者发送的消息,存储它们并准备处理来自消费者的拉取请求。还存储与消息相关的元数据,包括消费者组,消耗进度偏移和主题/队列信息。
Broker
是具体提供业务的服务器,单个Broker节点与所有的NameServer
节点保持长连接及心跳,并会定时将Topic
信息注册到NameServer
,顺带一提底层的通信和连接都是基于Netty
实现的
NameServer
NameServer
的作用是注册中心,类似于Zookeeper
,但又有区别于它的地方。每个NameServer
节点互相之间是独立的,没有任何信息交互,也就不存在任何的选主或者主从切换之类的问题,因此NameServer
与Zookeeper
相比更轻量级
Topic
主题是生产者传递消息和消费者提取消息的类别。
主题与生产者和消费者的关系非常松散。具体来说,一个主题可能有零个,一个或多个生成器向它发送消息; 相反,制作人可以发送不同主题的消息。从消费者的角度来看,主题可以由零个,一个或多个消费者群体订阅。类似地,消费者组可以订阅一个或多个主题,只要该组的实例保持其订阅一致即可。
Tags
标记,换句话说,子主题,为用户提供了额外的灵活性。Tags
是Topic
下的次级消息类型(注:Tags
也支持TagA || TagB
这样的表达式),可以在同一个Topic
下基于Tags
进行消息过滤。对于标记,来自同一业务模块的具有不同目的的消息可以具有相同的主题和不同的标记。标签有助于保持代码的清晰和连贯,而标签也可以方便RocketMQ提供的查询系统。
RocketMQ
支持给在发送的时候给topic
打tag
,同一个topic
的消息虽然逻辑管理是一样的。但是消费topic1
的时候,如果你订阅的时候指定的是tagA
,那么tagB
的消息将不会投递。
Message信息
消息是要传递的信息。消息必须有一个主题,可以将其解释为您要发送给的邮件地址
MessageQueue
与MessageQueueSelector
消息还可以具有可选标记和额外的键 - 值对。
例如,您可以为消息设置业务密钥,并在代理服务器上查找消息以诊断开发期间的问题。
生产者的send方法不同于以往
@Override
public SendResult send(Message msg, MessageQueue mq)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, mq);
}
@Override
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
可以指定目标消息队列MessageQueue
消息队列选择器MessageQueueSelector
(通过它可以获得自定义的目标消息队列以传递消息),Object arg
是与消息队列选择器一起使用的参数。
具体使用可以参考:https://www.jianshu.com/p/53324ea2df92
Message Queue消息队列
主题被划分为一个或多个子主题消息队列
MessageQueue
源码
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
private static final long serialVersionUID = 6191200464116433425L;
private String topic;
private String brokerName;
private int queueId;
//省略部分代码
}
关于RocketMQ——顺序消息和重复消息可参考
https://blog.csdn.net/gwd1154978352/article/details/80691916
进行与SpringBoot集成
下面写下测试代码
生产者
@Component
@Slf4j
public class RocketMQClient {
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQProducer() {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
try {
producer.start();
Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
StopWatch stop = new StopWatch();
stop.start();
for (int i = 0; i < 50; i++) {
SendResult result = producer.send(message);
log.info("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
}
stop.stop();
log.info("----------------发送50条消息耗时:" + stop.getTotalTimeMillis());
} catch (Exception e) {
log.warn("exception_content",e);
} finally {
producer.shutdown();
}
}
}
@PostConstruct
注解用来修饰一个非静态的void()
方法,而且这个方法不能有抛出异常声明。在服务器加载Servlet
的时候运行,并且只会被服务器调用一次,会在构造函数之后,init()
方法之前运行。
拓展:
@PreDestroy
修饰的方法会在服务器卸载Servlet
的时候运行,并且只会被服务器调用一次,会在destroy()
方法之后,在Servlet被彻底卸载之前运行。
消费者
@Component
@Slf4j
public class RocketMQServer {
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
//消费者的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr(namesrvAddr);
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("TopicTest", "push");
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
log.info("messageExt: " + messageExt);//输出消息内容
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
log.info("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容
}
} catch (Exception e) {
log.warn("exception_content",e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
});
consumer.start();
} catch (Exception e) {
log.warn("exception_content",e);
}
}
}
application.yml配置文件
apache:
rocketmq:
# 消费者的组名
consumer:
PushConsumer: PushConsumer
# 生产者的组名
producer:
producerGroup: Producer
# NameServer地址
namesrvAddr: localhost:9876
多个namesrvAddr
用;
隔开
代码参考于:http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/#%E5%90%AF%E5%8A%A8-Name-Server
启动测试,消息成功发送和消费
image.pngRocketMQ可视化监控界面
来源于RocketMQ有一个对其扩展的开源项目rocketmq-externals
Github地址:https://github.com/apache/rocketmq-externals
子模块rocketmq-console
便是RocketMQ
管理控制台项目。
官网说明对rocketmq-console
的描述是A newly designed RocketMQ's console using spring-boot
搭建过程
1.把整个项目拉下来,进入rocketmq-console
,找到application.properties
,修改下配置
设置好rocketmq-console
的启动端口和需要监听的RocketMQ服务端的ip地址和端口号
2.用maven构建rocketmq-console
,执行命令:mvn clean package -Dmaven.test.skip=true
3.执行生成的jar
包
编译成功之后,Cmd
进入target
文件夹,执行java -jar rocketmq-console-ng-1.0.0.jar