Rocketmq broker busy 根因问题分析

2024-03-09  本文已影响0人  绝尘驹

Broker busying

产线偶尔有业务发消息报broker busying 异常:

org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 501ms, size of queue: 78 BROKER

每个月不定期出现一次,就一下,不持续,业务每次都问,解决了不,有解决方案了不,之前忙nacos的事情,没有时间差,现在nacos 的问题平定了,可以腾出手来解决rocketmq的问题了,下面就时该问题的查找坎坷历程

什么情况报broker busy

rocketmq 有两种写入存储引擎,如果是开启了dledger模式,或者没有开启堆外内存模式,rocketmq都是直接写pagecache,只有开启了堆外内存模式,是先写堆外,再异步刷新到pagecache。

写pagecahce加锁代码如下:

        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        long elapsedTimeInLock;
        long queueOffset;
        try {
            beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now();
            if (isMultiDispatch) {
                boolean multiDispatchWrapResult = multiDispatch.wrapMultiDispatch(msg);
                if (!multiDispatchWrapResult) {
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
                } else {
                    encodeResult = this.messageSerializer.serialize(msg);
                    if (encodeResult.status != AppendMessageStatus.PUT_OK) {
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)));
                    }
                }
            }
            queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType);
            encodeResult.setQueueOffsetKey(queueOffset, false);
            AppendEntryRequest request = new AppendEntryRequest();
            request.setGroup(dLedgerConfig.getGroup());
            request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
            request.setBody(encodeResult.getData());
           // 开启dledger模式
            dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
            if (dledgerFuture.getPos() == -1) {
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
            }
            long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;

            int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
            ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);

            String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
            appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock);
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1);
                    multiDispatch.updateMultiQueueOffset(msg);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            log.error("Put message error", e);
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
        } finally {
            beginTimeInDledgerLock = 0;
            putMessageLock.unlock();
        }

        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult);
        }

上面是开启dledger模式写pagecache的一段代码,这个锁也可以是spin lock 或者是可重入锁,可以配置,rocketmq 计算一次写pagecache的时间,如果超过500ms,就打印一个notifyme的日志,记录下持有锁的时间。

正常情况下写pagecache 快,没有啥问题,毕竟是写内存,如果内存紧张或者其他原因导致写慢,这个锁持有的时间就长了,就阻塞了后面的消息的写入,如果频繁的慢,线程池队列有可能堆积满,触发rocketmq full gc,甚至不能用。

所以rocketmq 为了能及时发现这种问题,尽可能的快速失败,一个是不要让自己进入死循环,另外一个是也不要让客户端在那等,所以就需要检查,那从那里检查呢,有两个地方入手:

根据上面两种场景,客户端一共收到三种类型的broker busy,下面我们分别来看下对应的代码:

写入实时检查

PC_SYNCHRONIZED broker busy 是第一种主动检查的场景,是rocketmq 写pagecache时,如果持有锁没有被释放,而且时间超过了配置项:osPageCacheBusyTimeOutMills 配置的阀值,rocketmq 认为是os pagepage busy即写pagecache 太慢,则直接响应客户端写失败。

@Override
    public boolean isOSPageCacheBusy() {
        long begin = this.getCommitLog().getBeginTimeInLock();
        long diff = this.systemClock.now() - begin;

        return diff < 10000000
            && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
    }
        case OS_PAGECACHE_BUSY:
       response.setCode(ResponseCode.SYSTEM_ERROR);
       response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
           break;

定时任务检查

rocketmq 在broker 启动的时候会通过BrokerFastFailure 来启动这个定时任务,来对四种队列里的任务做检查:

代码如下:

private void cleanExpiredRequest() {
        //检查锁没有被释放,而且持有的时间超过了阀值的情况,说明当前环境已经很差了。
        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            try {
                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                    if (null == runnable) {
                        break;
                    }

                    final RequestTask rt = castRunnable(runnable);
                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }

        //检查生产消息在队列的等待时间
        cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
        //检查消费消息在队列的等待时间
        cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
        //检查心调消息等待时间
        cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
            this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
        //检查事物消息等待时间
        cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
            .brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
    }

根据代码可以看出,和我们本次分析的broker busy 问题相关的是头两个检查,这个两个检查也分别对应两种类型的异常:

[PCBUSY_CLEAN_QUEUE]broker busy

PCBUSY_CLEAN_QUEUE 就是任务检查到当前写锁持有时间超过了阀值,就从队列里取最早的一条消息任务,响应system_busy 异常,如此循环,直到锁被释放了为止,说白了就是当前已经扛不住了系统,就赶紧卸货。

TIMEOUT_CLEAN_QUEUE broker busy

如果当前pagecache 持锁的时间不长,或者定时任务来检查时,锁刚好释放了,第一种情况就检查不出来了,但不代表没有出现持有锁过长的时间的情况,所以也需要能发现,怎么发现呢,办法是通过检查队列里的消息任务等待的时间来判断,rocketmq 也提供了一个配置项maxWaitTimeMillsInQueue, 默认是200ms,如果超过了,则说明等待时间过长,直接不处理该消息了,我们先来看下检查的代码,实现如下:

void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
        while (true) {
            try {
                if (!blockingQueue.isEmpty()) {
                    //peek操作,相当于拿到消息任务的引用。
                    final Runnable runnable = blockingQueue.peek();
                    if (null == runnable) {
                        break;
                    }
                    final RequestTask rt = castRunnable(runnable);
                    if (rt == null || rt.isStopRun()) {
                        break;
                    }

                    //计算任务等待的耗时,如果小于则直接退出,因为后面的都是比该任务要新,所以时间肯定都没有达到。
                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                    if (behind >= maxWaitTimeMillsInQueue) {
                        if (blockingQueue.remove(runnable)) {
                            rt.setStopRun(true);
                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                        }
                    } else {
                        break;
                    }
                } else {
                    break;
                }
            } catch (Throwable ignored) {
            }
        }
    }

实现也是一个循环,通过peek操作,获取到消息任务,计算在队列里的等待时间,如果大于maxWaitTimeMillsInQueue 则remove该任务,响应客户端[TIMEOUT_CLEAN_QUEUE]broker busy 异常,直到第一个耗时比maxWaitTimeMillsInQueue小的,就退出该循环。

其他的对消费队列,心跳队列,事物消息的清理也是同样的逻辑,如果时间超过了对应的阀值,就报broker busy,只是对应的客户端不同而已。

经过上面的分析,我们知道了rocketmq broker busy出现的场景,什么情况下会报该异常,但是为什么会出现这个异常,我们还没有分析,下面我们就进入根因分析的阶段。

为什么报Broker busy

通过前面的分析,我们知道了产生broker busy的问题,都是写pagecache 慢导致持有锁的时间太长,那为啥写pagecache这么慢呢,不是写内存吗,应该不回慢才对啊

写pagecache 是系统调用,写的os内存,所以我们得分析下什么情况下会导致写os pagecache慢,有下面几种情况:

第一种情况内存满了,我们就不分析了,监控报警都能看出来,一般这种情况的很少,因为产线的机器内存都配的很大,也会提前告警。

内核刷脏页

rocketmq 是io频繁的系统,系统脏页会很多,默认情况下os 会刷脏页,os刷脏页的时候,可能会阻塞写pagecache,比如在某个时候内存不足的情况,触发了脏页刷新,像这种内核级别的操作,也看不到啥证据,内核默认情况下,刷脏页的比例是毕竟底的,很容易触发,特别是rocketmq这种写频繁的应用,所以rocketmq 官方也提供了一个os.sh的脚本来调整内核刷pagecache的情况,跳高了内核触发刷脏页的比例到50%,rocketmq 自己会定时刷盘,完全不依赖内核的刷脏页来保证。

除了os.sh 脚本外,rocketmq还提供了setpage.sh脚本,来设置内存的低水位

这是理论分析,网上和社区也都有通过这个os.sh 这个脚本来优化的,github上也有类似的问题,但是我们在线下环境测试的时候,没有什么用,调整后测试还是报broker busy的问题,那只能说明我们这个不是内核刷脏页的问题。

IO压力大

这里肯定会有一个疑问,为啥io压力大,会影响pagecache的写操作,io应该只影响刷新数据,但是刷新数据是异步的,所以应该不影响pagecache写入才对,这个关键问题等我们拿到了证据再来回答,否则没有说服力。

我们最开始也是怀疑io压力的问题,但是每次报broker busy的时候,监控iops指标都很正常, 而且生产tps也不大,为了测试生产tps的达到多少,会受io的影响,我们在线下专门做了测试。

在磁盘空间正常的情况下,我们压测很正常,tps到7k左右没有啥问题,一直很稳定,随着压测的时间越久,磁盘使用空间越来大,到了rocketmq 清理的阀值75%,会触发清理任务,清理时会增加io的压力,如果到85%,就立即清理,客户端生产消息就不报broker busy异常。

rocketmq-delete.png

除了磁盘使用空间达到阀值触发删除文件增加io外,还有一个场景也会,就时rocketmq清理3天过期的文件,也会增加磁盘io,rocketmq 提供了一个配置参数:deleteWhen,比如我们配置deleteWhen=04,就时凌晨4点执行,这个时候生产消息基本上很低,就是防止在高峰期突然来一个清理过期的数据,导致io压力大影响刷盘,所以一般公司产线都会配置凌晨来执行。

所以这里的两个问题,我们产线的场景都不存在,一个磁盘空间很大,磁盘使用空间不到30%,过期数据清理也是凌晨,所以肯定不是这个两个原因导致broker busy问题。

既然不是这个原因,但是这个原因确实是影响写pagecache,只能继续找线索,我想能不能调整rocketmq 刷盘的时间间隔,看了下raft 框架dledge刷盘的代码,dledge 有一个线程专门负责mmap file的刷盘,代码如下:

public void doWork() {
            try {
                long start = System.currentTimeMillis();
                DLedgerMmapFileStore.this.dataFileList.flush(0);
                DLedgerMmapFileStore.this.indexFileList.flush(0);
                long elapsed;
                if ((elapsed = DLedgerUtils.elapsed(start)) > 500) {
                    logger.info("Flush data cost={} ms", elapsed);
                }

                if (DLedgerUtils.elapsed(lastCheckPointTimeMs) > dLedgerConfig.getCheckPointInterval()) {
                    persistCheckPoint();
                    lastCheckPointTimeMs = System.currentTimeMillis();
                }
                //刷新间隔,默认是10ms
                waitForRunning(dLedgerConfig.getFlushFileInterval());
            } catch (Throwable t) {
                logger.info("Error in {}", getName(), t);
                DLedgerUtils.sleep(200);
            }
        }

通过这个代码可以看到 rocketmq是每个隔10ms刷新一次mmapfile,可以调整,但是也看到,dledger 在刷盘的时候记录了刷盘的时间,如果大于500ms,就打印一个日志:Flush data cost,有这个日志,我们去查了下产线出现broker busy的时候,是否有这个日志,一查,还真有:

image.png

和客户端报broker busy的时间点一致,这下就问题就非常明确了,就是刷盘慢影响了pagecache 的写入,我们是云主机,我们就去确认了下主机磁盘的配置, 也确实是非常老的类型的磁盘,性能不好,和我们测试的磁盘不是一个类型,线下测试的高性能的磁盘,也证明了我们线下测试为啥能稳定在7k-8k都能稳定,现在我们来回答下前面的问题,为啥刷盘慢会影响写pagecache,

这个问题也和rocketmq 作者刘振东交流过,刷磁盘的时候是基于page 为单位的,刷的时候为了保证该脏页的数据一致性,会锁住该页,但是锁住前,该脏页有可能正在被rocketmq 写,因为只要没有被写满的page,还是能继续写的,所以一旦刷盘慢,被锁的时间长,就写page的时间长了,对应的就是写pagecache时持有锁的时间长,就触发broker busy。

总结

我们对rocketmq 产生broker busy的场景,以及触发这些场景的背后可能的原因,最后分析是rocketmq 这种写mmap file pagecache的机制,刷新pagecache 慢时,会影响写page cache 的时间,如果你的生产消息有broker busy问题,如果io 监控都正常,可以看看dledger 刷盘的时间的日志,像这种一瞬间的io抖动,iops监控指标是看不出来的。

上一篇 下一篇

猜你喜欢

热点阅读