运维相关rocketMq

rocketmq 与 kafka 对比漫谈

2019-07-27  本文已影响0人  左小星

  今年的一个周末,去参加了一场rocketMq的meet up分享,由此对rocketMq产生了极大的兴趣,rocketMq的社区也非常活跃,钉钉群里有很多大佬分享源码(钉钉群号: 21791227),通过看大佬们的分享与阅读它的源码,知道了其底层是怎么实现的,本文就大致对比一下rocketmq与kafka。

架构上对比

  rocketMq由nameServer、broker、consumer、producer组成,nameServer之间互不通信,broker会向所有的nameServer注册,通过心跳判断broker是否存活,producer 和 consumer 通过nameserver就知道broker上有哪些topic。


rocketMq架构

kafka的元数据信息都是保存在zk上的,由broker、zk、producer、consumer组成。


kafka架构

性能上对比

  kafka单机写百万级别的TPS,kafka的producer可以批量发送数据。
  rocketMq单机数十万TPS。

存储上对比

rocketmq存储

  MQ一般都有topic的概念,为了增加生产者和消费者的并发度,在kafka里有分区的概念,而在rocketMq中是queueId的概念,多个queue可以分布到不同的broker上。
  rocketMq底层存储依靠commitLog,commitLog存储了所有的topic的数据,在写磁盘上是顺序写。那么问题来了,所有的topic数据都放一个文件里,那消费者来消费的就懵逼了,消费者自言自语到我怎么知道去commitLog的那个offset去吧数据捞起来呢?
  这时候consumerQueue就该上场了~
  rocketMq 单机写commitLog时所有的线程是加锁等待写的,写到pagecache或者刷到磁盘以后,会有个单独的线程(ReputMessageService)来异步构建consumerQueue(一个queue对应一个cousumerQueue)。
consumerQueue由20个字节组成,一个long型offset(对应commitLog中的offset),一个int型的size(消息的长度),一个long型的tag tag有两个用途,一种是发送消息的tag值的hashCode用来消息过滤,一个是在延时消息中存储消息到期执行的时间。
  rocketMq也支持根据消息的key来搜索消息,这部分逻辑是在IndexFile中实现的,就相当与根据磁盘实现了一个hashMap,根据topic+key 的hash找到indexFile的物理存储位置,里面记录了commitLog中的offset,hash冲突是链表法解决的。indexFile的构建也是在ReputMessageService中异步构建的。
  由于rocketMq的所有topic数据都放在同一个commitLog中,rocketMq的单机broker可支持上万topic
  rocketMq默认是基于mmap搞得磁盘和内存的映射,而kafka是基于java的FileChannel的zero copy,有专门的一篇文章对这两个性能做了对比
https://mp.weixin.qq.com/s/EEOw0efharDzXXdegCoC4A
rocketMq当正在写的文件写满后,会用另一个线程创建一个新的基于mmap的磁盘映射文件,注意,这只是做了个映射,并没有吧文件对应的内存空间搞到pagecache,这时往mmap里写数据的话,操作系统会产生缺页中断,然后将磁盘的页加载到pagecache。如果没有进行磁盘与pagecache预热的话,那写数据的时候还是有一定性能影响的,rocketMq想到了这点,创建完mmap映射后,通过代码写了一些数据到pagecache,提高了性能。(对操作系统还不是很熟,mmap原理说的不是太清楚)。

rocketMq存储
kafka存储

  kafka的topic是partition的概念,一个topic会有多个partition,partition会分布在不同的broker上,在单个partition是顺序写。broker写消息到partition的时候是写到pagecache中。
  但是,你有没有想过一个问题,当broker单机的partition过多的时候,很多partition同时往pagecache中写数据,相对与磁盘来说这就是随机写了,这时候kafka的性能会急剧下降

kafka存储

高可用与多副本

rocketMq高可用

  rocketMq 提供了两种架构,一个是master/slave 架构,slave同步master的数据,但是这个架构目前开源版本的当master挂了以后,slave不能主动切换成master,需要人工切换。
  还有一个是DLedger多副本机制(底层源码多线程玩的很6),底层使用raft算法,实现了高可用。使用这种模式,一个leader应该至少对应两个副本,这样才能在leader挂掉以后,根据投票才能选出另一个leader。这篇文章感觉写的很好,介绍了Dledger
https://www.infoq.cn/article/f6y4QRiDitBN6uRKp*fq

rocketMq消息不丢失

  rocketMq的commitLog模式是使用操作系统的mmap将磁盘与内存做映射,写消息时是先写到pagecache。rocketMq提供了几种策略(master/slave架构下),同步刷盘、异步刷盘、sync slave、async slave。金融场景下对数据一致性较高的情况下,建议采用同步刷盘、sync slave。
  rocketMq还提供了一个堆外内存池的优化策略,写数据时先写到堆外内存,然后在有单独的线程刷到 pageCache中,之后在有单独的线程刷到磁盘,这样做的好处是 实现了读写分离。写堆外内存,读pagecache,是不是一个很赞的做法,这种情况下有可能丢数据。
  DLedger多副本的情况下,由leader先写到pagecache,leader等半数以上的flower写成功以后,返回成功给生产者。由raft算法的强leader性,数据丢失不了,有可能重复,这时候要在消费端保证幂等。说明:DLedger的实现,吧raft里面的状态机去掉了。
leader中为每一个副本维护了一个writeIndex,其实也就是每一个副本对应着一个线程,写数据时先写到leader成功,然后leader的本地的leaderEndIndex增加。leader中维护的每个副本对应的线程只要发现副本对应的writeIndex小于leaderEndIndex,就会不停的向副本push数据。然后leader中还会有一个单独的check线程,检测当半数以上的副本写成功后,就会认为这条数据写成功了,然后CompletableFuture设置为完成,唤醒发送数据线程池中等待发送结果的线程,告诉它发送成功。这里面的实现还是有点复杂的,包括leader选举、数据同步等等,之后会专门写个源码分析~

kafka高可用

  kafka单个partition会有多个副本,producer写数据的时候,会往leader里面写(读消息也是从leader副本读),然后follower会同步leader的数据,同时会在zk中维护一个isr的副本列表,在isr列表中的副本都是能跟上leader数据的。如当一个副本所在的机器宕机或发生了fullGc,这时候这个副本会被剔除isr列表,当这个副本跟上leader数据的offset之后,会被重新加入到isr列表中。当副本的leader挂了以后,zk会在isr中的副本选主,所以kafka的副本最少可以设置一个。

kafka消息不丢失

  kafka的生产者acks提供了几个选项,发送到主不管是否成功就返回、发送到主主成功后返回、发送到主 主同步到所有的副本成功后返回。
kafka没有提供主动刷盘的机制,要保证消息不丢失,应该等所有的副本同步完了在返回成功。
在多副本情况下,涉及到副本之间数据同步,必然有快慢之分,因此kafka有两个概念 : LEO和HW。
LEO:即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值,leader 更新LEO的值是在写入数据后更新,follower更新LEO的值是follower从leader拉取数据写入到本地之后更新。
HW:即水位值,小于HW的offset被认为是更新到所有副本,这时候该数据可以被消费者消费。
HW和LEO更详细的文章:https://www.cnblogs.com/huxi2b/p/7453543.html

发送消息exactly once

  消息中间件应该都会有失败超时重试机制,这时就体现出了生产者的幂等性,重试的时候有可能会多次发送同一条消息。

rocketMq是不能保证producer的幂等性。

  在master/slave架构下,并且没有指定生产者发送顺序消息,当producer忘其中一个broker发消息失败后,producer会记录下这个发失败的broker,消息发送重试时会跳过这个broker(失败有可能是超时导致,其实存储成功了,但在producer端来看是失败了)。rocketMq发送消息有可能返回broker busy 的错误,产生这种错误的原因 : broker的pagecache繁忙,或者发消息的tps太高,broker端的线程池被打满。
  在DLedger多副本的情况下,也有可能导致消息多次发送。多副本异地容灾,跨机房部署,有可能网络抖动机房间延时增大,导致发送消息超时,其实最终这条消息会同步到每个副本中的。超时之后重试,这就导致了消息重复发送。

kafka的exactly once

  kafka的producer端幂等性是根据分区来说的,每个生产者有一个唯一的PID,然后producer端生成Sequence Number,broker端对PID + 分区 + Sequence Number 做保存,这样就能维护幂等性了(没看过这部分代码,因此说的不是很明白)...

producer事务性消息

  rocketMq支持producer端的事务消息。先发一个prepare message,broker端判断是prepare message,这时候不会异步构建consumerQueue,消费者也就看不到这条消息。prepare message 发成功后,执行本地事务操作,执行成功后告诉broker成功,然后broker就会从拿到prepare message中的信息,发送正式消息,这时候消费者就能消费了。
那么本地事务执行失败或者producer所在的机器宕机,没有告诉broker结果会怎么样?
broker会定期的回查producer组,询问这条消息对应的本地事务是否执行成功,成功了执行上面的操作。其实rocketMq的事务消息只是保证了执行本地事务和发消息的原子性,并不能保证consumer端事务的执行成功,感觉有点像分布式事务中的本地消息表实现
  kafka不支持以上的事务性消息。

producer延时消息

  rocketMq延时消息只支持特定的延时级别,延时级别在rocketMq里面有几种,不支持任意的延时时间,要是支持任意的延时时间,broker端需要对消息排序,对性能影响太大。
那么rocketMq的延时消息是什么实现原理呢?
  broker发现producer发送的是延时消息,会吧消息的topic替换成SCHEDULE_TOPIC_XXXX,而原消息的topic会放到消息的Property中, 然后写到commitLog中,异步构建延时消息的consumerQueue(consumerQueue中的tag对应消息投递的时间点)。每个延时队列级别对应一个TimerTask,等第一条消息延时时间到了,然后在去commitLog中吧替换topic之后的消息捞出来,重新组装消息,吧原消息的重新put到commitLog中,成功后再去捞取下条延时消息执行的时间,如果时间到了立刻执行,时间没到,根据timeTask的机制,等到该执行的时间点然后在执行。说明:每种延时级别的队列里消息是有序的。
  kafka不支持延时消息。

consumer对比

消息获取方式

  rocketMq支持推拉结合的方式。也支持消息长轮训。
什么是消息长轮训呢?
  consumer去broker端拉取消息是根据queueId的维度去拉的,有消息的话broker会里吗给consumer返回消息,如果目前还没有最新消息,broker会吧TCP连接hold一会,在hold的期间如果有新消息到来,broker会里吗给cousumer返回消息。在等待一段时间后任然没有新的消息,这时候broker会给consumer返回没有消息,consumer收到结果后会再次来拉取消息,重复以上过程。rocketMq底层通信基于netty实现,实现长轮训十分简单。rocketMq默认是push模式,也提供了pull模式的api。
长轮训的优点是减少了不必要的网络调用。阿里的nacos配置中心也提供了http长轮训的解决方案

  kafka只有pull模式。

consumer消费失败重试

  非顺序性消息,rocketMq的消费者组在启动时会订阅一个consumerGroup对应的retryTopic,当消费者组下的consumer消费消息失败后,根据rocketMq的延时消息机制,在延时特定的时间后,该consumerGroup还能消费到这条失败的消息,如果连续几次都没消费成功,这时候这条消息会进死信队列。
  顺序性消息,如一个订单的消息发往同一个consumerQueue(在producer发消息时指定路由规则),如果顺序消息有一条消费失败,会阻塞这条消息后面的消息消费
  kafka不支持消费失败重试。

消息搜索

  rocketMq 基于indexFile,在producer发消息时指定消息的key,之后可以根据key来搜索这条消息。原理其实就是个基于磁盘实现的hashMap。
  kafka不支持消息搜索。

消息过滤

  rocketMq支持消息的tags,消费者组订阅可以指定tags订阅,tag过滤的话在broker端和consumer端都有过滤,broker端只能根据hashCode过滤,hashCode有可能冲突,所以还需要在consumer端根据tag的值进行过滤。
  kafka不支持消息按tag过滤。

  以上就是通过学习rocketMq源码和我所了解到的kafka的相关知识总结出来的,如有错误,欢迎指出讨论~

上一篇下一篇

猜你喜欢

热点阅读