技术原理一些收藏

Rocketmq搭建高可用集群

2022-08-26  本文已影响0人  运气爆棚lsw
Rocketmq搭建双主双从项目架构部署步骤如下:
  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者; 举例:收信者
  • Broker:暂存和传输消息;举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic
    一个消息的接收者可以订阅一个或者多个Topic消息
  • Message Queue:相当于是Topic的分区;用于并行发送和接收消息

集群特点

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

集群模式

1)单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

2)多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

3)多Master多Slave模式(异步)

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

4)多Master多Slave模式(同步)

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

3.3 双主双从集群搭建

1、下载RocketMq地址

wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip

2、解压缩安装包

unzip rocketmq-all-4.9.3-bin-release.zip

3、更改namesrv的日志输出目录(两处)

vim /usr/local/rocketmq/rocketmq-4.9.3/conf/logback_namesrv.xml

将配置文件中所有的${user.home}/logs 更改为 /usr/local/rocketmq/logs(你自己的目录)

4、更改启动脚本修改Jvm内存大小,调整到合适你服务器的大小
vim /usr/local/rocketmq/rocketmq-4.9.3/bin/runserver.sh  

参数示例:

server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m

5、启动namesrv

nohup sh /myTest/rocketmq/rocketmq-4.9.3/bin/mqnamesrv 2>&1 &

6、安装broke(主机)#更改参数适合自己的服务器
vim  /usr/local/rocketmq/rocketmq-4.9.3-a-m/bin/tools.sh
vim  /usr/local/rocketmq/rocketmq-4.9.3-a-m/bin/runbroker.sh

tools:

server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m

runbroker:

server -Xms2g -Xmx2g

7、创建数据目录
主节点存储目录:

mkdir -p /usr/local/rocketmq/master/data/store
mkdir -p /usr/local/rocketmq/master/data/store/commitlog
mkdir -p /usr/local/rocketmq/master/data/store/consumequeue
mkdir -p /usr/local/rocketmq/master/data/store/index
mkdir -p /usr/local/rocketmq/master/data/store/checkpoint

从节点存储目录:

mkdir -p /usr/local/rocketmq/slaver/data/store
mkdir -p /usr/local/rocketmq/slaver/data/store/commitlog
mkdir -p /usr/local/rocketmq/slaver/data/store/consumequeue
mkdir -p /usr/local/rocketmq/slaver/data/store/index
mkdir -p /usr/local/rocketmq/slaver/data/store/checkpoint
8、接下来更改最重要的conf配置文件(主Master配置文件)
vim /usr/local/rocketmq/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties
画外音:注释掉原有内容(之后提及的3个properties配置文件都得这样,下方不再提及)
# brokerClusterName=DefaultCluster
# brokerName=broker-b
# brokerId=1
# deleteWhen=04
# fileReservedTime=48
# brokerRole=SLAVE
# flushDiskType=ASYNC_FLUSH

1、Master1即broker-a.properties
#所属集群名字  
brokerClusterName=rocketmq-cluster  

#broker名字,注意此处不同的配置文件填写的不一样; 在broker-b.properties中此处需要修改为:brokerName=broker-b  
brokerName=broker-a  

#0: Master; >0: Slave  
brokerId=0  

#nameServer地址,分号分割  
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876  

#broker启动地址,rocketmq默认内网启动  
brokerIP1=192.168.1.1  

#broker的HAIP地址(供Slave同步消息的地址)  
brokerIP2=192.168.1.1  

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数  
defaultTopicQueueNums=4  

#是否允许Broker自动创建Topic,建议线下开启,线上关闭  
autoCreateTopicEnable=**true**  

#是否允许Broker自动创建订阅组,建议线下开启,线上关闭  
autoCreateSubscriptionGroup=**true**  

#Broker对外服务的监听端口  
listenPort=10911  

#删除文件时间点,默认凌晨4点  
deleteWhen=04  

#文件保留时间,默认48h  
fileReservedTime=48  

#commitLog每个文件的大小默认1G  
mapedFileSizeCommitLog=1073741824  

#ConsumeQueue每个文件默认存30W条,根据业务情况调整  
mapedFileSizeConsumeQueue=300000  

#检测物理文件磁盘空间  
diskMaxUsedSpaceRatio=88  

#存储路径  
storePathRootDir=/usr/local/soft/rocketmq-4.9.0/store-a1  

#commitLog 存储路径  
storePathCommitLog=/usr/local/soft/rocketmq-4.9.0/store-a1/commitlog  

#消费队列存储路径存储路径  
storePathConsumeQueue=/usr/local/soft/rocketmq-4.9.0/store-a1/consumequeue  

#消息索引存储路径  
storePathIndex=/usr/local/soft/rocketmq-4.9.0/store-a1/index  

#checkpoint 文件存储路径  
storeCheckpoint=/usr/local/soft/rocketmq-4.9.0/store-a1/checkpoint  

#abort 文件存储路径  
abortFile=/usr/local/soft/rocketmq-4.9.0/store-a1/abort  

#限制的消息大小  
maxMessageSize=65536  

#Broker角色: ASYNC_MASTER(异步复制Master)、SYNC_MASTER(同步双写Master)、SLAVE(从节点)  
brokerRole=SYNC_MASTER  

#刷盘方式: ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘)  
flushDiskType=SYNC_FLUSH  
2、Slaver2即broker-b-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster

#broker名字,在broker-b-s.properties中需修改为:brokerName=broker-b
brokerName=broker-b

#0: Master; >0: Slave
brokerId=50

#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

#broker启动地址,rocketmq默认内网启动
brokerIP1=192.168.1.1

#broker的HAIP地址(供Slave同步消息的地址)
brokerIP2=192.168.1.1

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=**true**

#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=**true**

#Broker对外服务的监听端口
listenPort=11011

#删除文件时间点,默认凌晨4点
deleteWhen=04

#文件保留时间,默认48h
fileReservedTime=48

#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88

#存储路径
storePathRootDir=/usr/local/soft/rocketmq-4.9.0/store-b1

#commitLog 存储路径
storePathCommitLog=/usr/local/soft/rocketmq-4.9.0/store-b1/commitlog

#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/soft/rocketmq-4.9.0/store-b1/consumequeue

#消息索引存储路径
storePathIndex=/usr/local/soft/rocketmq-4.9.0/store-b1/index

#checkpoint 文件存储路径
storeCheckpoint=/usr/local/soft/rocketmq-4.9.0/store-b1/checkpoint

#abort 文件存储路径
abortFile=/usr/local/soft/rocketmq-4.9.0/store-b1/abort

#限制的消息大小
maxMessageSize=65536

#Broker角色: ASYNC_MASTER(异步复制Master)、SYNC_MASTER(同步双写Master)、SLAVE(从节点)
brokerRole=SLAVE

#刷盘方式: ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘)
flushDiskType=ASYNC_FLUSH
3、Master2即broker-b.properties
#所属集群名字  
brokerClusterName=rocketmq-cluster  

#broker名字  
brokerName=broker-b  

#0: Master; >0: Slave  
brokerId=0  

#nameServer地址,分号分割  
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876  

#broker启动地址,rocketmq默认内网启动  
brokerIP1=192.168.1.2  

#BrokerHAIP地址,供slave同步消息的地址  
brokerIP2=192.168.1.2  

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数  
defaultTopicQueueNums=4  

#是否允许Broker自动创建Topic,建议线下开启,线上关闭  
autoCreateTopicEnable=**true**  

#是否允许Broker自动创建订阅组,建议线下开启,线上关闭  
autoCreateSubscriptionGroup=**true**  

#Broker对外服务的监听端口  
listenPort=10911  

#删除文件时间点,默认凌晨4点  
deleteWhen=04  

#文件保留时间,默认48小时  
fileReservedTime=48  

#commitLog每个文件的大小默认1G  
mapedFileSizeCommitLog=1073741824  

#ConsumeQueue每个文件默认存30W条,根据业务情况调整  
mapedFileSizeConsumeQueue=300000  

#检测物理文件磁盘空间  
diskMaxUsedSpaceRatio=88  

#存储路径  
storePathRootDir=/usr/local/soft/rocketmq-4.9.0/store-b2  

#commitLog 存储路径  
storePathCommitLog=/usr/local/soft/rocketmq-4.9.0/store-b2/commitlog  

#消费队列存储路径存储路径  
storePathConsumeQueue=/usr/local/soft/rocketmq-4.9.0/store-b2/consumequeue  

#消息索引存储路径  
storePathIndex=/usr/local/soft/rocketmq-4.9.0/store-b2/index  

#checkpoint 文件存储路径  
storeCheckpoint=/usr/local/soft/rocketmq-4.9.0/store-b2/checkpoint  

#abort 文件存储路径  
abortFile=/usr/local/soft/rocketmq-4.9.0/store-b2/abort  

#限制的消息大小  
maxMessageSize=65536  

#Broker角色: ASYNC_MASTER(异步复制Master)、SYNC_MASTER(同步双写Master)、SLAVE(从节点)  
brokerRole=SYNC_MASTER  

#刷盘方式: ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘)  
flushDiskType=SYNC_FLUSH  
4、Slave1即broker-a-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster

#broker名字
brokerName=broker-a

#0: Master; >0: Slave
brokerId=70

#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

#broker启动地址,rocketmq默认内网启动
brokerIP1=192.168.75.131

#broker的HAIP地址(供Slave同步消息的地址)
brokerIP2=192.168.75.131

#在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4

#是否允许Broker自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=**true**

#是否允许Broker自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=**true**

#Broker对外服务的监听端口
listenPort=11011

#删除文件时间点,默认凌晨4点
deleteWhen=04

#文件保留时间,默认48小时
fileReservedTime=48

#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88

#存储路径
storePathRootDir=/usr/local/soft/rocketmq-4.9.0/store-a2

#commitLog 存储路径
storePathCommitLog=/usr/local/soft/rocketmq-4.9.0/store-a2/commitlog

#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/soft/rocketmq-4.9.0/store-a2/consumequeue

#消息索引存储路径
storePathIndex=/usr/local/soft/rocketmq-4.9.0/store-a2/index

#checkpoint 文件存储路径
storeCheckpoint=/usr/local/soft/rocketmq-4.9.0/store-a2/checkpoint

#abort 文件存储路径
abortFile=/usr/local/soft/rocketmq-4.9.0/store-a2/abort

#限制的消息大小
maxMessageSize=65536

#Broker角色: ASYNC_MASTER(异步复制Master)、SYNC_MASTER(同步双写Master)、SLAVE(从节点)
brokerRole=SLAVE

#刷盘方式: ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘)
flushDiskType=ASYNC_FLUSH
9、启动broker(主)启动broker(从)
# shell脚本定义变量
PREFIX_PATH="/usr/local/rocketmq/rocketmq-4.9.3"
LOG_PATH="/usr/local/rocketmq/logs"
cd /usr/local/rocketmq/rocketmq-4.9.3/bin

# 主节点

nohup sh $PREFIX_PATH/bin/mqbroker -c 
$PREFIX_PATH/conf/2m-2s-sync/broker-a.properties > $LOG_PATH/master/master-broker1.log 2>&1 &

# 从节点

nohup sh $PREFIX_PATH/bin/mqbroker -c 
$PREFIX_PATH/conf/2m-2s-sync/broker-b-s.properties > $LOG_PATH/slaver/broker-b-s.log 2>&1 &
个人编写部署启动脚本
启动namesrv
#!/bin/bash

ids=`ps -ef | grep "NamesrvStartup" | grep -v "grep" | awk '{print $2}'`
echo "当前服务id:" $ids

for id in $ids
do
    kill -9 $id
    echo "killed $id"  
done

sleep 3s
echo "Loading……"
#填写自己mq的路径  
cd /usr/local/rocketmq/rocketmq-4.9.3/bin

nohup sh ./mqnamesrv > /usr/local/rocketmq/logs/namesrv.log  2>&1 &
echo "--- 已完成启动请查看日志输出 ---"

启动Broker(start-broker.sh)
#!/bin/bash

ids=`ps -ef | grep "org.apache.rocketmq.broker.BrokerStartup" | grep -v "grep" | awk '{print $2}'`
echo "--- RocketMQ主节点当前服务id:" $ids

for id in $ids
do
    kill -9 $id
    echo "killed $id"  
done

sleep 3s

## mq存储路径
PREFIX_PATH="/usr/local/rocketmq/rocketmq-4.9.3"
# 日志存放路径
LOG_PATH="/usr/local/rocketmq/logs"

RUNNING_USER=root

echo "Loading……"
    
cd /usr/local/rocketmq/rocketmq-4.9.3/bin

# 主节点
echo "--- 开始启动RocketMQ主节点---"
nohup sh $PREFIX_PATH/bin/mqbroker -c $PREFIX_PATH/conf/2m-2s-sync/broker-a.properties > $LOG_PATH/master/master-broker1.log 2>&1 &

# 从节点
echo "--- 开始启动RocketMQ从节点---"
nohup sh $PREFIX_PATH/bin/mqbroker -c $PREFIX_PATH/conf/2m-2s-sync/broker-b-s.properties > $LOG_PATH/slaver/broker-b-s.log 2>&1 &


echo "--- 执行完毕,请查看 ---"
12、查看mq运行情况
./mqadmin clusterList -n  192.168.9.44:9876
13、在Linux部署RocketMQ可视控制台Dashboard
wget -P /usr/local/temp

https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.tar.gz
RocketMq安装过程中遇到问题总结
Broker启动闪退问题1

如果是同一台机器,配置broker的存储路径一定要不一样,主要参数如下:

#存储路径  
storePathRootDir=/usr/local/soft/rocketmq-4.9.0/store-a1  
#commitLog 存储路径  
storePathCommitLog=/usr/local/soft/rocketmq-4.9.0/store-a1/commitlog  
#消费队列存储路径存储路径  
storePathConsumeQueue=/usr/local/soft/rocketmq-4.9.0/store-a1/consumequeue  
#消息索引存储路径  
storePathIndex=/usr/local/soft/rocketmq-4.9.0/store-a1/index  
#checkpoint 文件存储路径  
storeCheckpoint=/usr/local/soft/rocketmq-4.9.0/store-a1/checkpoint  
#abort 文件存储路径  
abortFile=/usr/local/soft/rocketmq-4.9.0/store-a1/abort 
Broker启动闪退问题2:

Master的监听端口都为10911,Slave的监听端口需与Master不同
否则会因端口占用导致Slave启动不了;Slave的端口都为:11011 (listenPort=11011)

Broker启动闪退问题所在3:

Master与Slave的文件存储路径不能相同,若相同,则主从文件存储冲突
导致1个服务器中某个Broker启动失败
我设置的:
Master的文件存储路径都为:/master/store-a1
Slave的文件存储路径都为:/slaver/stroe-b1

Broker启动闪退问题所在4:

Slave节点的ID问题,闪退会提示Exit 253;
一般大多人都设置为:brokerId=1,大于0就可以了,但是这个RocketMQ不能版本可能会有问题,
认为这个Id小于等于0,因此建议直接给Id设置的大一点,比如id=50等
还有什么其他问题,欢迎评论区留言,大家一起避免踩坑

14、安装Dashboard教程
1. git clone https://github.com/apache/rocketmq-dashboard.git
2. mvn clean package -Dmaven.test.skip=true
3. java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
15、Linux安装Java8
查看已安装的jdk
[root@tech2 ~]# rpm -qa|grep jdk
java-1.8.0-openjdk-devel-1.8.0.292.b10-1.el7_9.x86_64
copy-jdk-configs-3.3-10.el7_5.noarch
java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64
java-1.8.0-openjdk-headless-1.8.0.292.b10-1.el7_9.x86_64

卸载命令
yum -y remove java-1.8.0-openjdk-devel-1.8.0.292.b10-1.el7_9.x86_64
yum -y remove java-1.8.0-openjdk-1.8.0.292.b10-1.el7_9.x86_64
yum -y remove copy-jdk-configs-3.3-10.el7_5.noarch
可以多执行上述命令几次,确保所有已安装jdk被卸载
卸载完成之后Java命令不被识别 示例:java -version

第一步:解压到安装目录并修改名称
[root@bogon software]# tar -zxvf jdk-8u311-linux-x64.tar.gz -C /usr/local/java/
[root@tech1 software]# mv jdk1.8.0_33 jdk1.8.0

第二步:安装完毕之后在/etc/profile文件末尾添加(export必须顶格写)
vim /etc/profile
export JAVA_HOME=/usr/local/java/jdk1.8
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

第三步:使/etc/profile生效
[root@tech1 java]# source /etc/profile
16、linux 启动jar包 指定yml配置文件和输入日志文件命令为:
nohup java -jar project.jar  --spring.config.location=/home/project-conf/application.yml
>  /home/project-conf/nohup.out 2>&1 &

命令解读

nohup : 后台启动,窗口关闭继续执行。
java -jar :启动jar包命令。
project.jar:所启动的项目jar包。
--spring.config.location=:spring项目,指定所使用的的yml文件。
/home/project-conf/application.yml:yml路径
> /home/project-conf/nohup.out 2>&1 &:指定运行输出日志文件。
注:有时候启动会报错误  

No active profile set, falling back to default profiles: default
yml加载顺序 ,从高到低:
1、/config/application.yml
2、/application.yml
3、classpath:/config/application.yml
4、classpath:/application.yml
18、解决Linux shell脚本格式不对
vim hk.sh开文件
执行 : set ff=unix 设置文件为unix,然后执行:wq,保存成unix格式。
19、启动服务执行命令:
nohup sh /usr/local/rocketmq/rocketmq-4.9.3/bin/mqbroker
-c /usr/local/rocketmq/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties > /usr/local/rocketmq/master-broker1.log 2>&1 &
20、Rocketmq官方文档地址

GitHub地址:https://github.com/apache/rocketmq/tree/master/docs/cnc
Gitee(速度更快):https://gitee.com/apache/rocketmq/tree/master/docs/cn#apache-rocketmq

消息的存储和发送

1)消息存储

磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。

2)消息发送

Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。
一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。

这两个看似简单的操作,实际进行了4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复 制到用户态内存;
  3. 然后从用户态 内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输。

通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的
RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。

这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了

1.1.4 消息存储结构

RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

1.1.5 刷盘机制

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。

1)同步刷盘

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。

2)异步刷盘

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

3)配置

同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。

1.2 高可用性机制

RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。

Master和Slave的区别:在Broker的配置文件中,参数 brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。

Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。

1.2.1 消息消费高可用

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。

1.2.2 消息发送高可用

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可 用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。

1.2.3 消息主从复制

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。

1)同步复制

同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;

在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,
但是同步复制会增大数据写入 延迟,降低系统吞吐量。

2)异步复制

异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。

在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,
有些数据因为没有被写 入Slave,有可能会丢失;

3)配置

同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。

4)总结

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。

1.3 负载均衡

1.3.1 Producer负载均衡

Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:

图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。

1.3.2 Consumer负载均衡

1)集群模式

在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

默认的分配算法是AllocateMessageQueueAveragely,如下图:

还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:


需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。

通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。

但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。

2)广播模式

由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。

在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所有的queue。

1.4 消息重试

1.4.1 顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

1.4.2 无序消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

1)重试次数

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

2)配置方式

消费失败后,重试配置方式

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //处理消息
        doConsumeMessage(message);
        //方式1:返回 Action.ReconsumeLater,消息将重试
        return Action.ReconsumeLater;
        //方式2:返回 null,消息将重试
        return null;
        //方式3:直接抛出异常, 消息将重试
        throw new RuntimeException("Consumer Message exceotion");
    }
}

消费失败后,不重试配置方式

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            doConsumeMessage(message);
        } catch (Throwable e) {
            //捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
            return Action.CommitMessage;
        }
        //消息处理正常,直接返回 Action.CommitMessage;
        return Action.CommitMessage;
    }
}

自定义消息最大重试次数

消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
Consumer consumer =ONSFactory.createConsumer(properties);

注意:

获取消息重试次数

消费者收到消息后,可按照如下方式获取消息的重试次数:

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        //获取消息的重试次数
        System.out.println(message.getReconsumeTimes());
        return Action.CommitMessage;
    }
}
上一篇下一篇

猜你喜欢

热点阅读