How the RocketMQ broker handles producer requests
In RocketMQ the broker handles producers' send-message requests and consumers' pull requests, persists messages, and takes care of HA (master-slave replication) and server-side filtering; most of the heavy lifting in the cluster falls on the broker.
The broker module is started by BrokerStartup, which instantiates a BrokerController and then calls its initialization method:
final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
boolean initResult = controller.initialize();
controller.initialize() creates a number of background thread pools according to the configuration (for sending messages, pulling messages, querying messages, client management and consumer management), starts several scheduled tasks (for example, persisting consumer offsets every 10 seconds), and registers the request processors (send message, pull message, query message, etc.) that serve client requests. When the broker receives a request from a client it wraps it in a RequestTask and, based on the request code, hands it to the matching processor, which runs in its own thread pool. If the broker is a slave it also sets up master-slave synchronization.
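The wiring between request codes, processors and thread pools is done in registerProcessor(), which initialize() calls. A simplified excerpt (trimmed to the send-message part; the other processors are registered in the same way) looks roughly like this:
public void registerProcessor() {
    // SendMessageProcessor serves producer send requests and runs in the dedicated send-message thread pool
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    // ... PullMessageProcessor, QueryMessageProcessor, ClientManageProcessor and the others follow the same pattern
}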
this.registerBrokerAll(true, false);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false);
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
After the background threads and request processors are in place, the broker also schedules a task that registers the broker's information with the NameServer every 30 seconds; this is how the broker and the NameServer exchange information.
Next, let's see how the broker handles the send-message requests coming from producers. As shown above, when such a request arrives it is handed to SendMessageProcessor and processed in the send-message thread pool.
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }
}
Notice that hooks are executed before and after the actual handling of the send request. These hooks are an extension point, which is quite handy; message trace (link tracking) is a natural use for them.
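As a rough illustration, a broker-side hook implements the SendMessageHook interface (hookName / sendMessageBefore / sendMessageAfter) and is registered on the processor via registerSendMessageHook. The class below is a made-up minimal sketch; the SendMessageContext accessors it uses are assumed to exist on the broker-side mqtrace context:
// hypothetical tracing hook; not part of RocketMQ itself
public class TraceSendMessageHook implements SendMessageHook {

    @Override
    public String hookName() {
        return "traceSendMessageHook";
    }

    @Override
    public void sendMessageBefore(final SendMessageContext context) {
        // stash the start time in the context so sendMessageAfter can read it back
        context.setMqTraceContext(System.currentTimeMillis());
    }

    @Override
    public void sendMessageAfter(final SendMessageContext context) {
        Object start = context.getMqTraceContext();
        long costMs = (start instanceof Long) ? System.currentTimeMillis() - (Long) start : -1;
        // a real trace hook would report this to a tracing system instead of printing it
        System.out.printf("send topic=%s msgId=%s cost=%dms%n", context.getTopic(), context.getMsgId(), costMs);
    }
}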
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
When handling a send request, the broker first stores the message and only then returns the result to the client. Storage is delegated to DefaultMessageStore, whose putMessage forwards to CommitLog.putMessage:
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long eclipseTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

    putMessageLock.lock(); // spin or ReentrantLock, depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }

    if (eclipseTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    // handle disk flush
    handleDiskFlush(result, putMessageResult, msg);
    // handle HA (master-slave replication)
    handleHA(result, putMessageResult, msg);

    return putMessageResult;
}
From this we can see the steps the broker takes when handling a send-message request:
- If the message is a delayed message, its topic is first rewritten to SCHEDULE_TOPIC and its queueId to the one derived from the delay level (the real topic and queueId are backed up in the message properties).
- The last MappedFile of the CommitLog (and its underlying MappedByteBuffer) is obtained.
- Writes are serialized: the lock (a spin lock or ReentrantLock, depending on the store config; see the sketch after this list) is acquired first, and only then is the message appended to the MappedByteBuffer. On success processing continues; on failure the result is returned immediately; in either case the lock is released in the finally block.
- Finally, disk flush and master-slave (HA) replication are handled.
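The lock used here is hidden behind the PutMessageLock interface; depending on useReentrantLockWhenPutMessage in the store config it is either a ReentrantLock or a CAS-based spin lock. For reference, the spin-lock variant is roughly the following (a simplified sketch of PutMessageSpinLock):
import java.util.concurrent.atomic.AtomicBoolean;

public class PutMessageSpinLock implements PutMessageLock {
    // true: lock is free, false: lock is held
    private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);

    @Override
    public void lock() {
        // busy-wait until the CAS from true to false succeeds
        boolean flag;
        do {
            flag = this.putMessageSpinLock.compareAndSet(true, false);
        } while (!flag);
    }

    @Override
    public void unlock() {
        this.putMessageSpinLock.compareAndSet(false, true);
    }
}
The critical section guarded here is just the append into the mapped file's page cache, so a busy-wait lock is usually cheap.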
Now let's look at the flush-to-disk logic:
public CommitLog(DefaultMessageStore defaultMessageStore) {
    // with synchronous flush, flushing is delegated to GroupCommitService
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        // with asynchronous flush, flushing is delegated to FlushRealTimeService
        this.flushCommitLogService = new FlushRealTimeService();
    }
}
// flush handling
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // waitStoreMsgOK is already set to true when the Message is instantiated
        if (messageExt.isWaitStoreMsgOK()) {
            // the flush request is wrapped in a GroupCommitRequest and handed to GroupCommitService
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            // wait for the flush to complete (or time out)
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}
This shows the overall flush strategy:
- When CommitLog is instantiated, the flush service is chosen according to the broker configuration: RocketMQ hands flushing to GroupCommitService for synchronous flush and to FlushRealTimeService for asynchronous flush.
- For a synchronous flush, a GroupCommitRequest is built and added to GroupCommitService's write list to await processing; every GroupCommitRequest is constructed with its own CountDownLatch, which coordinates the waiting thread (see the sketch after this list).
- request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()) then waits for the flush to finish; it calls the latch's await with a timeout, which is exactly why each GroupCommitRequest carries its own latch.
- For an asynchronous flush, the flush thread is simply woken up: FlushRealTimeService's run loop waits between rounds, so wakeup() is enough to trigger the next flush.
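For reference, the latch mentioned above lives inside GroupCommitRequest itself. Stripped down, the class looks roughly like this (a sketch; waitForFlush and wakeupCustomer, quoted below, are its other two methods, and CountDownLatch is java.util.concurrent.CountDownLatch):
public static class GroupCommitRequest {
    // the flush is considered done once flushedWhere reaches this offset (wroteOffset + wroteBytes)
    private final long nextOffset;
    // one latch per request: the sending thread awaits it, the flush thread counts it down
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    public GroupCommitRequest(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    public long getNextOffset() {
        return nextOffset;
    }

    // waitForFlush(timeout) and wakeupCustomer(flushOK) follow below
}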
// block the calling thread on the request's latch until the flush completes, bounded by a timeout
public boolean waitForFlush(long timeout) {
    try {
        this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        return this.flushOK;
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
        return false;
    }
}
// the flush thread keeps executing flush rounds
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}
private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }

                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
// record the flush result and release the latch
public void wakeupCustomer(final boolean flushOK) {
    this.flushOK = flushOK;
    this.countDownLatch.countDown();
}
In synchronous flush, waitForFlush uses the GroupCommitRequest's latch to block the calling thread until the flush completes or times out. Where is that latch released? To answer that we have to look at the thread that performs the synchronous flush.
In GroupCommitService you can see the flush thread looping continuously: requests submitted to requestsWrite are swapped into requestsRead and processed by doCommit, which performs the actual flush and then releases each request's latch via wakeupCustomer.
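The sending threads and the flush thread never iterate the same list: putRequest appends to requestsWrite and wakes the service, while the flush thread swaps the two lists before each doCommit round and then drains requestsRead. Simplified from GroupCommitService (the wakeup call below stands in for the notification details of the real code):
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    // wake the flush thread if it is parked in waitForRunning()
    this.wakeup();
}

private void swapRequests() {
    // exchange the write list and the read list so producers keep appending
    // to one list while the flush thread drains the other
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}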
Next, let's look at how RocketMQ handles scheduled (delayed) messages:
BrokerController
public void start() throws Exception {
    if (this.messageStore != null) {
        this.messageStore.start();
    }
}
DefaultMessageStore
public void start() throws Exception {
    if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
        this.scheduleMessageService.start();
    }
}
ScheduleMessageService
public void start() {
    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
        Integer level = entry.getKey();
        Long timeDelay = entry.getValue();
        Long offset = this.offsetTable.get(level);
        if (null == offset) {
            offset = 0L;
        }

        if (timeDelay != null) {
            this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
        }
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
As you can see, when BrokerController starts (on a master), a background timer is started for delayed messages. The actual work is done by ScheduleMessageService, which keeps a table of delay levels and, for each level, the offset into the corresponding delayed-message queue (the logical position of each message); its timer tasks then read the messages from disk and re-deliver them when they are due.
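The delay levels come from the messageDelayLevel setting of the store config; the default is "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h", and each level gets its own queue under SCHEDULE_TOPIC. The mapping between a delay level and its queue id in ScheduleMessageService is simply:
// delay level N (1-based) is stored in queue N-1 of the schedule topic
public static int delayLevel2QueueId(final int delayLevel) {
    return delayLevel - 1;
}

// the reverse mapping, used when a due message is restored to its real topic/queue
public static int queueId2DelayLevel(final int queueId) {
    return queueId + 1;
}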
Next, let's look at how RocketMQ handles transactional messages. Transactional messages are implemented as a two-phase commit: the producer first sends a prepare message to the broker, then executes its local business logic, and finally sends a confirmation message (commit or rollback).
The local transaction states:
public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW,
}
After the local business logic has finished, the producer sends another message to the broker telling it how to handle the prepared message: commit or rollback.
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter tranExecuter, final Object arg)
    throws MQClientException {
    if (null == tranExecuter) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // set message properties marking this as a prepare (half) message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        // send the prepare message
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    // decide whether to run the local business logic based on the result of sending the prepare message
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                // the LocalTransactionExecuter implementation passed in by the caller is invoked here
                localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // end the transaction by sending a confirmation message to the broker
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}
- Phase one: send a prepare message to the broker.
- Based on the result of sending the prepare message, decide what to do next; if the send succeeded, the producer calls back into the LocalTransactionExecuter implementation that was passed in as an argument of sendMessageInTransaction.
- Phase two: send a confirmation message (commit or rollback) to the broker; a client-side usage sketch follows this list.
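Put together on the client side, using this (older, executer-based) transactional API looks roughly like the sketch below; the group name, topic and business logic are made-up placeholders:
// classes from org.apache.rocketmq.client.producer and org.apache.rocketmq.common.message
public class TransactionProducerDemo {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("example_tx_producer_group"); // hypothetical group
        producer.start();

        Message msg = new Message("ExampleTopic", "TagA", "order-1001".getBytes()); // hypothetical topic/body

        TransactionSendResult result = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
            @Override
            public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
                // run the local business operation here (e.g. write the order to the database) and
                // return COMMIT_MESSAGE / ROLLBACK_MESSAGE based on its outcome, or UNKNOW when in doubt
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        }, "order-1001");

        System.out.println(result.getLocalTransactionState());
        producer.shutdown();
    }
}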
1. The producer sends a message of type TransactionPreparedType to the broker. The broker stores it in the CommitLog and returns the message's queueOffset and MessageId to the producer; the MessageId contains the commitLogOffset (the message's offset in the CommitLog, through which the message itself can be located directly). Because the commitLogOffset of this kind of message is not written into the consumeQueue when it is stored, clients cannot reach it through the consumeQueue, so the message cannot be fetched and will not be consumed.
2. The producer's TransactionExecuterImpl executes the local operation and returns the local transaction state; the producer then sends a message of type TransactionCommitType or TransactionRollbackType to the broker to confirm commit or rollback. Using the commitLogOffset in the request, the broker locates the earlier TransactionPreparedType message (message A), builds a new message B with the same content, sets its type to TransactionCommitType or TransactionRollbackType, and stores it. For TransactionCommitType the commitLogOffset is written into the consumeQueue; for TransactionRollbackType the message body is set to empty and the commitLogOffset is not written into the consumeQueue.
In phase one, when the prepare message is sent, the broker still handles the request with SendMessageProcessor; the prepare message is written only to the CommitLog and is not dispatched to the consumeQueue:
switch (tranType) {
    // Prepared and Rollback message is not consumed, will not enter the
    // consumer queue
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}
In phase two, when the confirmation message arrives, the broker handles it with EndTransactionProcessor. Using the commitLogOffset carried in the request it locates the prepared message and rebuilds it: for a commit the rebuilt message is dispatched to the consumeQueue, while for a rollback it is stored with an empty body and never reaches the consumeQueue (the CommitLog is append-only, so nothing is physically removed):
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
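The line above comes from that rebuild step. The overall shape of EndTransactionProcessor's handling, as I read the 4.x source, is roughly the sketch below (simplified; exact method names may differ between versions): look up the prepared message by its CommitLog offset, rebuild it with the final transaction flag, blank the body for a rollback, and write it back through the store so the normal dispatch path decides whether a consumeQueue entry is created.
// simplified sketch of the commit/rollback branch inside EndTransactionProcessor.processRequest
final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
if (msgExt != null) {
    // rebuild message B with the same content as the prepared message A, restoring the
    // real topic/queueId that were backed up in the message properties
    MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());

    if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        // a rollback keeps nothing: empty body, and it is never dispatched to the consumeQueue
        msgInner.setBody(new byte[0]);
    }

    // for a commit, putMessage stores B and the dispatch service builds the consumeQueue entry,
    // which finally makes the message visible to consumers
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    // ... the response is then built from putMessageResult
}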