kafka和rocketmq
kafka:日志传输
rocketmq:订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发
0. kafka 与 zookeeper原理
kafka:
-
producer端使用zookeeper用来"发现"broker列表,以及和topic下每个partition master
建立socket连接
并发送消息 - broker端使用zookeeper用来注册broker信息,已及监测partition master存活性
- consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition master建立socket连接,并获取消息
1. namesvr与zookeeper
kafka的架构:
- topic --(选择partition个数)--> partition-n -- (根据设置副本zookeeper选举出1-master/n-slave)--> replication-n --> 系统自动从所有机器中为每个topic_partition分配1个master + 多个slave
由于master/slave需要选举,所以使用zookeeper
- 一个节点既可以是
master
也可以是slave
- partition master挂了,slave选举成为新的master(通过controller使用zk选举产生)
- 通过zk的临时节点竞争选出broker cluster中的controller
- 通过controller使用groupId+topic在__consumer_offset的partition leader作为group coordinator
- 通过coordinator选出所有consumer中的一个作为leader consumer
- 由leader consumer进行分配上报给coordinator, coordinator再下发给所有consumer**)
- broker是物理概念,对应一台物理机,partition是逻辑概念
- 先有broker,然后产生出master/slave
- producer端发送的message必须指定是发送到哪个topic,但是不需要指定topic下的哪个partition,因为kafka会把收到的message进行
load balance
,均匀的分布在这个topic下的不同的partition上( hash(message) % [broker数量] )
- producer先把message发送到partition master,再由master转发给其他partition slave,master反馈ack给producer前需要保证有n个replica已经收到该消息
kafka架构.png
rocketmq的架构:
- topic --(选择queue个数)--> queue-n --(机器启动时指定clusterName对应的brokerId)-->clusterName --> 系统自动分配cluster(多个master + 多个slave) 和你的topic之间的映射关系
由于部署时已经定义好,无需选举,只需要namesvr进行状态监控topic->queue关系即可
- 一个节点(同一个实例)只能是
master
或者是slave
- master挂了,请求转移到其它master
- broker是逻辑概念,对应1个master/n个slave,cluster(1:n)broker
先定义master/slave,然后组合出broker(每个broker分配的topic和对应的queue的数量也是一样的)
rocketmq架构.png
2. kafka吞吐量大于rocketmq
kafka在producer端做了数据合并,批量发送,性能是rocketmq的十倍(rocketmq没有做这层处理)
分区 partition单线程消费保证顺序性:
同一个消费组,一个分区只支持一个消费线程来消费消息
一个Topic会设置成多分区的模式,来支持多个消费者
每个生产者自己通过逻辑指定topic 下的partition/queue来保证同一订单的分区内顺序一致
kafka-consumer/producer公用一个文件: 每个topic + partition
都会创建一个物理文件
,当topic变多的时候,消息分散的落盘策略会导致consumer消费
时磁盘IO竞争激烈成为瓶颈(超过8个
topic就会出现抖动)
rocketmq-consumer/producer分开文件: 把producer产生的所有topic
的所有·queue
的消息存储在一个物理文件commitLog
中,逻辑上的划分,通过异步线程
同步到ComsuerQueue_n文件 - 只保存commitLog的offset
中,consumer消费
- comsuerQueue 是顺序读写
- commitLog是顺序写,随机读
- commitLog/comsuerQueue内部保存一个
链表
,通过offset映射到链表
上的内存映射文件(最大2G)
,进行读取
rocketmq是java编程 -> 大量数据gc严重
请求失败 -> 丢失数据,但是业务却被告知成功(订单不接受)
producer都是分布式,请求量不大可能这么大
缓存可以由上层业务处理
3. 数据可靠性
rocketmq:支持异步实时
刷盘,同步
刷盘,同步
Replication,异步
Replication
kafka:使用异步/同步
刷盘,异步/同步
Replication,同步下性能极具下降
- 通过配置ack可以实现(同步repliacation)
- 缺省Kafka是异步刷盘,调用 1次/3秒 fsync,可以调整为(同步刷盘)
异步刷盘存 -> 宕机丢失数据(但是在集群情况下,整个集群丢数据概率很低)
宕机机器恢复 -> 与现有master产生数据冲突
4. 消息投递实时性
kafka 使用短轮询,实时性取决于轮询间隔时间
rocketmq 使用长轮询,同push方式实时性一致,消息的投递延时通常在几个毫秒
kafka 0.8版本后支持long pull长轮询
5. 消息重试机制/定时消息
kafka 支持
消息发送失败重试机制, 但是只能是固定的重试间隔
rocketmq 支持
消息发送失败重试机制(可以使用delay定时
的尝试发送/接收消息,以及尝试的机会)
在网络抖动等场景下,重试机制非常重要!!
5. 消息顺序性/重复
多线程
中若没有因果关系则没有顺序
。那么用户在多线程中去发消息就意味着用户不关心那些在不同线程中被发送的消息的顺序。
即多线程发送的消息,不同线程间的消息不是顺序发布的,同一线程的消息是顺序发布的
kafka 支持消息顺序,但是一台broker宕机后,就会产生消息乱序(当然,这个是在kafka使用异步repliacation的情况下
)
一个consumer
只能消费一个partition
,来保障顺序写
rocketmq 支持严格的消息顺序,在顺序消息场景下,一台broker宕机后,发送消息会失败,但是不会乱序
一个consumer
可以消费多个queue
,但一个queue
只能由一个consumer
消费
Listener 接口方法:
MessageListenerOrderly
-> 有序
MessageListenerConcurrently
->无序
分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费
全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费(topic只有一个分区的特例)
mysql binlog分发需要严格的消息顺序
消息重复问题,通过消费方幂等来解决
6. 消息查询
rocketmq 可以根据messageId查询消息信息
消息丢失问题非常有帮助(比如丢失订单,是由于没有收到信息,还是处理异常)
7. 消息回溯
kafka/rocketmq 都支持offset
总结:典型业务场景如consumer做订单分析,但是由于程序逻辑或者依赖的系统发生故障等原因,导致今天消费的消息全部无效,需要重新从昨天零点开始消费,那么以时间为起点的消息重放功能对于业务非常有帮助
8. 消费并行度
kafka 依赖topic配置的分区数
,即消费并行度和分区数一致
rocketmq
- 顺序消费方式并行度同Kafka完全一致
- 乱序消费方式并行度取决于consumer的线程数(如topic配置10个队列,10台机器消费,每台机器100个线程,那么并行度为1000)
8. consumer负载均衡
kafka:
首先为每个Consumer Group选出了一个Coordinator,所有的Consumer要先找到这个Coordinator,Coordinator从所有Consumer中,选出了一个Master Consumer,让它负载分配。它分好之后,把分配结果传给其他的Consumer
rocketmq:
consumer主动上报信息给broker,broker进行收集consumer信息,consumer主动获取broker上topic的consumer group中所有consumer的列表,每个consumer自己计算对应的consumerMessageList