javarocketmq

RocketMQ集群以及SpringBoot集成

2020-08-05  本文已影响0人  孤山之王

1. 中间件下载

官网上直接下载,比较简单就不赘述!

2. 简介

2.1. RocketMQ是什么

RocketMQ

RocketMQ 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点,具体特性如下:

2.2. 概念

2.2.1. 消息模型(Message Model)

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

2.2.2. 消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

2.2.3. 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。

2.2.4. 主题(Topic)

表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。

2.2.5. 名字服务(Name Server)

名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

2.2.6. 拉取式消费(Pull Consumer)

Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。

2.2.7. 推动式消费(Push Consumer)

Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。

2.2.8. 生产者组(Producer Group)

同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

2.2.9. 消费者组(Consumer Group)

同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

2.2.10. 普通顺序消息(Normal Ordered Message)

普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。

2.2.11. 严格顺序消息(Strictly Ordered Message)

严格顺序消息模式下,消费者收到的所有消息均是有顺序的。

2.3. 架构设计

2.3.1. 技术架构

技术架构图

2.3.2. 部署架构

部署架构图

3. 安装

由于RocketMQ安装比较简单,在单机这块就省略。

3.1. 单机

略!!!!

3.2. 集群

3.2.1. 集群的方式

后两种的方式比较复杂,涉及主从同步的问题,非必要的场景,建议采用多Master 这种方式。而我下面的例子也是以 多Master来部署的。

3.2.2. 部署

序号 IP 角色 模式
A 192.168.244.128 nameServer1,brokerServer1 Master1
B 192.168.244.129 nameServer2,brokerServer2 Master2

在 A、B 两服务器中将HOST文件修改,vi /etc/hosts

修改内容如下:

192.168.244.128 rocketmq-nameserver1
192.168.244.128 rocketmq-master1

192.168.244.129 rocketmq-nameserver2
192.168.244.129 rocketmq-master2

在A、B两台机器上传RockerMQ文件,并解压。我这里上传和解压路径在/usr/local

[root@centos8 rocketmq]$ pwd
/usr/local/rocketmq
[root@centos8 rocketmq]$ ll
total 40
drwxr-xr-x. 2 root root    83 Jun 24 02:49 benchmark
drwxr-xr-x. 3 root root  4096 Jun 24 02:02 bin
drwxr-xr-x. 6 root root   211 Jun  2 02:09 conf
drwxr-xr-x. 2 root root  4096 Jun 24 02:49 lib
-rw-r--r--. 1 root root 17336 Jun  2 02:09 LICENSE
-rw-r--r--. 1 root root  1338 Jun  2 02:09 NOTICE
-rw-r--r--. 1 root root  5069 Jun 24 02:02 README.md
[root@centos8 rocketmq]$ 

在A、B两台机器执行创建路径。

[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/commitlog
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/consumequeue
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/index

这里贴一个标准配置文件,具体如下:


# 4. 所属集群名字
brokerClusterName=rocketmq-cluster
# 5. broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a/broker-b ## 需要按机器A/B 区分
# 6. 0 表示 Master,>0 表示 Slave
brokerId=0
# 7. nameServer地址,分号分割

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 8. 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 9. 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
# 10. 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# 11. Broker 对外服务的监听端口
listenPort=10911
# 12. 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 13. 文件保留时间,默认 48 小时
fileReservedTime=120
# 14. commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# 15. ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# 16. destroyMapedFileIntervalForcibly=120000
# 17. redeleteHangedFileInterval=120000
# 18. 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 19. 存储路径
storePathRootDir=/usr/local/rocketmq/store
# 20. commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# 21. 消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# 22. 消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
# 23. checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# 24. abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
# 25. 限制的消息大小
maxMessageSize=65536
# 26. flushCommitLogLeastPages=4
# 27. flushConsumeQueueLeastPages=2
# 28. flushCommitLogThoroughInterval=10000
# 29. flushConsumeQueueThoroughInterval=60000
# 30. Broker 的角色
# 31. - ASYNC_MASTER 异步复制Master
# 32. - SYNC_MASTER 同步双写Master
# 33. - SLAVE
brokerRole=ASYNC_MASTER
# 34. 刷盘方式
# 35. - ASYNC_FLUSH 异步刷盘
# 36. - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# 37. checkTransactionMessageEnable=false

# 38. 发消息线程池数量
# 39. sendMessageThreadPoolNums=128
# 40. 拉消息线程池数量
# 41. pullMessageThreadPoolNums=128

机器A

[root@centos8 rocketmq]$ vi/usr/local/rocketmq/conf/2m-noslave/broker-a.properties

其中brokerName=broker-a

在机器B

[root@centos8 rocketmq]$ vi/usr/local/rocketmq/conf/2m-noslave/broker-a.properties

其中brokerName=broker-b

不用说肯定也是A、B两台都要改

[root@centos8 rocketmq]$ mkdir -p /usr/local/rocketmq/logs
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

/usr/local/rocketmq/bin 路径下,找到runbroker.shrunserver.sh

我们将这两个文件的JAVA_OPT 参数修改下,不然默认情况下,JVM配置是 8G。如 JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

修改后:

...
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
....
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ nohup sh mqnamesrv &
机器A 机器B
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties &
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-b.properties &

rocketmq-console下载

这个包需要利用maven编译打包。我这里打包一个,放百度云盘上,供下载!

链接: https://pan.baidu.com/s/1hfvzJeyBG7TXnvPHtn3C5Q

提取码: atkq

最后执行jar文件

[root@centos8 rocketmq]$ java -jar rocketmq-console-ng-1.0.0.jar

页面的端口是 8082,刚开始启动有点慢,稍微等会!!

rocketmq-console界面
[root@centos8 rocketmq]$ cd /usr/local/rocketmq/bin
[root@centos8 rocketmq]$ sh mqshutdown broker
[root@centos8 rocketmq]$ sh mqshutdown namesrv

这里需要等待完全停止!

[root@centos8 rocketmq]$ rm -rf /usr/local/rocketmq/store
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/commitlog
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/consumequeue
[root@centos8 rocketmq]$ mkdir /usr/local/rocketmq/store/index

最终按照以上步骤重启NameServer和BrokerServer即可!

42. SpringBoot集成

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>{换成相应版本}</version>
</dependency>
######### 42.1. RocketMQ ##########
rocketmq:
  name-server: 192.168.244.128:9876;192.168.244.129:9876;
  producer:
    group: drunkard
    send-message-timeout: 30000
@Slf4j
@RestController
public class RocketMqDemo {

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @GetMapping("send/{id}")
    public String send(@PathVariable("id") String id){
        UserVo userVo  = new UserVo(id,"侯征");
        log.warn(JSON.toJSONString(userVo));
        rocketMQTemplate.send("rocket-topic-01", MessageBuilder.withPayload(userVo).build());
        return "SUCESS";
    }
}
@Slf4j
@Component
@RocketMQMessageListener(topic = "rocket-topic-01", consumerGroup = "my-rocket-topic-01")
public class UserConsumer implements RocketMQListener<UserVo> {

    @Override
    public void onMessage(UserVo message) {
        log.warn("接受到消息: {}",message.toString());
    }
}

43. 案例下载

案例下载,希望多多给Star。

上一篇 下一篇

猜你喜欢

热点阅读