消息队列

2022-01-18  本文已影响0人  source201

消息队列

为什么使用消息队列

使用消息队列的主要场景:解耦、异步、削峰。

解耦

适合场景:系统的数据需要发送给多个系统,且后续接收的系统可能存在变化。因此可以通过消息队列将数据发送和接收解耦,当制定好数据的结构,发送系统和接收系统只需要专注于自己本身业务。

image-20220107135550103.png

异步

适合场景:处理流程中需要通知其他系统(不需要返回值),可以通过消息队列将其改成异步,接口无需等待其他系统的返回值,提高接口响应速度。

削峰

适合场景:间歇性的密集处理,系统一般处理密度不够(例如100),但在高峰时会提高到(10000),若直接调用处理方法,高峰时会直接崩溃,若用消息队列进行缓冲,处理方法依旧能够按照自己的处理速度来进行处理,不会崩溃。

消息队列的缺点

缺点

(1)系统可用性降低:系统引入的外部依赖越多,越容易挂掉。将系统间的调用关系改成通过消息队列进行数据交互,若消息队列异常,系统的交互则会出现问题。

(2)系统复杂度提高:消息队列的引入,就需要考虑重复消费问题和消息丢失问题

(3)一致性问题:a系统将消息通过消息队列发送给b,c后,就给用户返回成功了,但b入库成功,c入库失败,则会出现数据不一致问题。

Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

如何保证消息队列的高可用

Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。并且每个 partition会设置多个replica 。

image-20220114170846905.png

kafka0.8以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要 care 数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。

image-20220114152959702.png

这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker 上面的 partition 在其他机器上都有副本的。如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。

写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为)

消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。

如何保证消息不被重复消费?(如何保证消息消费的幂等性)

出现重复消费的一些情形

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。

若遇到系统重启,导致 consumer 有些消息处理了,但是没来得及提交 offset。重启之后,少数消息会再次消费一次。

数据 1/2/3 依次进入 Kafka,Kafka 会给这三条数据分别分配的offset是 152/153/154。消费者从 Kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 Zookeeper,此时消费者进程被重启了。那么此时消费过的数据2 的 offset 并没有提交,Kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会从Kafka接着把上次消费到的那个地方(152)后面的数据拉取回来。由于之前的 offset 没有提交成功,那么数据 2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。

如何保证幂等性

消息的幂等性一般是由消费者进行保证。这个需要结合具体业务:

(1)例如将数据插入数据库,可以根据主键查询后,如果有,则进行update。或者基于数据库的唯一键来保证重复数据不会加入数据库,这样插入只会报错,不会导致出现脏数据

(2)如果是存入redis中,redis的set每次都是更新,天然的幂等性

(3)如果是数据是需要进行其它处理的,可以给每条消息添加一个全局唯一id,每一次消费数据前,先到redis中查询一下是否被消费,如果已消费则不再处理。

如何保证消息的可靠性传输?(如何处理消息丢失的问题?)

对于消息丢失,则主要发生在生产者、消息队列本身、消费者三个地方。

对于kafka:

(1)生产者端,消息丢失:

设置 acks=all ,这样,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

(2)kafka端,消息丢失:

这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,就会导致数据丢失。

此时一般是要求起码设置如下 4 个参数:

这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。

(3)消费者端,消息丢失:

消费者丢失消息,则在于消费者自动提交了offset,导致kafka认为你已经消费成功了,但是消费者只是刚接收消息,若此时消费者挂了,则重启后,重新拉取消息,只会拉到后面的消息,之前的消息就丢了。因此消费者端,应该改为手动提交offset.

如何保证消息的顺序性

消息乱序的场景

往消息队列中插入三条消息,新增、修改、删除数据。

kafka这边,比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。

消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

image-20220118160754923.png

解决方案

在消费者端,写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性

image-20220118160910446.png

如何解决消息队列的延时以及过期失效问题?(消息队列满了以后该怎么处理?)(有几百万消息持续积压几小时,怎么解决?)

这个主要就是消费端出故障了,大量的消息在mq里积压。

以下场景:

大量消息在 mq 里积压了几个小时了还没解决

思路:临时紧急扩容

具体步骤和思路:

mq 中的消息过期失效了

假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢

我们可以采取一个方案,就是批量重导,就是重新写程序,将丢失的数据重新查询,发送到mq中。

mq 都快写满了

这个是第一个方案执行晚了,只能快速消费所有信息,都丢弃掉,如何在使用的第二个方案,批量重导。

解决思路

针对消息积压,主要思路:

(1)提高消费并行度:增加消费者数量

(2)批量方式消费:部分场景下,批量处理数据的效率远高于单个处理的效率,则可以进行消息的批量拉取消费

(3)跳过非重要消息:发生消息堆积时,消费速度一直追不上发送速度,若数据不太重要,可以考虑丢弃部分或全部消息

(4)优化每条消息消费过程:减少每条消息消费时间

如果让你写一个消息队列,该如何进行架构设计

面试官一般主要考察两块:

比如说这个消息队列系统,我们从以下几个角度来考虑一下:

本文参考来源:https://doocs.gitee.io/advanced-java/#/./docs/high-concurrency/mq-design

上一篇下一篇

猜你喜欢

热点阅读