程序员技术交流分布式&高可用

天池中间件大赛——单机百万消息队列存储分享

2018-08-03  本文已影响192人  王亚普

这次天池中间件性能大赛初赛和复赛的成绩都正好是第五名,本次整理了复赛《单机百万消息队列的存储设计》的思路方案分享给大家,实现方案上也是决赛队伍中相对比较特别的。

赛题回顾

评测逻辑

评测环境

赛题分析

对于单机几百的大队列来说业务已有成熟的方案,Kafka和RocketMQ。

方案 几百个大队列
Kafka 每个队列一个文件(独立存储)
RocketMQ 所有队列共用一个文件(混合存储)

若直接采用现有的方案,在百万量级的小队列场景都有极大的弊端。

方案 百万队列场景弊端
Kafka独立存储 单个小队列数据量少,批量化程度完全取决于内存大小,落盘时间长,写数据容易触发IOPS瓶颈
RocketMQ混合存储 随机读严重,一个块中连续数据很低,读速度很慢,消费速度完全受限于IOPS

为了兼顾读写速度,我们最终采用了折中的设计方案:多个队列merge,共享一个块存储。

image

设计核心思想

架构设计

image

架构图中Bucket Manager和Group Manager分别对百万队列进行分桶以及合并管理,然后左右两边是分别是写模块和读模块,数据写入包括队列merge处理,消息块落盘。读模块包括索引管理和读缓存。(见左图)

bucket、group、queue的关系:对消息队列进行bucket处理,每个bucket包含多个group,group是我们进行队列merge的最小单元,每个group管理固定数量的队列。(见右图)

存储设计

image

接下来对整个存储每个阶段的细节进行展开分析,包括队列合并、索引管理和数据落盘。

MQ Merge

1. 百万队列数据Bucket Hash分桶

image

2. Bucket视角

image

3. Group分配过程

image

4. Group视角的数据写入

image

索引管理

1. L2二级索引

L2二级索引与数据存储的位置息息相关,见下图。为每个排序后的Block块建立一个L2索引,L2索引的结构分为文件偏移(file offset),数据压缩大小(size),原始大小(raw size),因为我们是多个队列merge,然后接下来是每个队列相对于起始位置的delta offset以及消息数量。

image

2. L1一级索引

为了加快查询速度,在L2基础上建立L1一级索引,每16个L2建立一个L1,L1按照时间先后顺序存放。L1和L2的组织关系如下:

image

L1索引的结构非常简单,file id对应消息存储的文件id,以及16个Block块中每个队列消息的起始序列号seq num。例如MQ1从序列号1000开始,MQ2从序列号2000开始等等。

image

3. Index Query

如何根据索引定位需要查找的数据?

对L1先进行二分查找,定位到上下界范围,然后对范围内的所有L2进行顺序遍历。

image

Data Flush

1. 同步Flush

当blcok超过指定大小后,根据桶的hashcode再进行一次mask操作将group中的队列数据同步写入到m个文件中。

同步刷盘主要尝试了两种方案:Nio和Dio。Dio大约性能比Nio提升约5%。CPP使用DIO是非常方便的,然而作为Java Coder你也许是第一次听说DIO,在Java中并没有提供直接使用DIO的接口,可以通过JNA的方式调用。

DIO(DIRECT IO,直接IO),出于对系统cache和调度策略的不满,用户自己在应用层定制自己的文件读写。DIO最大的优点就是能够减少OS内核缓冲区和应用程序地址空间的数据拷贝次数,降低文件读写时的CPU开销以及内存的占用。然而DIO的缺陷也很明显,DIO在数据读取时会造成磁盘大量的IO,它并没有缓冲IO从PageCache获取数据的优势。

image

这里就遇到一个问题,同样配置的阿里云机器测试随机数据同步写入性能是非常高的,但是线上的评测数据都是58字节,数据过于规整导致同一时间落盘的概率很大,出现了大量的锁竞争。所以这里做了一个小的改进:按概率随机4K、8K、16K进行落盘,写性能虽有一定提升,但是效果也是不太理想,于是采用了第二种思路异步刷盘。

2. 异步Flush

采用RingBuffer接收block块,使用AIO对多个block块进行Batch刷盘,减少IO Copy的次数。异步刷盘写性能有了显著的提升。

image

以下是异步Flush的核心代码:

while (gWriterThread) {
    if (taskQueue->pop(task)) {
        writer->mWriting.store(true);
        do {
            // 使用异步IO
            aiocb *pAiocb = aiocb_list[aio_size++];
            memset(pAiocb, 0, sizeof(aiocb));
            pAiocb->aio_lio_opcode = LIO_WRITE;
            pAiocb->aio_buf = task.mWriteCache.mCache;
            pAiocb->aio_nbytes = task.mWriteCache.mSize;
            pAiocb->aio_offset = task.mWriteCache.mStartOffset;
            pAiocb->aio_fildes = task.mBlockFile->mFd;
            pAiocb->aio_sigevent.sigev_value.sival_ptr = task.mBlockFile;
            task.mBlockFile->mWriting = true;
            
            if (aio_size >= MAX_AIO_TASK_COUNT) {
                break;
            }
        } while (taskQueue->pop(task));
        
        if (aio_size > 0) {
            if (0 != lio_listio(LIO_WAIT, aiocb_list, aio_size, NULL)) {
                aos_fatal_log("aio error %d %s.", errno, strerror(errno));
            }
            
            for (int i = 0; i < aio_size; ++i) {
                ((BlockFile *) aiocb_list[i]->aio_sigevent.sigev_value.sival_ptr)->mWriting = false;
                free((void *) aiocb_list[i]->aio_buf);
            }
            aio_size = 0;
        }
    } else {
        ++waitCount;
        sched_yield();
        if (waitCount > 100000) {
            usleep(10000);
        }
    }
}

读缓存设计

数据读取流程

整个流程主要有两个优化点:预读取和读缓存。

image

预读取优化

1. 记录上一次读取(消费)的offset

主要有两个作用:

2. 预读取时机

顺序消费且已经消费到当前block尾,则进行预读取操作。如何判断顺序消费?判断上次消费的结束位置是否与这次消费的起始位置相等。

if (msgCount >= destCount) {
    if (mLastGetSequeneNum == offsetCount &&
        beginIndex + 1 < mL2IndexCount &&
        beginOffsetCount + blockIndex.mMsgDeltaIndexCount <= offsetCount + msgCount + msgCount) {
        MessageBlockIndex &nextIndex = mL2IndexArray[beginIndex + 1];
        // 预读取
#ifdef __linux__
        readahead(pManager->GetFd(hash), nextIndex.mFileOffset, PER_BLOCK_SIZE);
#endif
    }
    mLastGetSequeneNum = offsetCount + msgCount;
    return msgCount;
}

Read Cache

关于read cache做了一些精巧的小设计,保证足够简单高效。

1. Read Cache全貌

Read Cache一共分为N=64(可配)个Bucket,每个Bucket中包含M=3200(可配)个缓存块,大概总计20w左右的缓存块,每个是4k,大约占用800M的内存空间。

image

2. 核心数据结构

image

关于缓存的核心数据结构,我们并没有从队列的角度出发,而是针对L2索引和缓存块进行了绑定,这里设计了一个双向指针。判断缓存是否有效的核心思路:check双向指针是否相等。

CacheItem cachedItem = (CacheItem *) index->mCache;
cachedItem->mIndexPtr == (void *) index;

3. 算法实现

3.1 Bucket分桶

image

3.2 Alloc Cache Block

image

3.3 Cache Hit

3.4 Cache Page Replace

image image

4. Read Cache & LRU & PageCache 对比

开始我们尝试了两种读缓存方案:最简单的LRU缓存和直接使用PageCache读取。PageCache所实现的其实是高级版的LRU缓存。在顺序读的场景下,我们自己实现的读缓存(Cycle Cache Allocate,暂简称为CCA)与LRU、PageCache的优劣分析对比如下:

总结

创新点

工程价值:

思考

转载请注明出处,欢迎关注我的公众号:亚普的技术轮子

亚普的技术轮子
上一篇下一篇

猜你喜欢

热点阅读