RocketMQ学习-消息发布和订阅
前面一篇文章分析了broker的启动过程,浏览了broker的基本功能。接下来的几篇文章,准备按照十分钟入门RocketMQ一文中提到的一系列特性,依次进行学习。这篇文章准备分析RocketMQ作为MQ的最基本的功能:消息的发布(publish)和订阅(subscribe)。首先,我参考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和监控这篇文章完成了一个简单的例子。
一、RocketMQ消息模型
屏幕快照 2018-03-31 14.50.41.png在部署RocketMQ的时候,先启动name server,再启动broker,这时候broker会将自己注册到name server。应用程序中的producer启动的时候,首先连接一台name server,获取broker的地址列表;然后再和broker建立连接,接下来就可以发送消息了。其中:一个producer只与一个name server连接,一个producer会跟所有broker建立连接,每个连接都会有心跳检测机制。
producer会轮询向指定topic的mq集合发送消息。
consumer有两种消费模式:集群消费和广播消费。集群消费:多个consumer平均消费该topic下所有mq的消息,即某个消息在某个message queue中被一个consumer消费后,其他消费者就不会消费到它;广播消费:所有consumer可以消费到发到这个topic下的所有消息。
consumer有两种获取消息的模式:推模式和拉模式,在RocketMQ中,从技术实现角度看,推模式也是在拉模式上做了一层封装。
二、消息发送
生产者Demo
首先给出代码,
package com.javadu.chapter8rocketmq.message;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import javax.annotation.PostConstruct;
/**
* 作用: 同步发送消息
* User: duqi
* Date: 2018/3/29
* Time: 13:52
*/
@Component
public class ProducerDemo {
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQProducer() {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
try {
defaultMQProducer.start();
Message message = new Message("TopicTest", "TagA",
"Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
for (int i = 0; i < 100; i++) {
SendResult sendResult = defaultMQProducer.send(message);
System.out.println("发送消息结果, msgId:" + sendResult.getMsgId() +
", 发送状态:" + sendResult.getSendStatus());
}
} catch (MQClientException | UnsupportedEncodingException | InterruptedException
| RemotingException | MQBrokerException e) {
e.printStackTrace();
} finally {
defaultMQProducer.shutdown();
}
}
}
生产者中有两个属性:
- name server的地址,用于获得broker的相关信息
- 生产者集合producerGroup,在同一个producer group中有不同的producer实例,如果最早一个producer奔溃,则broker会通知该组内的其他producer实例进行事务提交或回滚。
RocketMQ中的消息,使用Message表示,代码定义如下:
public class Message implements Serializable {
private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
public Message() {
}
//省略了getter和setter方法
}
- topic:该消息将要往哪个topic发
- flag:可以用作消息过滤
- properties:扩展字段,可以做一些系统级别的通用值的透传,例如skywalking的segmentId
- body:消息内容
每个消息发送完后,会得到一个SendResult对象,看下该对象的结构:
public class SendResult {
//发送状态
private SendStatus sendStatus;
//消息ID,用于消息去重、消息跟踪
private String msgId;
private MessageQueue messageQueue;
private long queueOffset;
//事务ID
private String transactionId;
private String offsetMsgId;
private String regionId;
//是否需要跟踪
private boolean traceOn = true;
public SendResult() {
}
//省略了构造函数、getter和setter等一系列方法
}
在这个demo中,我们是将消息内容和消息状态一并打印到控制台。
消息发送源码分析
在RocketMQ中的client模块的包结构如下,可以看出,作者并没有将接口的定义和实现放在一个包下(这在我们的业务应用中是常见的做法,不一定合理)。producer和consumer包下分别定义了生产者和消费者的接口,将具体的实现放在impl包中。
屏幕快照 2018-03-31 11.51.36.png首先关注producer包里的内容,几个主要的类如下:DefaultMQProducer是生产者的默认实现、MQAdmin用于定义一些管理接口、MQProducer用于定义一些生产者特有的接口。
MQProducer.png在ProducerDemo中,通过`defaultMQProducer.start();启动生产者,接下来看下start()方法的过程:
- 根据服务状态决定接下来的动作
- 对于CREATE_JUST状态
- 设置服务状态
- 检查配置
- 获取或创建MQClientInstance实例
- 将生产者注册到指定的producerGroup,即producerTable这个数据结构中,是一个map
- 填充topicPublishInfoTable数据结构
- 启动生产者
- 对于RUNNING、START_FAILED和SHUTDOWN_ALREADY,抛出异常
public void start(final boolean startFactory) throws MQClientException {
//根据当前的服务状态决定接下来的动作
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
//创建一个客户端工厂
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//将生产者注册到指定producer group
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//填充topicPublishInfoTable
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//给该producer连接的所有broker发送心跳消息
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
顺着mQClientFactory.start()
往下跟,可以进一步了解生产者的细节,主要步骤有:
- 建立请求响应通道
- 启动各种定时任务,例如:每隔2分钟向name server拉取一次broker集群的地址,这意味着如果某个broker宕机了,生产者在这两分钟之内的消息是投递失败的;定期从name server拉取topic等路由信息;定期清理失效的broker以及向broker发送心跳消息等。
- 启动拉服务、负载均衡服务、推服务等服务,这三个服务跟消费者有关。这里设计上不太明了,将消费者和生产者的启动逻辑放在一起了。看pullMessageService和rebalanceService和初始化,它们是根据MQClientInstance初始化的,而MQClientInstance又是根据ClientConfig来配置的。
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
生产者启动后,接下来看下消息的发送过程,如下图所示,DefaultMQProducer提供了很多发送消息的方法,可以实现同步发消息、异步发消息、指定消息队列、OneWay消息、事务消息等。
屏幕快照 2018-03-31 12.26.32.png这里我们只看最简单的send(Message message)
方法,最终在DefaultMQProducerImpl中实现:
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//确认生产者状态正常
this.makeSureStateOK();
//检查消息的合法性
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
//获取消息的目的地:Topic信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//计算出消息的投递次数,如果是同步投递,则是1+重试次数,如果不是同步投递,则只需要投递一次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
//一个broker集群有不同的broker节点,lastBrokerName记录了上次投递的broker节点,每个broker节点
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//选择一个要发送的消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
//投递消息
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
//根据消息发送模式,对消息发送结果做不同的处理
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
发送消息的主要过程如下:
- 首先检查生产者和消息的合法性
- 然后获取消息发送的信息,该信息存放在TopicPublishInfo对象中:
public class TopicPublishInfo {
//是否顺序消息
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
//维护该topic下用于的消息队列列表
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
//计算下一次该投递的队列,这里应用ThreadLocal,即使是同一台机器中,每个producer实例都有自己的队列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
//省略了getter和setter方法
//选择指定lastBrokerName上的下一个mq
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
//选择当前broker节点的下一个mq
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
}
- 选择要发送给该topic下的那个MessageQueue,选择的逻辑分两种情况:(1)默认情况,在上次投递的broker节点上,轮询到下一个message queue来发送;(2)sendLatencyFaultEnable这个值设置为true的时候,这块没太看懂。
- 投递消息
- 根据消息队列运行模式,针对投递结果做不同的处理。
二、消息消费
消费者Demo
消费者里有个属性需要看下:
- consumerGroup:位于同一个consumerGroup中的consumer实例和producerGroup中的各个produer实例承担的角色类似;consumerGroup中的实例还可以实现负载均衡和容灾。PS:处于同一个consumerGroup里的consumer实例一定是订阅了同一个topic。
- nameServer的地址:name server地址,用于获取broker、topic信息
消费者Demo里做了以下几个事情:
- 设置配置属性
- 设置订阅的topic,可以指定tag
- 设置第一次启动的时候,从message queue的哪里开始消费
- 设置消息处理器
- 启动消费者
package com.javadu.chapter8rocketmq.message;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* 作用:
* User: duqi
* Date: 2018/3/29
* Time: 14:00
*/
@Component
public class ConsumerDemo {
/**
* 消费者的组名
*/
@Value("${apache.rocketmq.consumer.consumerGroup}")
private String consumerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
//消费者的组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr(namesrvAddr);
try {
//订阅PushTopic下Tag为push的消息
consumer.subscribe("TopicTest", "TagA");
//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
//如果非第一次启动,那么按照上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
//输出消息内容
System.out.println("messageExt: " + messageExt);
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
//输出消息内容
System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);
}
} catch (Exception e) {
e.printStackTrace();
//稍后再试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者源码分析
前面分析过了,RocketMQ中的client模块统一提供了生产者和消费者客户端,这块我们看下消费者的几个主要的类。前面提到过,RocketMQ实际上都是拉模式,这里的DefaultMQPushConsumer实现了推模式,也只是对拉消息服务做了一层封装,即拉到消息的时候触发业务消费者注册到这里的callback,而具体拉消息的服务是由PullMessageService实现的,这个细节后续再研究。
MQConsumer.png在ConsumerDemo中,设置好配置信息后,会进行topic订阅,调用了DefaultMQPushConsumer的subscribe方法,源码如下:
/**
* Subscribe a topic to consuming subscription.
*
* @param topic topic to subscribe.
* @param subExpression subscription expression.it only support or operation such as "tag1 || tag2 || tag3" <br>
* if null or * expression,meaning subscribe all
* @throws MQClientException if there is any client error.
*/
@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}
第一个参数是topic信息,第二个参数用于用于消息过滤tag字段。真正的订阅发生在DefaultMQPushConsumerImpl中,代码如下:
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
//构建包含订阅信息的对象,并放入负载平衡组件维护的map中,以topic为key
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
//如果已经跟broker集群建立连接,则给所有的broker节点发送心跳消息
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
在ConsumerDemo中,接下里会设置消费者首次启动时消费消息的起始位置,这涉及到DefaultMQPushConsumer中的一个属性——consumeFromWhere,这个值有三个可能的值
- CONSUME_FROM_LAST_OFFSET,默认值,表示从上次停止时的地方开始消费
- CONSUME_FROM_FIRST_OFFSET,从队列的头部开始消费
- CONSUME_FROM_TIMESTAMP,从指定的时间点开始消费
ConsumerDemo接下来会注册一个callback,当消息到达的时候就处理消息(最新的消息监听者支持并发消费):
/**
* Register a callback to execute on message arrival for concurrent consuming.
*
* @param messageListener message handling callback.
*/
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener) {
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
}
最后,我们看下ConsumerDemo的启动过程,即DefaultMQPushConsumerImpl的start方法,主要做了下面几件事:
- 检查配置
- 将订阅信息拷贝到负载均衡组件(rebalanceImpl)中;
- 负载均衡组件的几个属性的设置
- 处理不同消息模式(集群模式或广播模式)的配置
- 处理顺序消费和并发消费的不同配置
- 将消费者信息和consumer group注册到MQ客户端实例的consumerTable中
- 启动消费者客户端