How the RocketMQ broker handles producer requests

2018-12-10  圣村的希望

  In RocketMQ the broker handles producers' send-message requests and consumers' consume requests, persists messages, and takes care of the HA strategy (master-slave replication) and server-side filtering; in other words, most of the heavy work in the cluster is delegated to the broker.
  The Broker module is started by BrokerStartup, which instantiates a BrokerController and then calls its initialization method:

final BrokerController controller = new BrokerController(
    brokerConfig,
    nettyServerConfig,
    nettyClientConfig,
    messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

boolean initResult = controller.initialize();

  In controller.initialize(), a number of background thread pools are created according to the configuration (for sending messages, pulling messages, querying messages, client management and consumer management), several scheduled tasks are started for background housekeeping (for example, consumer offsets are persisted periodically), and a set of request processors is registered (send message, pull message, query message and so on). These processors are what actually serve client requests: when the broker receives a request from a client it wraps it into a RequestTask and, based on the request code, hands it to the matching processor, which runs in its own thread pool. If the broker is a slave, master-slave synchronization is also set up.
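  The binding between request codes, processors and thread pools looks roughly like the excerpt below, abridged from BrokerController.registerProcessor() in RocketMQ 4.x (only a few of the registrations are shown):

SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);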

this.registerBrokerAll(true, false);

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false);
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

  After the background threads and request processors have been created, the broker also schedules a recurring task, shown above, that registers the broker's information with the NameServer every 30 seconds; this registration is how the broker and the NameServer exchange information.

  Next, let's see how the broker handles the send-message requests coming from producers. As shown above, the broker hands such a request to the SendMessageProcessor, and it is processed in the send-message thread pool.

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
  public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }
}

  Notice that a hook is executed before and after the actual send handling. This hook is an extension point, which is quite handy; things like message tracing can be plugged in here.
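  A minimal sketch of such a hook (the class name and the printed fields are made up for illustration; the SendMessageHook interface and BrokerController.registerSendMessageHook() come from the broker module):

// Illustrative hook: prints every stored message's topic and msgId (class name is hypothetical)
public class TraceSendMessageHook implements SendMessageHook {
    @Override
    public String hookName() {
        return "traceSendMessageHook";
    }

    @Override
    public void sendMessageBefore(final SendMessageContext context) {
        // invoked before the message is written to the store
        System.out.printf("about to store message, topic=%s%n", context.getTopic());
    }

    @Override
    public void sendMessageAfter(final SendMessageContext context) {
        // invoked after the store attempt, whether it succeeded or not
        System.out.printf("stored message, topic=%s, msgId=%s%n", context.getTopic(), context.getMsgId());
    }
}

// registration, e.g. during broker initialization:
// brokerController.registerSendMessageHook(new TraceSendMessageHook());

  Back in SendMessageProcessor.sendMessage(), the core of the handling is the store call: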

PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

  When handling a send request, the broker first stores the message and only then returns the result to the caller. The store call goes through DefaultMessageStore, which in turn delegates the actual write to CommitLog:
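  For context, DefaultMessageStore.putMessage() itself mostly runs validity checks before delegating; a simplified, illustrative sketch of that delegation (the real checks are omitted):

// Simplified sketch, not the verbatim source: DefaultMessageStore validates and delegates to CommitLog
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // omitted here: shutdown flag, slave role, topic/properties length and page-cache-busy checks
    PutMessageResult result = this.commitLog.putMessage(msg);
    // afterwards the store records elapsed-time statistics and failure counters
    return result;
}

  CommitLog.putMessage() is where the heavy lifting happens: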

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();

        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }

        long eclipseTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);

            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }

            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }

            eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            putMessageLock.unlock();
        }

        if (eclipseTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
        }

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        // handle disk flush
        handleDiskFlush(result, putMessageResult, msg);
        // handle HA (master-slave replication)
        handleHA(result, putMessageResult, msg);

        return putMessageResult;
    }

  From this code we can see the steps the broker takes to handle a send-message request:

  1. If the message is a delayed message, its topic is rewritten to SCHEDULE_TOPIC and its queueId to the queue of its delay level (the real topic and queueId are backed up in the message properties).
  2. Obtain the last MappedFile of the CommitLog, i.e. the memory-mapped file the message will be appended to.
  3. Writes are serialized: acquire the put-message lock (a spin lock or a ReentrantLock, depending on the store config), then append the message to the MappedFile. On success processing continues; on failure the result is returned immediately. Either way the lock is released in the finally block (a sketch of such a spin lock follows this list).
  4. Handle disk flush and master-slave (HA) replication.
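  The put-message lock itself is chosen in the CommitLog constructor according to the useReentrantLockWhenPutMessage store option; the spin-lock variant is essentially a CAS loop, roughly like this sketch (illustrative, not the exact RocketMQ class):

// Illustrative CAS-based spin lock, modeled on the put-message spin lock
public class SimplePutMessageSpinLock {
    // true means the lock is free, false means it is held
    private final java.util.concurrent.atomic.AtomicBoolean free = new java.util.concurrent.atomic.AtomicBoolean(true);

    public void lock() {
        // spin until we flip the flag from free to held
        while (!this.free.compareAndSet(true, false)) {
            // busy-wait; appends are short, so spinning is usually cheaper than parking the thread
        }
    }

    public void unlock() {
        this.free.compareAndSet(false, true);
    }
}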

  Now let's look at the flush-to-disk logic:

public CommitLog(DefaultMessageStore defaultMessageStore) {
    // synchronous flush is handled by GroupCommitService
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        // asynchronous flush is handled by FlushRealTimeService
        this.flushCommitLogService = new FlushRealTimeService();
    }
}

// flush handling, branching on the configured FlushDiskType
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // Synchronous flush
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            // waitStoreMsgOK defaults to true when the Message is created
            if (messageExt.isWaitStoreMsgOK()) {
                // the flush request is wrapped in a GroupCommitRequest and handed to GroupCommitService
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                // wait for the flush to complete, bounded by syncFlushTimeout
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // Asynchronous flush
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }

  From this we can see the broad flush strategy:

  1. When CommitLog is instantiated, the flush service is chosen according to the broker configuration: GroupCommitService for synchronous flush and FlushRealTimeService for asynchronous flush.
  2. For a synchronous flush, a GroupCommitRequest is built and appended to GroupCommitService's write list, where it waits to be flushed; each GroupCommitRequest is constructed with a latch (CountDownLatch) that coordinates the waiting thread.
  3. request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()) waits for the flush to finish; it uses the latch's await with a timeout, which is exactly why each GroupCommitRequest carries its own latch.
  4. For an asynchronous flush, the flush thread is simply woken up: FlushRealTimeService's run loop waits between rounds, so a wakeup() nudges it into flushing.

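  For reference, which of the two services is used depends on the flushDiskType entry in the broker configuration (a MessageStoreConfig property; the default is ASYNC_FLUSH):

# broker.conf (relevant entries only)
flushDiskType = SYNC_FLUSH      # or ASYNC_FLUSH (the default)
syncFlushTimeout = 5000         # how long, in ms, a synchronous flush may wait before FLUSH_DISK_TIMEOUT is reported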
// block the calling thread on the request's latch until the flush completes; the wait is bounded by a timeout
public boolean waitForFlush(long timeout) {
            try {
                this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
                return this.flushOK;
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
                return false;
            }
        }

// the flush thread keeps performing flush rounds until the service is stopped
public void run() {
            CommitLog.log.info(this.getServiceName() + " service started");

            while (!this.isStopped()) {
                try {
                    this.waitForRunning(10);
                    this.doCommit();
                } catch (Exception e) {
                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }

            // Under normal circumstances shutdown, wait for the arrival of the
            // request, and then flush
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                CommitLog.log.warn("GroupCommitService Exception, ", e);
            }

            synchronized (this) {
                this.swapRequests();
            }

            this.doCommit();

            CommitLog.log.info(this.getServiceName() + " service end");
        }

private void doCommit() {
            synchronized (this.requestsRead) {
                if (!this.requestsRead.isEmpty()) {
                    for (GroupCommitRequest req : this.requestsRead) {
                        // There may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushOK = false;
                        for (int i = 0; i < 2 && !flushOK; i++) {
                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                            if (!flushOK) {
                                CommitLog.this.mappedFileQueue.flush(0);
                            }
                        }

                        req.wakeupCustomer(flushOK);
                    }

                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                    if (storeTimestamp > 0) {
                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                    }

                    this.requestsRead.clear();
                } else {
                    // Because of individual messages is set to not sync flush, it
                    // will come to this process
                    CommitLog.this.mappedFileQueue.flush(0);
                }
            }
        }

// record the flush result and release the latch
public void wakeupCustomer(final boolean flushOK) {
            this.flushOK = flushOK;
            this.countDownLatch.countDown();
        }

  In a synchronous flush, waitForFlush blocks the calling thread on the GroupCommitRequest's latch until the flush completes or the wait times out. Where is that latch released? That depends on which thread performs the synchronous flush.
  As GroupCommitService shows, the flush thread keeps flushing: it keeps taking flush requests out of requestsWrite (by swapping them into requestsRead), handles them in doCommit(), and once a request's data is on disk it releases that request's latch via wakeupCustomer().
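  The write/read lists are a simple double-buffering trick; a minimal sketch of how producer threads enqueue requests and how the flush thread swaps the two lists (modeled on GroupCommitService, not the verbatim source):

// Illustrative double-buffer sketch modeled on GroupCommitService
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);   // producer threads append their flush requests here
    }
    this.wakeup();                         // wake the flush thread if it is waiting
}

private void swapRequests() {
    // the flush thread swaps the lists so it can drain requestsRead in doCommit()
    // without blocking producers that keep appending to requestsWrite
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}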

  Next, let's look at how RocketMQ handles scheduled (delayed) messages:

// BrokerController
public void start() throws Exception {
    if (this.messageStore != null) {
        this.messageStore.start();
    }
}

// DefaultMessageStore
public void start() throws Exception {
    if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
        this.scheduleMessageService.start();
    }
}

public void start() {

        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

  So when BrokerController starts, it starts a background service for delayed messages: the message store starts ScheduleMessageService (on the master only). ScheduleMessageService keeps a table of delay levels and, per level, the consume offset in the corresponding queue of the schedule topic (the logical position of the next delayed message to handle); a timer task per level then reads the matching messages from disk and delivers them once their delay has expired.
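  For reference, the delay levels come from the messageDelayLevel setting of MessageStoreConfig; with the default table, level 3 means a 10-second delay. A producer marks a message as delayed simply by setting its level (topic and body below are illustrative):

// Default delay-level table in MessageStoreConfig (level 1 = 1s, level 2 = 5s, level 3 = 10s, ...):
// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"

// assuming a started DefaultMQProducer named producer
Message msg = new Message("ExampleTopic", "delayed payload".getBytes());
msg.setDelayTimeLevel(3);   // stored under SCHEDULE_TOPIC first, delivered to the real topic about 10s later
producer.send(msg);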

  Now let's look at how RocketMQ handles transactional messages. Transactional messages are supported with a two-phase commit: the producer first sends a prepare (half) message to the broker, then runs the local business transaction, and finally sends a confirmation message (commit or rollback).

Local transaction states:
public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW,
}

After the local transaction finishes, the producer sends another message to the broker telling it how the prepared message should be handled: commit or rollback.

public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter tranExecuter, final Object arg)
        throws MQClientException {
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", null);
        }
        Validators.checkMessage(msg, this.defaultMQProducer);

        SendResult sendResult = null;
        // mark the message as a prepared (half) transaction message via its properties
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // phase one: send the prepare message
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        // decide how to proceed with the local transaction based on the prepare send result
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    // the LocalTransactionExecuter implementation passed in by the caller is invoked here to run the local transaction
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            // finish the transaction: send a confirmation message (commit or rollback) to the broker
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

  1. Phase one: a prepare message is sent to the broker.
  2. Based on the prepare send result, the producer invokes the LocalTransactionExecuter implementation, passed in as a parameter, to run the local transaction.
  3. Phase two: a confirmation message, commit or rollback, is sent to the broker (a minimal usage sketch follows this list).
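  A minimal usage sketch of the old LocalTransactionExecuter-based API shown above (group name, topic and the business logic are made up; the old API also requires a TransactionCheckListener for the broker's check-back):

// Hypothetical usage sketch; the group name, topic and returned states are illustrative
TransactionMQProducer producer = new TransactionMQProducer("example_tx_producer_group");
producer.setTransactionCheckListener(new TransactionCheckListener() {
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        // called back by the broker when the transaction state is unknown
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
producer.start();

Message msg = new Message("ExampleTopic", "TagA", "order-created".getBytes());
TransactionSendResult result = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message message, Object arg) {
        // run the local business logic here; the returned state decides commit vs rollback
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}, null);

producer.shutdown();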
  On the broker side the flow is as follows:

  1. The producer sends a message of type TransactionPreparedType to the broker. The broker saves it in the CommitLog and returns the message's queueOffset and MessageId to the producer; the MessageId contains the commitLogOffset (the message's offset inside the CommitLog, from which the message itself can be located directly). Because this type of message is not dispatched to the consume queue when it is stored, a client cannot obtain its commitLogOffset through the consume queue, so the message cannot be fetched and therefore will not be consumed.
  2. The producer's TransactionExecuterImpl executes the local operation and returns the local transaction state, then sends a message of type TransactionCommitType or TransactionRollbackType to the broker to confirm the commit or the rollback. Using the commitLogOffset carried in the request, the broker finds the earlier TransactionPreparedType message (call it message A), builds a new message B with the same content, sets its type to TransactionCommitType or TransactionRollbackType, and stores it. For TransactionCommitType the commitLogOffset is put into the consume queue; for TransactionRollbackType the message body is set to empty and the commitLogOffset is not put into the consume queue.

  In phase one, when the prepare message is sent, the broker still handles the request with SendMessageProcessor; however, the prepare message is only written to the CommitLog and is not dispatched to the consume queue:

switch (tranType) {
    // Prepared and Rollback message is not consumed, will not enter the
    // consumer queue
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}

  When the phase-two confirmation message arrives, the broker handles it with EndTransactionProcessor. It locates the prepared message by its CommitLog offset and then, depending on the confirmation, either re-stores it so that it reaches the consume queue (commit) or stores an empty-body copy that never reaches the consume queue (rollback):

msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
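  That line sits inside EndTransactionProcessor's handling of the confirmation; roughly, the processor does the following (a simplified sketch; the method names follow the 4.x source, but details vary by version):

// Simplified sketch of EndTransactionProcessor's core logic (not verbatim source)
MessageExt msgExt = this.brokerController.getMessageStore()
    .lookMessageByOffset(requestHeader.getCommitLogOffset());     // locate prepared message A in the CommitLog
if (msgExt != null) {
    MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);   // rebuild message B with A's content
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),
        requestHeader.getCommitOrRollback()));                    // mark it as COMMIT or ROLLBACK
    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
    if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        msgInner.setBody(new byte[0]);                            // rollback: empty body, never dispatched to the consume queue
    }
    this.brokerController.getMessageStore().putMessage(msgInner); // commit: stored again and dispatched to the consume queue
}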