RocketMQ系列

RocketMQ系列(三):producer

2020-06-06  本文已影响0人  范柏柏

rocketMq消息体

消息体.png

properties扩展中存了什么呢

producer启动过程

启动过程就是新建一个MQClientInstance实例。整个JVM实例中只存在一个MQClientInstance实例。

clientId为 本机ip + JVM线程id。
MQClientInstance作用是:与nameServer交互。网络请求,心跳检测。

发送消息过程

1、获取路由信息

路由信息其实就是消息队列列表
topic路由信息会缓存在producer中,以一个list变量的形式存在在内存中。
如果本地没有topic路由信息,向nameServer发送请求,获取路由信息,更新本地路由表。

所以问题了,本地有了,这个路由表什么时候更新呢?
起一个定时任务,每隔30s从nameServer获取topic路由表,更新本地路由。
producer会跟topic涉及的所有broker建立长连接,没隔30秒发送一个心跳。
broker端也会每10秒扫描一次注册的producer,如果2分钟没有心跳,则断开连接。

2、按照负载均衡策略,选择路由

2.1 默认投递方式:轮训

2.2 默认投递方式增强:轮训算法和延迟最小策略

默认的投递方式比较简单,但是也暴露了一个问题,就是有些queue可能由于自身数量积压等原因,可能投递的过程比较长,就尽量不要选择这样的queue了。rocketMq在每次发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些queue投递的速度快。在这种场景下,会优先使用消息投递最小延迟策略。

2.3 顺序消息的投递方式

如果不用默认方式,可以自己选择MessageQueueSelector。
recketMq也提供了集中选择器实现,当然也可以自己实现。

生产者在消息投递的过程中,使用了 MessageQueueSelector 作为队列选择的策略接口,其定义如下:

package org.apache.rocketmq.client.producer;

import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

public interface MessageQueueSelector {
        /**
         * 根据消息体和参数,从一批消息队列中挑选出一个合适的消息队列
         * @param mqs  待选择的MQ队列选择列表
         * @param msg  待发送的消息体
         * @param arg  附加参数
         * @return  选择后的队列
         */
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
投递策略 策略实现类 说明
随机分配策略 SelectMessageQueueByRandom 使用了简单的随机数选择算法
基于Hash分配策略 SelectMessageQueueByHash 根据附加参数的Hash值,按照消息队列列表的大小取余数,得到消息队列的index
基于机器机房位置分配策略 SelectMessageQueueByMachineRoom 开源的版本没有具体的实现,基本的目的应该是机器的就近原则分配

hash代码实现

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}

3、根据选择出的路由,发送消息到broker

发送消息的三种方式

怎么控制是使用哪种方式??
在producer调用send函数的时候,有不同的send函数。

如何保证消息一定发送成功

rocketMq有重试机制。调用api的时候,会返回成功。如果返回不成功,则进行下一次投递,往下一个queue投。直到server端返回了成功。
如果在设置的重置次数用完了,还没成功。那就是真失败了。这个时候用户端就会感知了,会抛异常给用户端。

上一篇下一篇

猜你喜欢

热点阅读