MQ:Kafka、RocketMQ、RabbitMQ

2022-02-18  本文已影响0人  请叫我平爷

队列是一种先进先出的数据结构

MQ

消息队列:把要传输的数据放在队列中

image.png

优点:

  1. 解耦,支付只需要关注关注重要的支付就行,其他的比如更新用户积分、通知商家发货等交给MQ来做
  2. 异步,更新用户信息、通知商家都是异步进行,提高了吞吐量
  3. 削峰,队列的顺序性实现消息的延迟消费

缺点

  1. 系统可用性降低。依赖服务增多。需要考虑MQ挂了的情况。
  2. 系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性
  3. 业务一致性。主业务和从属业务一致性的处理

使用消息队列要考虑的问题

  1. 高可用
    消息队列要是集群/分布式的,能够提供现有支持,而不是手动代码实现
  2. 数据丢失问题
    消息队列要能持久化,这样才能减少数据的丢书。
  3. 消费者怎么的得到消息队列的数据?
    生产者->消息队列,
  4. 消息队列有数据了,主动叫消费者拿,push
  5. 消费者不断轮询,看有没有新的数据,如果有就消费,pull

RabbitMQ

模式:

https://rabbitmq.com/getstarted.html

image.png image.png image.png image.png image.png image.png

组件介绍

高可用方案:

  1. 普通集群模式,只有RabbitMQ上有queue,其他的RabbitMQ通过网络去这台实例上获取queue
    消费者拉取的机器如果有queue,直接返回,没有,则实例之间会产生网络传输
    有queue的机器宕机了,导致其他机器都无法拉取数据
  2. 镜像集群模式,没个实例都有queue
    缺点:性能消耗大,所有机器都要进行消息同步
    没有扩展性可言,如果某个实例的queue很大,增加实例也没有用

保证消息不重复消费:

如果消费者消费完信息,这时没来的及ack就挂了,那么就会出现重复消费的问题
保证幂等性

  1. 内存维护一个set,从消息队列中获取一个消息,先查询是否在set里面,在,就表示已消费,不在,加入set
  2. 写数据库,拿唯一键去数据库查询下,不存在就写,存在就表示已消费
  3. 写redis,用set
  4. 让生产者发送消息时,每条消息加一个全局唯一的id,然后消费时,将id保存到redis中,消费时去redis里面查一下,没有再消费
  5. 数据库操作可以设置唯一键,防止数据重复的插入。


    image.png

生产者消息丢失

  1. 传入消息时丢失
    解决:
    1. 使用RabbitMQ提供的事务功能。

     ```
     channel.txSelect();//开启事务
     try{
        //发送消息
    }catch(Exection e){
       channel.txRollback();//回滚事务
       //重新提交
    }
     ```
    

    缺点:事务开启,就会变成同步阻塞操作,生产者会阻塞等待是否发送成功,性能较低

    1. 开启confirm模式,变成异步。
      每次写的消息都会分配一个唯一的id,然后写入rabbitMQ,rabbitMQ会回传一个ack消息。
      • 如果没有处理这个消息,会回调nack,你可以进行重试。
      • 如果超过一定时间没有收到消息回调,你可以进行重发。
      //开启confirm
      channel.confirm();
      //发送成功回调
      public void ack(String messageId){
      
       }
            // 发送失败回调
      public void nack(String messageId){
          //重发该消息
      }
      

MQ消息丢失

RabbitMQ
收到消息,暂存在内存中,还没存到磁盘,然后机器挂了,导致数据丢失。

解决:消息持久化到磁盘。
分为两步:

  1. 创建queue的时候将其设置为持久化,这样可以保证rabbitMQ持久化queue的元数据,但是不会持久好queue里面的数据
//Exchage持久化
hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
//queue持久化
channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
  1. 发送消息的时候将消息的deliveryMode设置为2,这样消息就会被设置为持久化方式,rabbitMq就会将消息持久化到磁盘上。
//消息持久化,在投递时制定deliveryMode=2(1是非持久化)
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());  

设置这两个持久化,并使用confirm机制,只有消息被持久化到磁盘,才会通知生产者ack

消费者消息丢失

RabbitMQ
消费者消费到某条消息的时候,还没处理,就挂了,rabbitMQ认为你消费了,消息丢失
解决:关闭rabbitMQ的自动ack,等处理完毕在手动ack。

//关闭自动ack
//basicConsume(String queue, boolean autoAck, Consumer callback)
channel.basicConsume(queueName, false, this);

保证消息顺序执行
一个queue,多个consumer消费,造成顺序错误,
consumer从MQ读取是有序的,处理的时间是不固定的,无法保证先读到的先完成,造成顺序错误
例如,读取顺序,a=1,a=2,a=3,执行的顺序,a=1,a=3,a=2,最终存表是a=2

一个queue,一个consumer,consumer里面进行了多线程消费,也会造成消息消费顺序错误。

  1. 解决1:拆分多个queue,每个queue一个consumer,consumer单线程
    缺点:吞吐量降低,queue增多
  2. 解决2:consumer多线程处理。先将消息保存在内存队列中,将关键值相同的的数据保存到相同的消息队列中,然后分发给底层不同的worker来处理

消息积压

  1. 解决consumer的问题,确保其恢复消费速度。停掉所有的consumer
  2. 临时建立好原来10倍20倍的queue数量
  3. 写一个临时分发消息的consumer程序,消费积压的消息,消费之后不做耗时处理,直接均匀写入临时建好10倍的queue里面去
  4. 紧急征用10倍的机器来部署consumer,每一批consumer消费一个临时queue消息。相当于将queue资源和consumer扩大10倍,以之前10倍的速度去消费
  5. 等快消费完后,恢复原来框架,重新用原来的consumer机器来消费信息

设置了过期时间,过期就丢了

rabbitMQ是可以设置过期时间的。
解决:流量低峰期,写程序,手动查询丢失的数据,重新发送到MQ,把丢失的数据补回来

积压消息长时间没处理,MQ放不下?

  1. 扩容,加queue,加consumer
  2. 写个临时程序,丢去一部分数据,流量低峰期,补齐数据

Kafka

高可用方案

创建一个topic,会划分成很多partition,每个partition在不同的broker上
读写只从leader上读取,leader会自动同步到follower
如果broker挂了,恰好某个partition的leader在这台broker上,那会从其他的follower上选举出一个新的leader

保证消息不重复消费

如果消费者消费完信息,这时没来的及提交offset就挂了,那么就会出现重复消费的问题
保证幂等性

生产者消息丢失

kafka

MQ消息丢失
Kafka

解决:设置4个参数

  1. topic设置replication.factor参数:大于1,要求每个partition必须至少2个副本
  2. kafka服务端设置min.insync.replicas参数:大于1,一个leader至少感知一个follower还跟自己联系,确保leader挂了还有一个follower
  3. product端设置acks=all:每条数据,必须写入所有replica之后,才认为是成功了
  4. product端设置retries=MAX:一旦写入失败,就无限重拾,卡在这里

这样能保证kafka的broker端发送故障,leader转移时,不会丢失数据。

消费者消息丢失

Kafka

kafka会自动提交offset,就是消费者还没开始消费,就自动提交了offset,让kafka以为消费好了。

解决:关闭kafka的自动提交offset,处理完毕后再手动提交offset,可以保证数据不丢失。
但是还会出现重复消费的情况,需要自己保证幂等性

保证消息顺序执行

一个topic、一个partition、一个consumer,consumer内存进行了多线程消费,也会出现顺序错乱的问题

具有顺序的数据写入不同的partition,不同的消费者去消费,但是每个consumer的执行时间是不固定的,无法保证先读到消息的consumer一定先完成操作
造成消息没有按照顺序执行,造成数据顺序错误。

  1. 解决1:确保一个消息发送同一个partition以及一个topic,一个partition只有一个consumer,consumer内部消费是单线程。
  2. 解决2:写N个queue,然后N个现场分别消费一个内存queue

消息积压

  1. 解决consumer的问题,确保其恢复消费速度。停掉所有的consumer
  2. 临时新建一个topic,partition是原来的10倍
  3. 写一个临时分发消息的consumer程序,消费积压的消息,消费之后不做耗时处理,直接均匀写入临时建好10倍的partition里面去
  4. 紧急征用10倍的机器来部署consumer,每一批consumer消费一个临时partition消息。相当于将partition资源和consumer扩大10倍,以之前10倍的速度去消费
  5. 等快消费完后,恢复原来框架,重新用原来的consumer机器来消费信息

积压消息长时间没处理,MQ放不下?

  1. 扩容,加partition,加consumer
  2. 写个临时程序,丢去一部分数据,流量低峰期,补齐数据

topic分配partition的规则

  1. rangeAssignor 默认分配策略


    image.png
  2. roundRobinAssignor 轮询


    image.png

    会出现问题


    image.png
  3. StickyAssignor

  4. 自定义分配策略

高并发场景下,如何保证收发消息的性能?

  1. 生产端批量发送
  2. broker异步刷盘
  3. 消费者批量拉取

如何保证消息服务的高可用和高可靠?

  1. broker集群,服务发现和注册,负载均衡、超时自动重试
  2. partition一个leader多个follower,并且partition分布在多个broker上,partition故障自动转移
  3. 存储采用追加日志文件模式+索引查找

如何保证服务是可以水平任意扩展的?

broker集群,如果新的broker,只需要注册到注册中心即可。

如何保证消息存储也是水平可扩展的?

利用分片存储技术,partition分布在多个broker上

各种元数据(比如集群中的各个节点、主题、消费关系等)如何管理,需不需要考虑数据的一致性?

一个topic对应多个partition,一个partition对应一个consumer

上一篇 下一篇

猜你喜欢

热点阅读