RocketMQ学习

2020-05-08  本文已影响0人  John13

RocketMQ深度解析
RocketMQ之一:RocketMQ整体介绍
RocketMQ之二:分布式开放消息系统RocketMQ的原理与实践(消息的顺序问题、重复问题、可靠消息/事务消息)
RocketMQ之六:RocketMQ消息存储
《浅入浅出》-RocketMQ

高可用(集群)

使用镜像模式搭建高可用集群,可以配置数据同步到所有节点还是指定数量的节点以满足实际需求。

RocketMQ Slave不可以写,可以读,类似于MySQL的主从机制。

Broker是以Group为单位提供服务。一个Group里面分Master和Slave,Slave从Master同步数据,支持同步双写异步复制两种策略。

多master多slave模式部署架构图:

消息存储

1、CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,顺序写,随机读。
2、 ConsumeQueue:消息消费的逻辑队列,作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
3、IndexFile:索引队列用来存储消息的索引key,为了消息查询提供了一种通过key或时间区间来查询消息的方法。
4、MapedFileQueue:对连续物理存储的抽象封装类,源码中可以通过消息存储的物理偏移量位置快速定位该offset所在MappedFile(具体物理存储位置的抽象)、创建、删除MappedFile等操作;
5、MappedFile:文件存储的直接内存映射业务抽象封装类,源码中通过操作该类,可以把消息字节写入PageCache缓存区(commit),或者原子性地将消息持久化的刷盘(flush);

消息刷盘

消息类别

我们可使用Hash取模法,让同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息。这样,我们保证了发送有序。

RocketMQ的topic内的队列机制,可以保证存储满足FIFO(First Input First Output 简单说就是指先进先出),剩下的只需要消费者顺序消费即可。

RocketMQ仅保证顺序发送,顺序消费由消费者业务保证!!!

这里很好理解,一个订单你发送的时候放到一个队列里面去,你同一个的订单号Hash一下是不是还是一样的结果,那肯定是一个消费者消费,那顺序是不是就保证了?

但要实现严格顺序消息,简单且可行的办法就是:
保证生产者 - MQServer - 消费者是一对一对一的关系

// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2016-03-07 16:21:00 投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。    
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();    
msg.setStartDeliverTime(timeStamp);​    
// 发送消息,只要不抛异常就是成功    
SendResult sendResult = producer.send(msg); 
Message sendMsg = new Message(topic, tags, message.getBytes());
sendMsg.setDelayTimeLevel(delayLevel);
// 默认3秒超时
SendResult sendResult = rocketMQProducer.send(sendMsg);

消息发送

消息消费

但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

消费端的Push模式是通过长轮询的模式来实现的,就如同下图:

Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

当然,Consumer端是通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest时,如果发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。

broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。

如果Consumer的数量比Message Queue的总数量还多的话,多出来的Consumer将无法分到Queue,实际上只是起到backup的作用。

消费模式

当使用广播消费模式时,消息队列 RocketMQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。在广播消费中的Consumer Group概念可以认为在消息划分方面无意义。

负载均衡

首先分析一下RocketMQ的客户端发送消息的源码:

在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:

  1. 如果没有指定namesrv地址,将会自动寻址
  1. 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳...
  1. 启动负载均衡的服务

初始化完成后,开始发送消息,发送消息的主要代码如下:

代码中需要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。

如果Producer发送消息失败,会自动重试,重试的策略:

  1. 重试次数 < retryTimesWhenSendFailed(可配置)
  2. 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
  3. 同时满足上面两个条件后,Producer会选择另外一个队列发送消息

Producer向一些队列轮流发送消息,队列集合称为TopicConsumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer实例平均消费这个Topic对应的队列集合。

上图的集群模式里,每个consumer消费部分消息,这里的负载均衡是怎样的呢?

消费端会通过RebalanceService线程,20秒钟做一次基于topic下的所有队列负载:

  1. 遍历Consumer下的所有topic,然后根据topic订阅所有的消息
  2. 获取同一topic和Consumer Group下的所有Consumer
  3. 然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等

如同上图所示:如果有 3 个队列,2 个 consumer,那么第一个 Consumer 消费 2 个队列,第二 consumer 消费 1 个队列。这里采用的就是平均分配策略,它类似于我们的分页,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。

通过这样的策略来达到大体上的平均消费,这样的设计也可以很方面的水平扩展Consumer来提高消费能力。

消息重试

消费失败就会重投递:
1、返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
2、返回 null
3、抛出异常(如果是被捕获的异常,则不会进行消息重试。)

RocketMQ 并不会无限制地重试下去,默认每条消息最多重试 16 次。
如果消息重试 16 次之后还是消费失败怎么办呢?那么消息就不会再投递给消费者,而是将消息放到相对应的死信队列中。这时候我们就需要对死信队列的消息做一些人工补偿处理,因为这些消息可能本身就有问题,也有可能和消费逻辑调用的服务有关等,所以需要人工判断之后再进行处理。

上一篇下一篇

猜你喜欢

热点阅读