框架源码研究

RocketMQ消息的主从复制机制

2019-11-06  本文已影响0人  无醉_1866

基本原理

RocketMQ的broker会被划分成master和slave两种角色,只有master能接受producer所发送的消息,消息的replication示意图如下:

image

消息复制的基本框架

前面介绍了RocketMQ的消息复制的基本原理,我们现在看看RocketMQ是如何做的消息复制,在RocketMQ中,有ha相关的包:

image

在store子工程中,虽然只有三个类,但代码结构确不简单,WaitNotifyObject是用于做等待唤醒的(生产者消费者模式),HA相关的主体代码在HAService和HAConnection及它们的内部类中,总体结构如下:

image

源码阅读

image.png

可以看到,WriteSocketService线程每次从nextTransferFromWhere开始读取一定的消息数据,并通过transferData方法发送给slave,此外,nextTransferFromWhere会增加,增加的数量是读取的消息数量。将消息写到socketchannel时,除了消息本身,还会写入一定的头信息,putLong和putInt写入了12字节,分别表示offset和size

image

byteBufferRead是一个大小为1MB的ByteBuffer,因为offset是一个long类型的值,而long类型的值占8个字节,因此rocketmq在处理byteBufferRead中的数据时,需要以8字节为一个单位估数据的读取。

master数据收到的数据可能并不是一个完整的long,有可能会出现半包或者粘包的情况,因此会有一个int pos = this.byteBufferRead.position() – (this.byteBufferRead.position % 8)的方式计算出当前完整包的位置

当下一次读取数据时,会先判断byteBufferRead是不是满了,满了则清理一次,所以如果读到了半包,则下一次读取数据后,半包会变成一个完整的包:

image

rocketmq的master每次在读取到8字节及8字节以上的数据后,会取最后的完整的8字节作为offset。

先看看HAClient的run方法,其主体逻辑是读取数据,然后上报offset:

image image image

生产者发送消息的结果

我们回到CommitLog的putMessage方法,其中有handleDiskFlush的调用:

image

当刷盘超时,会返回FLUSH_DISK_TIMEOUT,我们再看看handleHA方法:

image

handleHA方法中可能会抛出FLUSH_SLAVE_TIMEOUT和SLAVE_NOT_AVAILABLE。

我们注意到,发消息的顺序:写byteBuffer -> flushDisk -> flushSlave,这三者之间没有事务保证,可能会出现以下情况:

image

刷盘使用的是GroupCommitService,它包含一个队列和一个线程,刷盘实际上也是异步的,只不过SYNC_FLUSH模式下,会同步等待刷盘的结果,对于返回了FLUSH_DISK_TIMEOUT的消息,后面也会被刷到磁盘上,但是如果当master宕机,而消息还来不及刷盘。那就会有消息丢失了

image

实际上也是由HAService中的线程完成的同步slave,而SYNC_MASTER模式只是同步等待结果:

对于收到了FLUSH_DISK_TIMEOUT的结果时,生产者需要评估少量消息丢失是否会有影响,如果有,则需要生产者做重试,消费者支持幂等,如果没影响,则忽略即可

上一篇下一篇

猜你喜欢

热点阅读