RocketMQ数据同步源码分析
数据同步包括两部分数据
- 元数据信息
- 真实的消息数据
元数据信息的同步是通过Netty实现的,消息数据的同步则是通过原生NIO Socket实现的,下面通过源码分析一下这两个过程。
元数据同步
可以猜想一下,数据同步是发生在broker主从之间,元数据信息应该在slave broker启动时同步master broker的数据。在BrokerController的initialize()方法中找到了同步的代码。
// Metadata synchronization is only set up when this broker runs as a SLAVE.
if (this.messageStoreConfig.getBrokerRole() == BrokerRole.SLAVE) {
    final String configuredHaMaster = this.messageStoreConfig.getHaMasterAddress();
    final boolean haMasterConfigured = configuredHaMaster != null && configuredHaMaster.length() >= 6;
    if (haMasterConfigured) {
        // A configured HA master address (at least 6 characters) is handed to the message store.
        this.messageStore.updateHaMasterAddress(configuredHaMaster);
    }
    // The flag ends up true exactly when no such address was configured.
    this.updateMasterHAServerAddrPeriodically = !haMasterConfigured;

    // Scheduled task that synchronizes metadata; failures are logged and not rethrown.
    final Runnable metadataSyncTask = new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (final Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    };
    // Initial delay 10 s, then a fixed rate of 60 s.
    this.scheduledExecutorService.scheduleAtFixedRate(
        metadataSyncTask, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
如果是slave broker,会在启动10秒后执行第一次同步,之后每隔一分钟发起一次同步请求,每次都会进入slaveSynchronize.syncAll()方法。
/**
 * Runs the four metadata synchronization steps one after another: topic configuration,
 * consumer offsets, delay offsets and subscription group configuration.
 */
public void syncAll() {
// Synchronize topic configuration
this.syncTopicConfig();
// Synchronize consumer offsets
this.syncConsumerOffset();
// Synchronize delay offsets
this.syncDelayOffset();
// Synchronize subscription group configuration
this.syncSubscriptionGroupConfig();
}
/**
 * Fetches the topic configuration from the master and, when its data version differs from
 * the local one, replaces the local topic table with the fetched one and persists it.
 * Does nothing when no master address is set. Any exception is logged and not rethrown.
 */
private void syncTopicConfig() {
    // Read the field once so the same address is used for the request and for logging.
    final String master = this.masterAddr;
    if (master == null) {
        return;
    }
    try {
        // Fetch the topic configuration from the master.
        final TopicConfigSerializeWrapper remoteConfig =
            this.brokerController.getBrokerOuterAPI().getAllTopicConfig(master);
        final boolean sameVersion = this.brokerController.getTopicConfigManager().getDataVersion()
            .equals(remoteConfig.getDataVersion());
        if (sameVersion) {
            return;
        }
        // Versions differ: take over the fetched version, then swap the table contents.
        this.brokerController.getTopicConfigManager().getDataVersion()
            .assignNewOne(remoteConfig.getDataVersion());
        this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
        this.brokerController.getTopicConfigManager().getTopicConfigTable()
            .putAll(remoteConfig.getTopicConfigTable());
        this.brokerController.getTopicConfigManager().persist();
        log.info("Update slave topic config from master, {}", master);
    } catch (final Exception e) {
        log.error("SyncTopicConfig Exception, {}", master, e);
    }
}
/**
 * Fetches all consumer offsets from the master, puts them into the local offset table and
 * persists the result. Does nothing when no master address is set. Any exception is logged
 * and not rethrown.
 */
private void syncConsumerOffset() {
    // Read the field once so the same address is used for the request and for logging.
    final String master = this.masterAddr;
    if (master == null) {
        return;
    }
    try {
        final ConsumerOffsetSerializeWrapper remoteOffsets =
            this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(master);
        // No version check and no clear() here: the fetched entries are put on top of the table.
        this.brokerController.getConsumerOffsetManager().getOffsetTable()
            .putAll(remoteOffsets.getOffsetTable());
        this.brokerController.getConsumerOffsetManager().persist();
        log.info("Update slave consumer offset from master, {}", master);
    } catch (final Exception e) {
        log.error("SyncConsumerOffset Exception, {}", master, e);
    }
}
/**
 * Fetches the delay offset content from the master and writes it to the local delay-offset
 * store file under the configured store root directory.
 *
 * <p>Does nothing when no master address is set. Failures are logged and never rethrown.
 * The success message is logged only after the file has actually been written, so the log
 * no longer reports an update when nothing was fetched or the write failed.
 */
private void syncDelayOffset() {
    // Read the field once so the same address is used for the request and for logging.
    final String masterAddrBak = this.masterAddr;
    if (masterAddrBak == null) {
        return;
    }
    try {
        final String delayOffset =
            this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
        if (delayOffset == null) {
            // Nothing fetched, nothing to persist, nothing to report.
            return;
        }
        final String fileName =
            StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                .getMessageStoreConfig().getStorePathRootDir());
        try {
            MixAll.string2File(delayOffset, fileName);
            // Reached only when the write did not throw.
            log.info("Update slave delay offset from master, {}", masterAddrBak);
        } catch (final IOException e) {
            log.error("Persist file Exception, {}", fileName, e);
        }
    } catch (final Exception e) {
        log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
    }
}
/**
 * Fetches the subscription group configuration from the master and, when its data version
 * differs from the local one, replaces the local subscription group table with the fetched
 * one and persists it. Does nothing when no master address is set. Any exception is logged
 * and not rethrown.
 */
private void syncSubscriptionGroupConfig() {
    // Read the field once so the same address is used for the request and for logging.
    final String master = this.masterAddr;
    if (master == null) {
        return;
    }
    try {
        final SubscriptionGroupWrapper remoteGroups =
            this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(master);
        final SubscriptionGroupManager localGroups =
            this.brokerController.getSubscriptionGroupManager();
        if (localGroups.getDataVersion().equals(remoteGroups.getDataVersion())) {
            return;
        }
        // Versions differ: take over the fetched version, then swap the table contents.
        localGroups.getDataVersion().assignNewOne(remoteGroups.getDataVersion());
        localGroups.getSubscriptionGroupTable().clear();
        localGroups.getSubscriptionGroupTable().putAll(remoteGroups.getSubscriptionGroupTable());
        localGroups.persist();
        log.info("Update slave Subscription Group from master, {}", master);
    } catch (final Exception e) {
        log.error("SyncSubscriptionGroup Exception, {}", master, e);
    }
}
通过代码,可以清晰的看到四种元数据的同步:
- 同步 topic 配置信息
- 同步消费者偏移量
- 同步延迟偏移量
- 同步订阅组配置信息
继续跟进代码,进入BrokerOuterAPI类
/**
 * Synchronously requests the complete topic configuration from the broker at {@code addr}.
 *
 * @param addr address of the broker to query
 * @return the topic configuration decoded from the response body
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS};
 *         carries the response code and remark
 */
public TopicConfigSerializeWrapper getAllTopicConfig(final String addr)
    throws RemotingConnectException, RemotingSendRequestException,
    RemotingTimeoutException, InterruptedException, MQBrokerException {
    // Build the request command.
    final RemotingCommand req =
        RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
    // The target address is passed through MixAll.brokerVIPChannel; the timeout is 3000 ms.
    final String target = MixAll.brokerVIPChannel(true, addr);
    final RemotingCommand resp = this.remotingClient.invokeSync(target, req, 3000);
    assert resp != null;
    if (resp.getCode() == ResponseCode.SUCCESS) {
        return TopicConfigSerializeWrapper.decode(resp.getBody(), TopicConfigSerializeWrapper.class);
    }
    throw new MQBrokerException(resp.getCode(), resp.getRemark());
}
/**
 * Synchronously requests all consumer offsets from the broker at {@code addr}.
 *
 * @param addr address of the broker to query
 * @return the consumer offsets decoded from the response body
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS};
 *         carries the response code and remark
 */
public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr)
    throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    final RemotingCommand req =
        RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    // Synchronous call with a 3000 ms timeout.
    final RemotingCommand resp = this.remotingClient.invokeSync(addr, req, 3000);
    assert resp != null;
    if (resp.getCode() == ResponseCode.SUCCESS) {
        return ConsumerOffsetSerializeWrapper.decode(resp.getBody(), ConsumerOffsetSerializeWrapper.class);
    }
    throw new MQBrokerException(resp.getCode(), resp.getRemark());
}
/**
 * Synchronously requests the delay offset content from the broker at {@code addr}.
 *
 * @param addr address of the broker to query
 * @return the response body decoded as text using {@code MixAll.DEFAULT_CHARSET}
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS};
 *         carries the response code and remark
 * @throws UnsupportedEncodingException if the charset name is not supported
 */
public String getAllDelayOffset(final String addr)
    throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
    RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
    final RemotingCommand req =
        RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
    // Synchronous call with a 3000 ms timeout.
    final RemotingCommand resp = this.remotingClient.invokeSync(addr, req, 3000);
    assert resp != null;
    if (resp.getCode() == ResponseCode.SUCCESS) {
        return new String(resp.getBody(), MixAll.DEFAULT_CHARSET);
    }
    throw new MQBrokerException(resp.getCode(), resp.getRemark());
}
/**
 * Synchronously requests the complete subscription group configuration from the broker at
 * {@code addr}.
 *
 * @param addr address of the broker to query
 * @return the subscription group configuration decoded from the response body
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS};
 *         carries the response code and remark
 */
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr)
    throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    final RemotingCommand req =
        RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
    // Synchronous call with a 3000 ms timeout.
    final RemotingCommand resp = this.remotingClient.invokeSync(addr, req, 3000);
    assert resp != null;
    if (resp.getCode() == ResponseCode.SUCCESS) {
        return SubscriptionGroupWrapper.decode(resp.getBody(), SubscriptionGroupWrapper.class);
    }
    throw new MQBrokerException(resp.getCode(), resp.getRemark());
}
四种元数据的请求几乎一致:都是调用NettyRemotingClient的invokeSync()方法,再由它调用父类NettyRemotingAbstract中的invokeSyncImpl()方法。
/**
 * Sends {@code request} to {@code addr} and returns the response, spending at most
 * {@code timeoutMillis} overall: the time used before the actual send is subtracted from
 * the budget handed to {@code invokeSyncImpl}.
 *
 * @param addr          target address
 * @param request       command to send
 * @param timeoutMillis total time budget in milliseconds
 * @return the response command
 * @throws RemotingConnectException     if no active channel for {@code addr} is available
 * @throws RemotingSendRequestException rethrown after the channel has been closed
 * @throws RemotingTimeoutException     if the budget is already used up before sending, or
 *                                      rethrown from {@code invokeSyncImpl}
 */
public RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    final long startedAt = System.currentTimeMillis();
    // Obtain the channel for this address.
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel == null || !channel.isActive()) {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }

    try {
        if (this.rpcHook != null) {
            this.rpcHook.doBeforeRequest(addr, request);
        }
        // Time already spent on channel lookup and the before-request hook.
        final long elapsed = System.currentTimeMillis() - startedAt;
        if (elapsed > timeoutMillis) {
            throw new RemotingTimeoutException("invokeSync call timeout");
        }
        // Only the remaining budget is available for the actual round trip.
        final RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - elapsed);
        if (this.rpcHook != null) {
            this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
        }
        return response;
    } catch (final RemotingSendRequestException e) {
        log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
        this.closeChannel(addr, channel);
        throw e;
    } catch (final RemotingTimeoutException e) {
        // On timeout the channel is closed only when the client configuration asks for it.
        if (this.nettyClientConfig.isClientCloseSocketIfTimeout()) {
            this.closeChannel(addr, channel);
            log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
        }
        log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
        throw e;
    }
}
/**
 * Writes {@code request} to {@code channel} and waits, via
 * {@code responseFuture.waitResponse(timeoutMillis)}, for the matching response.
 *
 * @param channel       channel to write the request to
 * @param request       command to send; its opaque value keys the entry in responseTable
 * @param timeoutMillis value passed to the response future and to waitResponse
 * @return the response command; a null response is never returned, an exception is thrown instead
 * @throws RemotingTimeoutException     if no response was obtained although the send was flagged OK
 * @throws RemotingSendRequestException if no response was obtained and the send was not flagged OK
 */
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();
try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
// Register the future under the opaque id before the request is written.
// NOTE(review): presumably the response handler looks it up by this id - confirm.
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// Send the request through the netty channel
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture f) throws Exception {
if (f.isSuccess()) {
// Write succeeded: only flag it; nothing else is done in this callback.
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
// Write failed: drop the table entry, record the cause and put a null response.
// NOTE(review): putResponse(null) presumably releases the waiting caller early - confirm.
NettyRemotingAbstract.this.responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
final RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
// No response: the send flag decides between a timeout and a send failure.
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
// The table entry is removed on every exit path.
this.responseTable.remove(opaque);
}
}
元数据同步分析完毕,然后分析一下消息的同步。
消息同步
核心类:HAService、HAConnection
Master:
HAService.AcceptSocketService:接收Slave节点的连接
HAConnection.ReadSocketService:读来自Slave节点的数据
HAConnection.WriteSocketService:往Slave节点写数据
Slave:
HAService.HAClient:对Master连接、读写数据
Slave => Master 上报CommitLog已经同步到的offset
Master => Slave 传输新的CommitLog数据
消息同步是基于NIO 完成的,下面简单分析一下源码
/**
 * Starts the HA service components in order: accept registration, the accept service,
 * the group transfer service and the HA client.
 *
 * @throws Exception if any of the start-up calls fails
 */
public void start() throws Exception {
// Register for the accept event
this.acceptSocketService.beginAccept();
// Start the AcceptSocketService thread
this.acceptSocketService.start();
// Start the GroupTransferService thread
this.groupTransferService.start();
// Start the HAClient
this.haClient.start();
}
首先分析一下acceptSocketService.beginAccept()
/**
 * Opens the server socket channel and a selector, binds the channel to
 * {@code socketAddressListen} and registers it with the selector for accept events.
 *
 * @throws Exception if opening, binding, configuring or registering fails
 */
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
// SO_REUSEADDR is enabled before the bind.
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// The channel is switched to non-blocking mode before it is registered with the selector.
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
可以看到这是一个标准NIO编程写法,注册accept事件。
接下来分析一下HAClient的run() 方法
/**
 * Main loop of the HA client. Until the service is stopped it keeps a connection to the
 * master, reports the slave offset when due, processes incoming data and closes the
 * connection when it has been quiet for longer than the configured housekeeping interval.
 */
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// Connect to the master (a no-op when a channel already exists)
if (this.connectMaster()) {
// Report currentReportedOffset when it is time to do so; close the connection on failure.
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
// Wait up to 1000 ms on the selector.
this.selector.select(1000);
// Handle read events
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// When this report fails, the housekeeping check below is skipped for this iteration.
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
// Compare the time since lastWriteTimestamp with the configured housekeeping interval; close the master connection when it is exceeded
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
// Not connected: waitForRunning(5000) before the next attempt.
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
// Any exception is logged, followed by the same waitForRunning(5000) before the loop continues.
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
进入connectMaster()方法
/**
 * Makes sure a channel to the master exists. When there is none, it tries to connect to the
 * current master address and registers the new channel for read events; in that case the
 * reported offset and the last-write timestamp are reset whether or not the connect worked.
 *
 * @return {@code true} if a channel to the master is available after the call
 * @throws ClosedChannelException if registering the new channel with the selector fails
 */
private boolean connectMaster() throws ClosedChannelException {
    if (this.socketChannel != null) {
        // Already connected: nothing to do.
        return true;
    }

    final String masterAddr = this.masterAddress.get();
    // Resolve the address only when one is set.
    final SocketAddress target =
        (masterAddr != null) ? RemotingUtil.string2SocketAddress(masterAddr) : null;
    if (target != null) {
        // Connect
        this.socketChannel = RemotingUtil.connect(target);
        if (this.socketChannel != null) {
            // Register for read events
            this.socketChannel.register(this.selector, SelectionKey.OP_READ);
        }
    }

    this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    this.lastWriteTimestamp = System.currentTimeMillis();
    return this.socketChannel != null;
}
RemotingUtil.connect()中会完成客户端socket 初始化,连接工作。
/**
 * Connects to {@code remote} with a timeout of 5000 ms by delegating to
 * {@code connect(SocketAddress, int)}.
 *
 * @param remote address to connect to
 * @return whatever the two-argument overload returns for this address and timeout
 */
public static SocketChannel connect(SocketAddress remote){
return connect(remote, 1000 * 5);
}
/**
 * Opens a socket channel, connects it to {@code remote} in blocking mode with the given
 * timeout and then switches it to non-blocking mode.
 *
 * @param remote        address to connect to
 * @param timeoutMillis connect timeout in milliseconds
 * @return the connected, non-blocking channel, or {@code null} if any step threw an
 *         exception (a channel that was already opened is closed first)
 */
public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
    SocketChannel channel = null;
    try {
        channel = SocketChannel.open();
        // Blocking mode for the duration of the timed connect.
        channel.configureBlocking(true);
        channel.socket().setSoLinger(false, -1);
        channel.socket().setTcpNoDelay(true);
        // 64 KiB receive and send buffers.
        channel.socket().setReceiveBufferSize(1024 * 64);
        channel.socket().setSendBufferSize(1024 * 64);
        channel.socket().connect(remote, timeoutMillis);
        channel.configureBlocking(false);
        return channel;
    } catch (Exception e) {
        // The failure itself is not reported; the caller only sees the null result.
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
        return null;
    }
}
我们继续分析run()里的processReadEvent()方法
/**
 * Reads from the master channel into {@code byteBufferRead} while the buffer has room and
 * hands every successful read to {@code dispatchReadRequest()}.
 *
 * @return {@code false} if the read returned a negative size, dispatching failed or an
 *         {@link IOException} occurred; {@code true} otherwise (including when the loop
 *         stops after three consecutive zero-byte reads)
 */
private boolean processReadEvent() {
    int consecutiveEmptyReads = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            final int bytesRead = this.socketChannel.read(this.byteBufferRead);
            if (bytesRead < 0) {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
            if (bytesRead == 0) {
                // Give up for now after three zero-byte reads in a row.
                consecutiveEmptyReads++;
                if (consecutiveEmptyReads >= 3) {
                    break;
                }
                continue;
            }
            // Data arrived: refresh the timestamp, reset the counter and dispatch.
            this.lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
            consecutiveEmptyReads = 0;
            if (!this.dispatchReadRequest()) {
                log.error("HAClient, dispatchReadRequest error");
                return false;
            }
        } catch (final IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }
    return true;
}
主要逻辑是通过dispatchReadRequest()方法完成
/**
 * Consumes complete frames from {@code byteBufferRead}, starting at {@code dispatchPostion},
 * and appends each frame body to the commit log. A frame is an 8-byte master physical
 * offset, a 4-byte body size and then the body itself.
 *
 * @return {@code false} if the master's offset does not match the slave's max physical
 *         offset (checked only when the latter is non-zero) or if
 *         {@code reportSlaveMaxOffsetPlus()} fails; {@code true} otherwise
 */
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
// Remember the buffer position so it can be restored after a body has been copied out.
final int readSocketPos = this.byteBufferRead.position();
while (true) {
// begin -> request data has been read
// Number of bytes in the buffer that have not been dispatched yet.
final int diff = this.byteBufferRead.position() - this.dispatchPostion;
if (diff >= msgHeaderSize) {
// Absolute reads: the header is inspected without moving the buffer position.
final long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
final int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);
final long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
// The comparison is skipped while the slave's max physical offset is 0.
if (slavePhyOffset != 0) {
// Return false if the slave's offset differs from the offset pushed by the master
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
// The whole frame (header plus body) must be in the buffer before it is consumed.
if (diff >= (msgHeaderSize + bodySize)) {
final byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
this.byteBufferRead.get(bodyData);
// Append the data to the commit log
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
// Restore the remembered position and advance the dispatch position past this frame.
this.byteBufferRead.position(readSocketPos);
this.dispatchPostion += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
return false;
}
// Look for another complete frame.
continue;
}
}
// If the buffer is full, reallocate it
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
这里的关键点是比较slave与master的物理偏移量(slave的偏移量为0时跳过比较):如果一致,就把消息数据追加到CommitLog;如果不一致,则直接返回false。
消息同步的简单分析就到此结束了。
结尾语
本次主要分析了RocketMQ的数据同步,包括元数据同步和消息同步,希望各位读者有所收获。