RocketMQ系列之快速入门
简介
Apache RocketMQ 是阿里开源的一款高性能、高吞吐量的分布式消息中间件,在由阿里捐赠给Apache软件基金会之后孵化成了Apache的一个顶级项目(Top-Level Project,TLP)。RocketMQ 是基于阿里闭源的 MetaQ 内核实现的,它服务于阿里的各个体系,尤其是在电商领域提供了极高的并发支撑。RocketMQ 是使用 Java 语言开发的,因此相较于业界流行的其它消息中间件比如 RabbitMQ(使用 Erlang 开发)、Kafka(使用 Scala 开发),它对于 Java 程序员来说更加友好,更加利于开发者在其底层的基础上进行封装和扩展。
RocketMQ 的官网是http://rocketmq.apache.org/,根据官网的介绍,Apache RocketMQ™ 是一个标准化的消息引擎,轻量级的数据处理平台。它具备以下优势:
- 低延迟:在高并发压力下,超过99.6%的响应延迟在1毫秒之内
- 面向金融的:系统支撑的追踪和审计功能具备高可用性
- 行业可发展性:保证了万亿级别的消息容量
- 厂商中立的:在最新的4.1版本开放了一个新的分布式消息和流标准
- 对大数据友好的:具备通用集成功能的批量传输支撑了海量吞吐
- 大量的积累:只要给与足够的磁盘空间,就可以累积消息而不会造成性能损失
RocketMQ 诞生背景
在早期,阿里曾基于 ActiveMQ 5.x(低于5.3)构建了分布式消息中间件。 并将其用于他们的跨国公司的异步通信,搜索,社交网络活动流,数据管道,甚至交易过程中。 随着贸易业务吞吐量的增长,来自消息集群的压力也就变成了亟待解决的问题。
为什么设计 RocketMQ?
随着队列的增长和虚拟主题的使用,ActiveMQ IO 模块遇到了瓶颈。 尽管阿里尽力通过限流,熔断或降级来解决此问题,但效果不佳。 因此,那时他们开始关注流行的消息传递解决方案Kafka。 不幸的是,Kafka不能满足阿里的要求,特别是在低延迟和高可靠性方面,请参阅此处以了解详细信息。
在这种情况下,他们决定发明一个新的消息传递引擎来处理更广泛的用例集——从传统的发布/订阅方案到大批量实时零损失容忍的交易系统。
RocketMQ vs. ActiveMQ vs. Kafka
下表展示了RocketMQ,ActiveMQ和Kafka(依照awesome-java 来说,Apache RocketMQ是最流行的消息传递解决方案)之间的比较:
消息中间件 | ActiveMQ | Kafka | RocketMQ |
---|---|---|---|
客户端SDK | Java,.NET,C++等 | Java, Scala等 | Java, C++, Go |
协议和规范 | 推模型,支持OpenWire, STOMP, AMQP, MQTT, JMS | 拉模型,支持TCP | 拉模型,支持TCP,JMS,OpenMessaging |
顺序消息 | 独立的消费者或者队列可以确保顺序 | 确保分区内消息的顺序 | 能确保对消息进行严格排序,并可以很好地扩展 |
定时消息 | 支持 | 不支持 | 支持 |
批量消息 | 不支持 | 支持,带有异步生产者 | 支持,使用异步模式来避免消息丢失 |
广播消息 | 支持 | 不支持 | 支持 |
消息过滤 | 支持 | 支持,可以使用Kafka流来过滤消息 | 支持,基于SQL92的属性过滤表达式 |
服务器触发的消息重新投递 | 不支持 | 不支持 | 支持 |
消息存储 | 使用JDBC以及高性能日志(例如levelDB,kahaDB)来支持非常快速的持久化操作 | 高性能文件存储 | 高性能且低延迟的文件存储 |
消息追溯性 | 支持 | 通过偏移量指示器来支持这一特性 | 支持时间戳和偏移量两种指示器 |
消息优先级 | 支持 | 不支持 | 不支持 |
高可用和故障转移 | 支持,取决于存储,如果使用kahadb,则需要一个ZooKeeper服务器 | 支持,需要一个ZooKeeper服务器 | 支持,主从模式,不需要其他工具 |
消息追踪 | 不支持 | 不支持 | 支持 |
配置 | 默认配置为低级别,用户需要优化配置参数 | Kafka使用键值对格式进行配置。 这些值可以从文件或以编程方式提供 | 开箱即用,用户只需要关注少量的配置 |
管理和操作工具 | 支持 | 支持,使用终端命令展示核心指标 | 支持,丰富的Web和终端命令可显示核心指标 |
Quick Start
本部分内容将会介绍如何在本地的机器上快速安装一个 RocketMQ 用于收发消息,更多的细节可以访问 https://github.com/apache/rocketmq/tree/master/docs/cn进行查看。
前置要求
在安装 RokcetMQ 之前需要先安装如下软件:
- 64位操作系统,推荐使用 Linux/Unix/Mac 操作系统
- 64位 JDK 1.8 及以上的版本
- Maven 3.2.x
- Git
- 为 Broker server 至少预留 4G 的磁盘空间
下载、安装及使用
Linux
1. 安装jdk
- 首先在 oracle 官网下载 Linux jdk 8 的压缩包
- 然后使用 ftp 工具将下载好的压缩包上传到 Linux 服务器上
- 使用 tar -zxvf jdk-8u261-linux-x64.tar.gz 命令解压文件
- 配置系统环境变量
[root@node01 opt]# vi /etc/profile
在文件末尾添加以下内容:
# jdk放置目录
JAVA_HOME=/opt/jdk1.8.0_226
# jre放置目录
JRE_HOME=/opt/jdk1.8.0_226/jre
# 配置 path 环境变量,以 : 分隔
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
# 配置 classpath 环境变量,以 : 分隔
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
# 设置环境变量
export JAVA_HOME JRE_HOME PATH CLASSPATH
- 使配置文件的变更生效
[root@node01 opt]# source /etc/profile
- 查看 jdk 版本,至此,jdk 就已经安装完毕
[root@node01 opt]# java -version
java version "1.8.0_226"
Java(TM) SE Runtime Environment (build 1.8.0_226-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.226-b11, mixed mode)
2. 安装maven
- 使用 wget 下载 maven
[root@node01 opt]# wget https://mirrors.bfsu.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
- 解压
[root@node01 opt]# tar -zxvf apache-maven-3.6.3-bin.tar.gz
- 添加阿里云镜像
[root@node01 opt]# cd apache-maven-3.6.3/conf/
[root@node01 conf]# ll
总用量 20
drwxr-xr-x. 2 root root 4096 11月 7 2019 logging
-rw-r--r--. 1 root root 10468 11月 7 2019 settings.xml
-rw-r--r--. 1 root root 3747 11月 7 2019 toolchains.xml
[root@node01 conf]# vi settings.xml
在配置文件 settings.xml 中加入如下代码:
<mirror>
<id>aliyun-maven</id>
<mirrorOf>*</mirrorOf>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
- 配置环境变量
[root@node01 conf]# vi /etc/profile
在文件中添加如下配置:
# maven 安装目录
M2_HOME=/opt/apache-maven-3.6.3
PATH=$PATH:$M2_HOME/bin
export M2_HOME
[root@node01 opt]# source /etc/profile
- 查看 maven 版本,至此,maven 就安装完毕了
[root@node01 conf]# mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /opt/apache-maven-3.6.3
Java version: 1.8.0_226, vendor: Oracle Corporation, runtime: /opt/jdk1.8.0_226/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1127.el7.x86_64", arch: "amd64", family: "unix"
3. 安装rocketmq
- 下载 rocketmq 源码包
[root@node01 opt]# wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip
- 解压
[root@node01 opt]# yum install -y unzip
[root@node01 opt]# unzip rocketmq-all-4.7.1-source-release.zip
- 去到解压后的文件目录中进行编译
[root@node01 rocketmq-all-4.7.1-source-release]# mvn -Prelease-all -DskipTests clean install -U
- 将编译好的文件移个位置
[root@node01 rocketmq-all-4.7.1-source-release]# cd distribution/target/rocketmq-4.7.1
[root@node01 rocketmq-4.7.1]# ll
总用量 4
drwxr-xr-x. 6 root root 4096 8月 29 15:42 rocketmq-4.7.1
[root@node01 rocketmq-4.7.1]# mv rocketmq-4.7.1/ /opt
- 启动 Name Server,如果在终端显示如下信息,则说明启动成功了:
[root@node01 opt]# cd rocketmq-4.7.1/bin/
[root@node01 bin]# ls
cachedog.sh dledger mqbroker mqbroker.numanode1 mqnamesrv mqshutdown.cmd play.sh runbroker.sh setcache.sh tools.sh
cleancache.sh mqadmin mqbroker.cmd mqbroker.numanode2 mqnamesrv.cmd os.sh README.md runserver.cmd startfsrv.sh
cleancache.v1.sh mqadmin.cmd mqbroker.numanode0 mqbroker.numanode3 mqshutdown play.cmd runbroker.cmd runserver.sh tools.cmd
[root@node01 bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
- 启动 broker
使用命令启动 broker,会发现报如下的错误:
[root@node01 bin]# ./mqbroker -n localhost:9876
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /opt/rocketmq-4.7.1/bin/hs_err_pid9514.log
从错误信息我们可以得知原因是 jvm 启动初始化内存分配大于物理内存。
因此,我们可以修改启动脚本中的 jvm 参数来解决这个问题,首先,让我们来修改nameserver的启动脚本:
[root@node01 bin]# vi runserver.sh
找到分配jvm内存的配置行:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
将它修改为如下配置(具体配置数值根据实际生产场景进行调整):
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
然后修改broker的启动脚本:
[root@node01 bin]# vi runbroker.sh
同样是找到分配jvm内存的配置行:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
将它修改为如下配置(具体配置数值根据实际生产场景进行调整):
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
修改完之后重启 nameserver 和 broker,当出现如下信息,说明 broker 已经启动成功:
[root@node01 bin]# ./mqbroker -n localhost:9876
The broker[node01, 192.168.114.60:10911] boot success. serializeType=JSON and name server is localhost:9876
- 执行测试程序测试消息发送和接收
在发送/接收消息之前,我们需要告诉客户端 Name Server 的位置。RocketMQ提供了多种方法来实现这一目标。为了简单起见,我们通过设置环境变量 NAMESRV_ADDR 来实现,在 /etc/profile 中加入如下配置:
export NAMESRV_ADDR=localhost:9876
使用测试程序发送消息:
./tools.sh org.apache.rocketmq.example.quickstart.Producer
显示如下信息则表示消息发送成功:
另起一个会话,然后用测试程序来接收消息:
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
显示如下信息则表示消息接收成功:
- 关闭
关闭 broker:
[root@node01 bin]# ./mqshutdown broker
The mqbroker(1502) is running...
Send shutdown request to mqbroker(1502) OK
关闭 name server:
[root@node01 bin]# ./mqshutdown namesrv
The mqnamesrv(1444) is running...
Send shutdown request to mqnamesrv(1444) OK
Windows
本文主要讲述的是 windows 10 操作系统下 RocketMQ 的安装,请确保你的操作系统中已经安装了 PowerShell。和 Linux 一样,在 Windows 安装 RocketMQ 需要先安装 jdk 和 maven ,本文就不再细述如何安装 jdk 和 maven 了,请自行查阅资料安装。
1. 安装 RocketMQ
- 首先,在官网下载 RocketMQ 的二进制压缩包,然后选择一个本地目录进行解压缩
-
配置环境变量,添加如下两个环境变量:
打开 runbroker.cmd,修改如下配置行:
set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"
修改后的配置行如下:
rem set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%"
- 启动 name server:
- 启动 broker: 当控制台显示如下信息时,则说明启动成功了:
-
使用测试程序发送/接收消息
发送消息: 接收消息:
- 关闭
直接关闭 cmd/powershell 即可(不要在生产环境这样做)
安装启动时可能会遇到的问题
1. 编译时包无法在mirror上找到 提示502错误
原因:网络不好或maven仓库服务器出错
重试即可,或者换一个镜像仓库
2. 启动broker失败,报 Cannot allocate memory 错误
此问题在上文中已经叙述过了,在此就不再赘述了
3. 启动broker成功但提示:Failed to obtain the host name
原因:无法解析当前的主机名
在/etc/hosts里添加映射即可
# 配置 ip 地址到主机名的映射
192.168.114.60 node-01
4. linux日期校准
安装ntpdate
yum install ntpdate
ntpdate ntp1.aliyun.com
控制台rocketmq-console
编译安装
1. 下载
编译源码包下载地址:https://github.com/apache/rocketmq-externals
中文指南
https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md
2. 将源码包上传到服务器并解压缩
[root@node01 opt]# unzip rocketmq-externals.zip
3. 编译
[root@node01 opt]# cd rocketmq-externals/rocketmq-console/
[root@node01 rocketmq-console]# mvn clean package -Dmaven.test.skip=true
4. 启动
编译成功后在rocketmq-console/target
目录下执行rocketmq-console-ng-2.0.0.jar
启动(需要确保你的 name server 已经启动了),直接动态添加nameserver
地址即可,或者你也可以通过编辑rocketmq-console/src/main/resources
目录下的application.properties
配置文件添加rocketmq.config.namesrvAddr
属性来配置nameserver
地址。
[root@node01 rocketmq-console]# ll
总用量 68
drwxr-xr-x. 3 root root 4096 8月 30 10:50 doc
-rw-r--r--. 1 root root 30422 8月 30 10:50 LICENSE
-rw-r--r--. 1 root root 180 8月 30 10:50 NOTICE
-rw-r--r--. 1 root root 10593 8月 30 10:50 pom.xml
-rw-r--r--. 1 root root 2390 8月 30 10:50 README.md
drwxr-xr-x. 4 root root 4096 8月 30 10:50 src
drwxr-xr-x. 3 root root 4096 8月 30 10:50 style
drwxr-xr-x. 7 root root 4096 8月 30 10:59 target
[root@node01 rocketmq-console]# cd target/
[root@node01 target]# java -jar rocketmq-console-ng-2.0.0.jar --rocketmq.config.namesrvAddr=localhost:9876
当出现如下界面时就说明启动成功了:
启动成功后访问服务器 8080 端口即可。
界面组成
RocketMQ 是面向集群而生的,这一点从 rocketmq-console 的界面上就能体现出来。
运维
运维- NameSvrAddrList:配置 name server 的地址,它是个列表,因此可以配置多个 name server 的地址,这说明 name server 是可以集群部署的
- IsUseVIPChannel:是否使用VIPChannel
驾驶舱
驾驶舱- Broker TOP 10:查看消息量最多的10个broker的消息量(总量)
- Broker 5min trend:查看broker消息量5分钟的趋势
- 主题 TOP 10:查看消息量最多的10个单一主题的消息量(总量)
- 主题 5min trend:查看主题消息量5分钟的趋势
集群
集群- 查看集群的分布情况
- cluster与broker关系
- cluster中包含的broker
- 查看broker具体信息/运行信息/状态信息
- 查看broker配置信息
主题
主题- 展示所有的主题,可以通过主题名称进行过滤
- 筛选 普通/重试/死信/系统 主题
- 新增/更新主题
- clusterName:集群名
- brokerName:主机名
- topicName:主题名
- writeQueueNums:写队列数量
- readQueueNums:读队列数量
- perm:2是写 4是读 6是读写
- 状态 查询消息投递状态(投递到哪些broker/哪些queue/多少量等)
- 路由 查看消息的路由(现在你发这个主题的消息会发往哪些broker,对应broker的queue信息)
- CONSUMER管理(这个topic都被哪些group订阅了、消费了,消费情况何如)
- topic配置(查看/变更当前的topic的配置)
- 发送消息(向这个主题发送一个测试消息)
- 重置消费位点(分为在线和离线两种情况,不过都需要检查重置是否成功)
- 删除主题 (会删除掉所有broker以及namesrv上的主题配置和路由信息,在生产环境上请慎重进行这项操作)
消费者
消费者- 展示所有的消费组,可以通过组名进行过滤
- 刷新页面/每隔五秒定时刷新页面
- 按照订阅组/数量/TPS/延迟 进行排序
- 新增/更新消费组
- clusterName :集群名
- brokerName:主机名
- groupName:消费组名字
- consumeEnable:是否可以消费,设置为FALSE的话将无法进行消费
- consumeBroadcastEnable:是否可以广播消费
- retryQueueNums:重试队列的大小
- brokerId:正常情况从哪消费
- whichBrokerWhenConsumeSlowly:出问题了从哪消费
- 终端:在线的消费客户端查看,包括版本订阅信息和消费模式
- 消费详情:对应消费组的消费明细查看,这个消费组订阅的所有Topic的消费情况,每个queue对应的消费client查看(包括Retry消息)
- 配置:查看/变更消费组的配置
- 删除:在指定的broker上删除消费组(谨慎操作)
生产者
生产者- 展示在线的消息生产者客户端(主机、版本、地址等信息),可以通过组名进行过滤
- 通过主题进行筛选
消息
消息- 根据Topic和时间区间查询(由于数据量大 最多只会展示2000条,多的会被忽略)
- 根据Message Key和Topic进行查询
- 最多只会展示64条
- 根据Message Id和Topic进行消息的查询
- 消息详情:展示这条消息的详细信息,查看消息对应到具体消费组的消费情况(如果异常,可以查看具体的异常信息)。可以向指定的消费组重发消息。
消息轨迹
消息轨迹- 根据Message Key和Topic进行查询
- 最多只会展示64条
- 根据Message Id和Topic进行消息的查询
- 消息轨迹详情:展示展示这条消息的消息轨迹的详细信息