杂物间简书收藏 -- 消息队列

消息队列的对比调研

2017-03-03  本文已影响5559人  ezbuy研发

我们发现Redis的作者出了一个新的消息队列系统Disque,我做了一点调研来决定我们使用哪种消息队列,主要对比了Disque、Kafka和RocketMQ。

Disque的一些不足
最近一次提交时间:Apr 29, 2016(调研时间2017-2-21)
社区并不活跃,网上可以查到的资料较少
C编写,我们的技术栈为go和c#,我花了一些时间阅读了资料,并做了大量测试,也阅读了一些源码,在一定程度上弥补了上述问题
由于运行在内存,获得大吞吐量的同时,失去了保存大量消息的能力

** Disque的消息存储**
一条消息称为一个job,job会根据配置的副本数量分布在多个节点的内存中,队列信息是每个节点单独存储的
存储job使用的跳表

Kafka的消息存储

partition简单图示.png
物理存储简单图示.png

RocketMQKafka的一些对比
RocketMQ使用Java实现,kafka使用Scala实现
性能上kafka略高,可用性和数据可靠性都差不多(因为存储模型差不多)
RocketMQ支持事务消息(这里的事务消息存在缺陷,而且在分布式事务中)
RocketMQKafka主要在这几个方面:消息的存储、Prdocuer端的服务发现、消费offset的存储、consumer负载均衡、Name Server和ZooKeeper
总体来说,RocketMQKafka比较类似,RocketMQ数据安全性稍好,kafka性能稍好;RocketMQ不需要zookeeper,但是同样需要额外的机器来部署他的Name Server
另外,RocketMQ有延时消费功能

我们的选择:Disque

测试数据

下面是EZMqClient的使用示例:

// EZMqClient
mqClient = ezlib.NewDisqueClientWithDefaultConf(conf.MqServer, conf.MqClient)

// 添加消息监听
mqClient.AddConsumeListener("testTopic", func(job *ezlib.EZJob) {
    fmt.Printf("consume message: %v\n", job)
})
// 发送消息
mqClient.Push("testTopic", "bbbbbb")
// 发送延时消息,发送后,消息队列会在delay之后将消息推送给消费者
mqClient.PushDelay("testDelayTopic", "aaaaaaaaaaaaaaa", 15*time.Second)

// 定时任务,你可以在多个节点启动相同的定时任务,系统会保证只有一个节点去执行任务,所以只需要有一个节点存活,这个定时任务就可以执行
mqClient.AddCrontab("testCrontab", "0/10 * * * * *", func(taskTime time.Time) {
    fmt.Printf("crontab taskTime:%v now:%v\n", taskTime, time.Now())
})

///////////////   消息广播  /////////////////////////
// 增加广播消息监听,同一个topic下的不同的group都会收到广播消息,同一个group中只有一个节点会收到消息
mqClient.AddBroadcastListener("broadcastNameTest", "broadcastNameTestGroup1", func(msg string) { fmt.Printf("broadcast1:%v\n", msg) })
// 发送广播消息
mqClient.PushBroadcast("broadcastNameTest", "hahahahaha")
// 发送广播延时消息
mqClient.PushBroadcastDelay("broadcastNameTest", "ooooooooooo", 10*time.Second)

Disque热扩展
热扩展:Disque在一个节点内存不足时,收到新的Job,会将这个Job转发给其他内存足够的节点;热扩展只需增加节点,并且通过Disque提供的客户端将新启动的节点加进集群即可

Disque如何实现高可用
高可用: EZMqClient 会为每个addjob操作增加参数指定副本数量,并且在getjob成功之后调用ackjob,以此保证“至少一次”的消息消费;当有节点失效时,只要一个job的多个副本不是都在那些失效节点上时,则job不会丢失,整个集群正常工作
目前打算设置副本数量为2,集群物理机3台,由于Disque单线程,一台物理机可以启动多个Disque实例,但需要注意job的2个副本不可处于同一台物理机,否则这台物理机失效时将导致job丢失,考虑到Disque的吞吐量完全足够,而且Disque无法保证job的2个副本所在的节点一定会分布在不同的物理机上,所以单机启动一个Disque实例就可以了,可以容忍集群中1台物理机的挂机

阅读Disque源码的一些建议
Disque 大量重用了 Redis 的底层代码, 比如数据结构部分、事件部分、网络通信部分、服务器主循环部分等等。
Job会根据命令指定的副本数量存放在多个节点中,Queue底层为跳表,客户端addjob时连接哪个节点则在哪个节点建立Queue(这个节点没有这个topic的Queue时),副本传播到的节点只会存储Job副本不会建立Queue
只有同一个topic的所有job都在同一个节点的同一个Queue中时,才能保证顺序(但由于网络延迟,消费者也不一定会按顺序接收到消息),其他情况都无法保证顺序(这里比较复杂,想要详细了解的同学请自行阅读源码并实际操作尝试)
如果对Redis源码有阅读过的同学,可以只需要阅读Disque的job、queue、cluster、disque* 这几部分的源码

上一篇下一篇

猜你喜欢

热点阅读