RocketMQ主从集群模式搭建

2020-11-30  本文已影响0人  端碗吹水

准备两台机器,一主一从:

机器IP hostname 角色
192.168.243.169 rocketmq01 master
192.168.243.170 rocketmq02 slave

我这里事先在两台机器上安装好了RocketMQ,关于RocketMQ的安装可以参考如下文章:

接下来,我们开始搭建RocketMQ主从集群。首先,配置两台机器的hosts

$ vim /etc/hosts
192.168.243.169 rocketmq-nameserver1 rocketmq-master1
192.168.243.170 rocketmq-nameserver2 rocketmq-slave1

修改master节点的配置文件:

[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a.properties
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a.properties
#节点所属的集群名称
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq-4.7.1/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.7.1/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq-4.7.1/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

修改slave节点的配置文件:

[root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a-s.properties
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a-s.properties
#节点所属的集群名称
brokerClusterName=rocketmq-cluster
#broker 名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer 地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4 点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog 每个文件的大小默认 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq-4.7.1/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq-4.7.1/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq-4.7.1/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制 Master
#- SYNC_MASTER 同步双写 Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

然后将这两个配置文件拷贝到Slave节点上:

[root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties
[root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a-s.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties

完成配置后,就可以启动RocketMQ了,在master节点上执行如下命令:

[root@rocketmq01 ~]# nohup sh mqnamesrv &
[root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

在slave节点上执行如下命令:

[root@rocketmq02 ~]# nohup sh mqnamesrv &
[root@rocketmq02 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &

启动完成后,分别在两个节点上检查下服务的进程和端口是否正常:

[root@rocketmq01 ~]# jps
1942 Jps
1739 NamesrvStartup
1775 BrokerStartup
[root@rocketmq01 ~]# netstat -lntp |grep java
tcp6       0      0 :::10909                :::*              LISTEN      1775/java           
tcp6       0      0 :::10911                :::*              LISTEN      1775/java           
tcp6       0      0 :::10912                :::*              LISTEN      1775/java           
tcp6       0      0 :::9876                 :::*              LISTEN      1739/java           
[root@rocketmq01 ~]# 

修改RocketMQ的管控台配置,并启动:

[root@rocketmq01 ~]# cd /usr/local/src/rocketmq-externals/rocketmq-console/
[root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# vim src/main/resources/application.properties 
# 增加nameserver的地址
rocketmq.config.namesrvAddr=192.168.243.169:9876;192.168.243.170:9876
[root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# java -jar target/rocketmq-console-ng-2.0.0.jar

此时在管控台中可以看到有两个节点了:


image.png

在Dashboard中也可以看到有两个Broker:


image.png

主从集群模式下的高可用机制故障演练

创建一个普通的Maven项目,pom文件添加rocketmq-client依赖如下:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>

生产者代码示例:

package com.zj.rocketmq.learn.quickstart;

import com.zj.rocketmq.learn.constant.Constants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.UUID;

/**
 * rocketmq - 生产者
 *
 * @author 01
 * @date 2020-11-30
 **/
public class Producer {

    public static void main(String[] args) throws Exception {
        // 在rocketmq中生产者必须在一个生产者组内
        String producerGroup = "quickstart_producer_group";
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        // 设置nameserver的地址
        producer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESSES);
        // 启动生产者
        producer.start();

        // 消息投递的目标主题
        String topic = "quickstart_topic";
        // 给消息打一个标签,标签的主要作用是用来过滤的
        String tag = "quickstart_tag";
        // 给消息设置一个key,是消息的唯一标识
        String key = UUID.randomUUID().toString();
        // 消息体,即具体的消息内容
        String body = "this is quickstart message!";
        Message message = new Message(topic, tag, key, body.getBytes());
        // 发送消息
        SendResult sendResult = producer.send(message);
        System.out.println("消息发送结果:" + sendResult);
        producer.shutdown();
    }
}

常量类代码:

public class Constants {

    public static final String NAME_SERVER_ADDRESSES = "192.168.243.169:9876;192.168.243.170:9876";
}

运行生产者代码发送一条消息,控制台输出如下:

消息发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A8010B36502437C6DC998FAEE00000, offsetMsgId=C0A8F3A900002A9F0000000000033234, messageQueue=MessageQueue [topic=quickstart_topic, brokerName=broker-a, queueId=1], queueOffset=0]

此时将主节点给停掉,模拟宕机:

[root@rocketmq01 ~]# mqshutdown broker

然后编写消费者端,代码如下:

package com.zj.rocketmq.learn.quickstart;

import com.zj.rocketmq.learn.constant.Constants;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * rocketmq - 消费者
 *
 * @author 01
 * @date 2020-11-30
 **/
public class Consumer {

    public static void main(String[] args) throws Exception {
        // 定义消费者组
        String consumerGroup = "quickstart_consumer_group";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 设置nameserver的地址
        consumer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESS);
        // 设置从哪个位置开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        // 从哪个主题消费数据
        String topic = "quickstart_topic";
        // 用于匹配消息标签的表达式
        String subExpression = "*";
        // 订阅主题
        consumer.subscribe(topic, subExpression);

        // 注册消息监听器,在监听器中实现消息的处理逻辑
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            System.out.println("------------- 接收到消息,开始进行业务处理 -------------");
            for (MessageExt msg : msgs) {
                try {
                    System.out.printf("topic: %s, tags: %s, keys: %s, body: %s%n",
                            msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));

                    if ("0".equals(msg.getKeys())) {
                        throw new RuntimeException("模拟业务处理发生异常");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    int reconsumeTimes = msg.getReconsumeTimes();
                    System.err.println("reconsumeTimes: " + reconsumeTimes);
                    if (reconsumeTimes == 3) {
                        // TODO 重试次数达到阈值,放弃重试,记录日志后续做补偿...
                        System.out.println("重试次数达到阈值,放弃重试!");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }

                    // 消息处理失败时返回,由于Broker的重试机制,会重新消费该消息
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            // 消息处理成功时返回
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 启动消费者
        consumer.start();
        System.out.println("consumer started...");
    }
}

运行消费者,正常情况下,该消费者依旧能够消费到数据:


image.png

重新启动master节点,让其重新加入集群:

[root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &

在此过程注意查看消费者的控制台,正常情况下,master重新加入集群,消费者也不会重复消费,因为master会和slave同步offset进度。

上一篇下一篇

猜你喜欢

热点阅读