RocketMQ Data Synchronization: A Source Code Analysis

2019-10-02  __TiAmo

Data synchronization covers two kinds of data: metadata and messages.

Metadata synchronization

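A reasonable guess is that data synchronization happens between the master and slave brokers, so a slave broker should start pulling metadata from the master broker when it starts up. The synchronization code is indeed in the initialize() method of BrokerController.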

// If the broker role is SLAVE
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                // Scheduled task that synchronizes metadata from the master
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (final Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
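The try/catch around syncAll() matters: scheduleAtFixedRate suppresses all later executions once a run throws. Below is a minimal, scaled-down sketch of the same pattern. The class name, the simulated failure and the millisecond timings are assumptions for illustration, not RocketMQ code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicSyncSketch {

    /**
     * Schedules a task at a fixed rate and waits until it has run `rounds` times.
     * Returns the number of completed rounds (at most `rounds`).
     */
    static int runPeriodically(int rounds, long initialDelayMs, long periodMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(rounds);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    // Hypothetical stand-in for slaveSynchronize.syncAll();
                    // the first round fails on purpose.
                    if (attempts.incrementAndGet() == 1) {
                        throw new IllegalStateException("simulated sync failure");
                    }
                } catch (Throwable t) {
                    // Catch and log so that one failed round does not cancel the schedule.
                    System.err.println("sync round failed: " + t.getMessage());
                } finally {
                    done.countDown();
                }
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return (int) (rounds - done.getCount());
    }

    public static void main(String[] args) {
        // The broker uses a 10 s initial delay and a 60 s period; shortened here.
        System.out.println("completed rounds: " + runPeriodically(3, 10, 20));
    }
}
```

Without the catch block, the exception thrown in the first round would end the schedule, and the slave would never sync again.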

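A slave broker therefore issues a sync request once a minute (the first run starts 10 seconds after scheduling, then every 60 seconds). Each run enters slaveSynchronize.syncAll().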

public void syncAll() {
    // Sync topic configuration
    this.syncTopicConfig();
    // Sync consumer offsets
    this.syncConsumerOffset();
    // Sync delay-message offsets
    this.syncDelayOffset();
    // Sync subscription group configuration
    this.syncSubscriptionGroupConfig();
}

private void syncTopicConfig() {
    final String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null) {
        try {
            // Fetch all topic configuration from the master
            final TopicConfigSerializeWrapper topicWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
            // If the data versions differ, update both the version and the data
            if (!this.brokerController.getTopicConfigManager().getDataVersion()
                    .equals(topicWrapper.getDataVersion())) {

                this.brokerController.getTopicConfigManager().getDataVersion()
                        .assignNewOne(topicWrapper.getDataVersion());
                this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                this.brokerController.getTopicConfigManager().getTopicConfigTable()
                        .putAll(topicWrapper.getTopicConfigTable());
                this.brokerController.getTopicConfigManager().persist();

                log.info("Update slave topic config from master, {}", masterAddrBak);
            }
        } catch (final Exception e) {
            log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
        }
    }
}

private void syncConsumerOffset() {
    final String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null) {
        try {
            final ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
            this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
            this.brokerController.getConsumerOffsetManager().persist();
            log.info("Update slave consumer offset from master, {}", masterAddrBak);
        } catch (final Exception e) {
            log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
        }
    }
}

private void syncDelayOffset() {
    final String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null) {
        try {
            final String delayOffset =
                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
            if (delayOffset != null) {

                final String fileName =
                        StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                                .getMessageStoreConfig().getStorePathRootDir());
                try {
                    MixAll.string2File(delayOffset, fileName);
                } catch (final IOException e) {
                    log.error("Persist file Exception, {}", fileName, e);
                }
            }
            log.info("Update slave delay offset from master, {}", masterAddrBak);
        } catch (final Exception e) {
            log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
        }
    }
}

private void syncSubscriptionGroupConfig() {
    final String masterAddrBak = this.masterAddr;
    if (masterAddrBak != null) {
        try {
            final SubscriptionGroupWrapper subscriptionWrapper =
                    this.brokerController.getBrokerOuterAPI()
                            .getAllSubscriptionGroupConfig(masterAddrBak);

            // If the data versions differ, update both the version and the data
            if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                    .equals(subscriptionWrapper.getDataVersion())) {
                final SubscriptionGroupManager subscriptionGroupManager =
                        this.brokerController.getSubscriptionGroupManager();
                subscriptionGroupManager.getDataVersion().assignNewOne(
                        subscriptionWrapper.getDataVersion());
                subscriptionGroupManager.getSubscriptionGroupTable().clear();
                subscriptionGroupManager.getSubscriptionGroupTable().putAll(
                        subscriptionWrapper.getSubscriptionGroupTable());
                subscriptionGroupManager.persist();
                log.info("Update slave Subscription Group from master, {}", masterAddrBak);
            }
        } catch (final Exception e) {
            log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
        }
    }
}
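The version check used by syncTopicConfig() and syncSubscriptionGroupConfig() can be sketched in isolation. This is a simplified illustration: the class name is hypothetical, the version is reduced to a single long as a stand-in for RocketMQ's DataVersion, and the table holds plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class VersionedConfigSyncSketch {

    // Simplified stand-in for DataVersion (assumption: a single number is enough here).
    private volatile long localVersion = 0L;
    private final Map<String, String> localTable = new ConcurrentHashMap<>();

    /**
     * Replaces the local table with the master's table, but only when the
     * versions differ. Returns true if the local copy was updated.
     */
    boolean syncFrom(long masterVersion, Map<String, String> masterTable) {
        if (localVersion == masterVersion) {
            return false; // already up to date, nothing to persist
        }
        localVersion = masterVersion;
        localTable.clear();
        localTable.putAll(masterTable);
        return true;
    }

    Map<String, String> table() {
        return localTable;
    }

    public static void main(String[] args) {
        VersionedConfigSyncSketch slave = new VersionedConfigSyncSketch();
        Map<String, String> fromMaster = new HashMap<>();
        fromMaster.put("TopicA", "readQueueNums=4");
        System.out.println(slave.syncFrom(1L, fromMaster)); // first sync applies the data
        System.out.println(slave.syncFrom(1L, fromMaster)); // same version is skipped
    }
}
```

As in the original methods, clear() followed by putAll() is not atomic, so a concurrent reader could briefly see an empty table.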

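The source makes the synchronization of the four kinds of metadata clear: topic configuration, consumer offsets, delay offsets, and subscription group configuration. Only the topic and subscription group syncs compare data versions first; the two offset syncs apply the master's data on every round. The remote calls they rely on, in BrokerOuterAPI, are: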

public TopicConfigSerializeWrapper getAllTopicConfig(
        final String addr) throws RemotingConnectException, RemotingSendRequestException,
        RemotingTimeoutException, InterruptedException, MQBrokerException {
    // Build the request command
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

public ConsumerOffsetSerializeWrapper getAllConsumerOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

public String getAllDelayOffset(
    final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
    RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return new String(response.getBody(), MixAll.DEFAULT_CHARSET);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(
    final String addr) throws InterruptedException, RemotingTimeoutException,
    RemotingSendRequestException, RemotingConnectException, MQBrokerException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

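The four metadata requests are nearly identical: each calls NettyRemotingClient.invokeSync(), which in turn calls invokeSyncImpl(), inherited from NettyRemotingAbstract. The only difference in addressing is that getAllTopicConfig() routes the request through MixAll.brokerVIPChannel(), while the other three use the address as given.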

public RemotingCommand invokeSync(final String addr, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    final long beginStartTime = System.currentTimeMillis();
    // Get (or create) the channel for the target address
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            if (this.rpcHook != null) {
                this.rpcHook.doBeforeRequest(addr, request);
            }

            // Time already spent before sending
            final long costTime = System.currentTimeMillis() - beginStartTime;
            // If the elapsed time already exceeds the timeout, throw
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            // Delegate to invokeSyncImpl with the remaining time budget
            final RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            if (this.rpcHook != null) {
                this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            }
            return response;
        } catch (final RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (final RemotingTimeoutException e) {
            if (this.nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
                                          final long timeoutMillis)
            throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        // netty channel send request
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(final ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                NettyRemotingAbstract.this.responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        final RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
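The core idea in invokeSyncImpl() is request/response correlation: the caller registers a future under the request's opaque id, sends the request, and blocks until the I/O thread completes that future or the timeout expires. Here is a minimal sketch of that idea. All names are hypothetical, and the transport is reduced to a Runnable, so this illustrates the pattern rather than RocketMQ's ResponseFuture.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class ResponseCorrelationSketch {

    /** Minimal future: one latch and one response slot. */
    static final class PendingResponse {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String response;

        void put(String value) {
            this.response = value;
            this.latch.countDown();
        }

        /** Returns the response, or null if none arrived within the timeout. */
        String await(long timeoutMillis) {
            try {
                this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return this.response;
        }
    }

    private final ConcurrentMap<Integer, PendingResponse> pending = new ConcurrentHashMap<>();

    /** Registers the request id, triggers the send, then blocks for the reply. */
    String invokeSync(int opaque, Runnable send, long timeoutMillis) {
        PendingResponse future = new PendingResponse();
        this.pending.put(opaque, future);
        try {
            send.run();
            return future.await(timeoutMillis);
        } finally {
            // Always clean up, as the finally block in invokeSyncImpl() does.
            this.pending.remove(opaque);
        }
    }

    /** Called by the receiving thread; returns false if nobody is waiting for this id. */
    boolean onResponse(int opaque, String body) {
        PendingResponse future = this.pending.get(opaque);
        if (future == null) {
            return false;
        }
        future.put(body);
        return true;
    }

    public static void main(String[] args) {
        ResponseCorrelationSketch client = new ResponseCorrelationSketch();
        // The "network" is simulated by a thread that answers request 7.
        String reply = client.invokeSync(7,
                () -> new Thread(() -> client.onResponse(7, "topic-config")).start(), 1000);
        System.out.println(reply);
    }
}
```

A null result corresponds to the timeout branch in the original, where the caller then distinguishes "sent but no reply" from "send failed".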

That completes the analysis of metadata synchronization. Next, let's look at message synchronization.

Message synchronization

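Core classes: HAService, HAConnection
Master:
HAService.AcceptSocketService: accepts connections from slave nodes
HAConnection.ReadSocketService: reads data sent by slave nodes
HAConnection.WriteSocketService: writes data to slave nodes
Slave:
HAService.HAClient: connects to the master, then reads and writes data

Slave => Master: reports the CommitLog offset synchronized so far
Master => Slave: transfers new CommitLog data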

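Message synchronization is built on Java NIO. A brief walk through the source follows, starting with the start() method of HAService.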

public void start() throws Exception {
    // Register for the accept event
    this.acceptSocketService.beginAccept();
    // Start the AcceptSocketService thread
    this.acceptSocketService.start();
    // Start the GroupTransferService thread
    this.groupTransferService.start();
    // Start the HAClient
    this.haClient.start();
}

First, acceptSocketService.beginAccept():

public void beginAccept() throws Exception {
    this.serverSocketChannel = ServerSocketChannel.open();
    this.selector = RemotingUtil.openSelector();
    this.serverSocketChannel.socket().setReuseAddress(true);
    this.serverSocketChannel.socket().bind(this.socketAddressListen);
    this.serverSocketChannel.configureBlocking(false);
    this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

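This is the standard NIO idiom: open a ServerSocketChannel, bind it, switch it to non-blocking mode, and register it with a selector for the accept event.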
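To see what happens after that registration, here is a small self-contained sketch that sets up the same kind of listener on an ephemeral loopback port, connects one client, and accepts it through the selector. The class and method names are hypothetical; the actual accept loop in AcceptSocketService is not shown in this article.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class NioAcceptSketch {

    /** Listens on a loopback port, connects one client, returns the number of accepted peers. */
    static int acceptOne() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // Same setup steps as beginAccept().
            server.socket().setReuseAddress(true);
            server.socket().bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: let the OS pick
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);

            int port = server.socket().getLocalPort();
            int accepted = 0;
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                // Wait up to one second for the pending connection to become acceptable.
                if (selector.select(1000) > 0) {
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey key = keys.next();
                        keys.remove();
                        if (key.isAcceptable()) {
                            SocketChannel peer = ((ServerSocketChannel) key.channel()).accept();
                            if (peer != null) {
                                accepted++;
                                peer.close();
                            }
                        }
                    }
                }
            }
            return accepted;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("accepted connections: " + acceptOne());
    }
}
```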
Next, the run() method of HAClient:

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Connect to the master
            if (this.connectMaster()) {

                if (this.isTimeToReportOffset()) {
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }

                this.selector.select(1000);

                // Handle read events
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }

                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                // If the time since the last write exceeds the housekeeping interval, close the connection to the master
                long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                        - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                    .getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}

Now step into connectMaster():

private boolean connectMaster() throws ClosedChannelException {
    if (null == this.socketChannel) {
        final String addr = this.masterAddress.get();
        if (addr != null) {
            // Resolve the master address
            final SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // Connect to the master
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // Register for read events
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

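RemotingUtil.connect() initializes the client socket and establishes the connection. It connects in blocking mode with a 5-second timeout, then switches the channel to non-blocking mode.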

public static SocketChannel connect(SocketAddress remote){
    return connect(remote, 1000 * 5);
}

public static SocketChannel connect(SocketAddress remote, final int timeoutMillis) {
    SocketChannel sc = null;
    try {
        sc = SocketChannel.open();
        sc.configureBlocking(true);
        sc.socket().setSoLinger(false, -1);
        sc.socket().setTcpNoDelay(true);
        sc.socket().setReceiveBufferSize(1024 * 64);
        sc.socket().setSendBufferSize(1024 * 64);
        sc.socket().connect(remote, timeoutMillis);
        sc.configureBlocking(false);
        return sc;
    } catch (Exception e) {
        if (sc != null) {
            try {
                sc.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    return null;
}

Back in run(), the next method to look at is processReadEvent():

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // readSize > 0 means data was read, so dispatch it
            final int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                this.lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                readSizeZeroTimes = 0;
                final boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (final IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}
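The loop above keeps reading while data arrives, gives up after three consecutive empty reads, and reports failure when the peer closes the stream. The sketch below isolates that control flow. The class name and the scripted test channel are assumptions made so the example can run without a real socket.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

final class ReadLoopSketch {

    static final int MAX_ZERO_READS = 3;

    /** Returns the total number of bytes read, or -1 on end of stream or I/O error. */
    static int drain(ReadableByteChannel channel, ByteBuffer buffer) {
        int total = 0;
        int zeroReads = 0;
        while (buffer.hasRemaining()) {
            int n;
            try {
                n = channel.read(buffer);
            } catch (IOException e) {
                return -1;
            }
            if (n > 0) {
                total += n;
                zeroReads = 0; // progress resets the idle counter
            } else if (n == 0) {
                if (++zeroReads >= MAX_ZERO_READS) {
                    break; // nothing more to read for now
                }
            } else {
                return -1; // peer closed the connection
            }
        }
        return total;
    }

    /** Test double: each read() call delivers the next scripted size, then 0 forever. */
    static ReadableByteChannel scripted(int... sizes) {
        return new ReadableByteChannel() {
            private int index = 0;

            @Override
            public int read(ByteBuffer dst) {
                if (index >= sizes.length) {
                    return 0;
                }
                int n = sizes[index++];
                for (int i = 0; i < n; i++) {
                    dst.put((byte) 1);
                }
                return n;
            }

            @Override
            public boolean isOpen() {
                return true;
            }

            @Override
            public void close() {
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(drain(scripted(4, 0, 0, 5), ByteBuffer.allocate(64)));
    }
}
```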

The main logic lives in dispatchReadRequest():

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size
    final int readSocketPos = this.byteBufferRead.position();

    while (true) {
        // Bytes read from the socket but not yet dispatched
        final int diff = this.byteBufferRead.position() - this.dispatchPostion;
        if (diff >= msgHeaderSize) {
            final long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion);
            final int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8);

            final long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            if (slavePhyOffset != 0) {
                // If the slave's offset differs from the offset sent by the master, return false
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                            + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }

            if (diff >= (msgHeaderSize + bodySize)) {
                final byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize);
                this.byteBufferRead.get(bodyData);

                // Append the data to the slave's CommitLog
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                this.byteBufferRead.position(readSocketPos);
                this.dispatchPostion += msgHeaderSize + bodySize;

                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        // If the buffer is full, reallocate it
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}

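The key point here is the offset comparison: when the slave's max physical offset is non-zero, it must equal the offset the master sent. If it matches, the message data is appended to the slave's CommitLog; if not, the method returns false, and run() then closes the connection to the master.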
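The framing that dispatchReadRequest() relies on is a 12-byte header (8-byte physical offset plus 4-byte body size) followed by the body, and a frame may arrive in pieces. Below is a minimal sketch of parsing such frames from a buffer that is still being filled. The class, the Frame type and the encode() helper are hypothetical and exist only for this illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class HaFrameParserSketch {

    static final int HEADER_SIZE = 8 + 4; // physical offset (long) + body size (int)

    static final class Frame {
        final long offset;
        final byte[] body;

        Frame(long offset, byte[] body) {
            this.offset = offset;
            this.body = body;
        }
    }

    /** Builds one frame: header followed by body (helper for the demo only). */
    static byte[] encode(long offset, byte[] body) {
        return ByteBuffer.allocate(HEADER_SIZE + body.length)
                .putLong(offset)
                .putInt(body.length)
                .put(body)
                .array();
    }

    /**
     * Parses every complete frame between dispatchPos and the buffer's current
     * write position, leaving that position untouched. Returns the new dispatch
     * position; an incomplete trailing frame is left for the next call.
     */
    static int parse(ByteBuffer buffer, int dispatchPos, List<Frame> out) {
        int writePos = buffer.position();
        while (writePos - dispatchPos >= HEADER_SIZE) {
            long offset = buffer.getLong(dispatchPos);     // absolute reads do not move the position
            int bodySize = buffer.getInt(dispatchPos + 8);
            if (writePos - dispatchPos < HEADER_SIZE + bodySize) {
                break; // header is complete, body is not
            }
            byte[] body = new byte[bodySize];
            ByteBuffer view = buffer.duplicate();          // independent position, shared content
            view.position(dispatchPos + HEADER_SIZE);
            view.get(body);
            out.add(new Frame(offset, body));
            dispatchPos += HEADER_SIZE + bodySize;
        }
        return dispatchPos;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        byte[] first = encode(100L, new byte[] {1, 2, 3});
        byte[] second = encode(103L, new byte[] {4, 5});
        List<Frame> frames = new ArrayList<>();

        buffer.put(first).put(second, 0, 5);               // second frame arrives only partially
        int pos = parse(buffer, 0, frames);
        buffer.put(second, 5, second.length - 5);          // the rest arrives later
        pos = parse(buffer, pos, frames);
        System.out.println(frames.size() + " frames parsed, dispatch position " + pos);
    }
}
```

The second frame's offset equals the first frame's offset plus its body length, which is the continuity the slave checks against its own max physical offset.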

That concludes this brief analysis of message synchronization.

Closing remarks

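This article analyzed data synchronization in RocketMQ, covering both metadata synchronization and message synchronization. I hope readers find it useful.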
