RocketMq总结

2019-01-22  本文已影响2人  知止9528

架构图

架构图.png

基本概念

Producer

消息生产者,负责产生消息,一般由业务系统负责产生消息

Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费

Broker

消息中转角色,负责存储消息,转収消息,一般也称为 Server。

NameServer

类似于zookeeper


概念模型

image.png

即消息是根据主题(即我们图里面的Topic)进行订阅,而每个Topic下面又可以有多个队列,只是这里的队列并不真正存储消息,而是起到类似索引的作用,消息真正存储在CommitLog里面,如下图


RocketMq消息实际存储结构.png

所有数据单独存储到一个 Commit Log,完全顺序写,随机读。对最终用户展现的队列实际只存储消息在 Commit Log 的位置信息

Message Queue

在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为Message Queue是一个长度无限的数组,offset就是下标。

这样做的好处
(1)队列轻量化,单个队列数据量非常少
(2)对磁盘的访问串行化,避免磁盘竟争,丌会因为队列增加导致 IOWAIT 增高

这样做的缺点
(1)写虽然完全是顺序写,但是读却发成了完全的随机读。
(2)读一条消息,会先读 Consume Queue,再读 Commit Log,增加了开销。
(3)要保证 Commit Log 不 Consume Queue 完全的一致,增加了编程的复杂度

RocketMq的解决方案

随机读(主要是指磁盘随机读),尽可能让读命中 PAGECACHE,减少 IO 读操作,所以内存越大越好。同时由于缓存的局部性原理,可以很快的在内存上读取到消息


RocketMq里面的消息类型

顺序消息

消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送到同一个队列,返样 Consumer 就可以按照 Producer 发送的顺序去消费消息。

普通顺序消息

顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生发化,哈希取模后定位的队列会发化,产生短暂的消息顺序不一致。如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。

严格顺序消息

严格顺序消息顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现)目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。


获取消息的方式


RocketMq里面消息的几种消费方式


涉及到磁盘,就会有零拷贝,RocketMq也不例外,常用的零拷贝有如下两种方式

内存映射

image.png

对应的java代码

File file = new File("data.zip");
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        FileChannel fileChannel = raf.getChannel();
        MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());

真正的零拷贝


image.png

对应的java代码

 File file = new File("test.zip");
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        FileChannel fileChannel = raf.getChannel();
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("", 1234));
        // 直接使用了transferTo()进行通道间的数据传输
        fileChannel.transferTo(0, fileChannel.size(), socketChannel);

这两种方式的比较

使用mmap + write方式
优点:即使频繁调用,使用小块文件传输,效率也很高
缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU,内存安全性控制复杂,需要避免JVM Crash问题。

使用sendfile方式
优点:可以利用DMA方式,消耗CPU较少,大块文件传输效率高,无内存安全新问题。
缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO。

RocketMq采用的是基于内存映射的方式,因为小块数据传输的更为频繁


消息的持久化

同步刷盘与异步刷盘.png

异步刷盘
写入到Page Cache后就立马返回了,然后再调用fsync函数异步的去将数据刷到磁盘

优点

效率高又快

缺点

断点或者重启,内存里面的数据还没来得及刷入到磁盘就没有了,所以会有丢消息的概率


同步刷盘

当然就是写入到Page Cache后就立马调用fsync函数立马刷入到磁盘

优点

可以做到不丢消息

缺点

当然就是牺牲性能了


接着再来分析下 几种消费消息的方式

At least Once

是指每个消息必须投递一次,RocketMQConsumer先pull消息到本地,消费完成后,才吐服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性

Exactly Only Once

(1).发送消息阶段,不允许収送重复的消息。
(2).消费消息阶段,不允许消费重复的消息。

只有以上两个条件都满足情况下,才能称为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然步能严格保证不重复,但是正常情冴下很少会出现重复収送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复。此问题的本质原因是网络调用存在不确定性,即不成功也不失败的第三种状态,所以才产生了消息重复性问题。

定时消息

定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意的时间精度,在Broker局面,必须要做消息排序,如果再涉及到持久化,那消息排序要不可避免的产生巨大性能开销。RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。


消息过滤

RocketMQ的消息过滤方式有别于其他消息中间件,是在订阅时,再做过滤,先来看下Consume Queue的存储结构


ConsumeQueue单个存储单元结构.png

(1)在Broker端迕行Message Tag比对,先遍历Consume Queue,如果存储的Message Tag与订阅的Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer。注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode。(2).Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是Hashcode

这么做的原因?
(1)Message Tag存储Hashcode,是为了在Consume Queue定长方式存储,节约空间
(2)过滤过程中不会访问Commit Log数据,可以保证堆积情况下也能高效过滤
(3)即使存在Hash冲突,也可以在Consumer端进行修正,保证万无一失


高可用

谈到高可用,自然就想到集群,那么多台机器间消息的同步方式就有同步双写和异步复制两种

异步复制

异步复制的实现思路非常简单,Slave启劢一个线程,不断从Master拉取Commit Log中的数据,然后在异步build出Consume Queue数据结构。整个实现过程基本同Mysql主从同步类似。

同步双写

也类似于Mysql的半同步复制,即主上写完,其中一台从也要写完才统一返回给客户端ok.整体思想是类似的


上面我们谈到RocketMq没有使用Zookeeper而是自己实现了NameServer

 public boolean initialize() {
                .....        
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        .....
    }

我们可以看到,集群其实就是维护心跳,这里面其实还有很多细节,还没看完,看完再更新吧


Producer最佳实践
发送消息注意事项
(1)一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。message.setTags("TagA");

(2)每个消息在业务局面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询返条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,返样可以避免潜在的哈希冲突。// 订单IdString orderId = "20034568923546";message.setKeys(orderId);

(3)消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。

(4)send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义
SEND_OK消息发送成功
FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失对与精确发送顺序消息的应用,由亍顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果sendresult中的status字段不等于SEND_OK,就应该尝试重试。对于其他应用,则没有必要这样。
(5)对于消息不可丢失应用,务必要有消息重发机制
例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。

Consumer最佳实践
(1)将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是可能会存在同样的消息有两个不同msgId的情冴(有多种原因),返种情况可能会使业务上重复消费,建议最好使用消息内容中的唯一标识字段去重。
2.使用业务局面的状态机去重

具体可见幂等总结


最后讲一下集群的搭建

Master-Slave 方式

1.服务器环境

序号 IP 用户名 密码 角色 模式
1 192.168.11.128 root *** nameServer1,brokerServer1 Master1
2 192.168.11.129 root *** nameServer2,brokerServer2 Master2

2.Hosts 添加信息

IP NAME

192.168.11.128 rocketmq-nameserver1
192.168.11.128 rocketmq-master1
192.168.11.129 rocketmq-nameserver2
192.168.11.129 rocketmq-master1-slave

vi /etc/hosts

3.上传解压【两台机器】

#   上传 apache-rocketmq.tar.gz 文件至/usr/local

#   tar -zxvf apache-rocketmq.tar.gz -C /usr/local

#   ln -s apache-rocketmq rocketmq ll /usr/local

4.创建存储路径【两台机器】

#   mkdir /usr/local/rocketmq/store

#   mkdir /usr/local/rocketmq/store/commitlog

#   mkdir /usr/local/rocketmq/store/consumequeue

#   mkdir /usr/local/rocketmq/store/index

5.RocketMQ 配置文件【两台机器】

#   vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties

#   vim /usr/local/rocketmq/conf/2m-2s-async /broker-a-s.properties

配置文件如下

#所属集群名字

brokerClusterName=rocketmq-cluster

#broker 名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a|broker-b

#0 表示 Master,>0 表示 Slave

brokerId=0

#nameServer 地址,分号分割

namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

#在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数 defaultTopicQueueNums=4

#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true

#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true

#Broker 对外服务的监听端口

listenPort=10911

#删除文件时间点,默认凌晨 4 点

deleteWhen=04

#文件保留时间,默认 48 小时

fileReservedTime=120

#commitLog 每个文件的大小默认 1G

mapedFileSizeCommitLog=1073741824

#ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间

diskMaxUsedSpaceRatio=88

#存储路径

storePathRootDir=/usr/local/rocketmq/store

#commitLog 存储路径

storePathCommitLog=/usr/local/rocketmq/store/commitlog

#消费队列存储路径存储路径

storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue

#消息索引存储路径

storePathIndex=/usr/local/rocketmq/store/index

#checkpoint 文件存储路径

storeCheckpoint=/usr/local/rocketmq/store/checkpoint

#abort 文件存储路径

abortFile=/usr/local/rocketmq/store/abort

#限制的消息大小

maxMessageSize=65536

#flushCommitLogLeastPages=4

#flushConsumeQueueLeastPages=2

#flushCommitLogThoroughInterval=10000

#flushConsumeQueueThoroughInterval=60000

#Broker 的角色

#- ASYNC_MASTER 异步复制 Master

#- SYNC_MASTER  同步双写 Master

#- SLAVE

brokerRole=ASYNC_MASTER

#刷盘方式

#- ASYNC_FLUSH  异步刷盘

#- SYNC_FLUSH   同步刷盘

flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false

#发消息线程池数量

#sendMessageThreadPoolNums=128

#拉消息线程池数量

#pullMessageThreadPoolNums=128

6.修改日志配置文件【两台机器】

#   mkdir -p /usr/local/rocketmq/logs

#   cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml

7.修改启动脚本参数【两台机器】

runbroker.sh脚本

#   vim /usr/local/rocketmq/bin/runbroker.sh

脚本如下
修改为1个g就好了

#开发环境    
JVM Configuration JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"

runserver.sh脚本

#   vim /usr/local/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"

8.启动 NameServer【两台机器】

#   cd /usr/local/rocketmq/bin

#   nohup sh mqnamesrv &

9.启动 BrokerServer A【192.168.11.128】

#   cd /usr/local/rocketmq/bin
#   nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties
#   netstat -ntlp
#   jps
#   tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
#   tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

10.启动 BrokerServer B【192.168.11.129】

#   cd /usr/local/rocketmq/bin
#   nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a-s.properties
#   netstat -ntlp
#   jps
#   tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
#   tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log

11.停止命令

#   cd /usr/local/rocketmq/bin

#   sh mqshutdown broker

#   sh mqshutdown namesrv

#   --等待停止

#   rm -rf /usr/local/rocketmq/store

#   mkdir /usr/local/rocketmq/store

#   mkdir /usr/local/rocketmq/store/commitlog

#   mkdir /usr/local/rocketmq/store/consumequeue

#   mkdir /usr/local/rocketmq/store/index

#   --按照上面步骤重启 NameServer 与 BrokerServer

上一篇下一篇

猜你喜欢

热点阅读