RocketMQ系列(三):producer
rocketMq消息体
消息体.pngproperties扩展中存了什么呢
-
tag: 消息tag,用于消息过滤
-
keys:message索引键,多个空格隔开,rocketMq可以根据这些key快速检索到消息。具体怎么快速检索,在消息存储章节细聊。
-
waitStoreMsgOk: 消息发送时是否等消息存储完成后再返回
-
delayTimeLevel: 消息延迟级别,用于定时消息和消息重试
producer启动过程
启动过程就是新建一个MQClientInstance实例。整个JVM实例中只存在一个MQClientInstance实例。
clientId为 本机ip + JVM线程id。
MQClientInstance作用是:与nameServer交互。网络请求,心跳检测。
发送消息过程
1、获取路由信息
路由信息其实就是消息队列列表
topic路由信息会缓存在producer中,以一个list变量的形式存在在内存中。
如果本地没有topic路由信息,向nameServer发送请求,获取路由信息,更新本地路由表。
所以问题了,本地有了,这个路由表什么时候更新呢?
起一个定时任务,每隔30s从nameServer获取topic路由表,更新本地路由。
producer会跟topic涉及的所有broker建立长连接,没隔30秒发送一个心跳。
broker端也会每10秒扫描一次注册的producer,如果2分钟没有心跳,则断开连接。
2、按照负载均衡策略,选择路由
2.1 默认投递方式:轮训
- 对queue list进行排序
- 获取一个全局自增的计数变量。获取一次自增一次。
- 用这个变量对队列size取模
- 模到了几,就选择哪个队列。因为变量是自增的,所以模的值也是根据队列size自增的,也就是轮训的。
- 当自增值增加到int最大值后,该值重置为0
2.2 默认投递方式增强:轮训算法和延迟最小策略
默认的投递方式比较简单,但是也暴露了一个问题,就是有些queue可能由于自身数量积压等原因,可能投递的过程比较长,就尽量不要选择这样的queue了。rocketMq在每次发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些queue投递的速度快。在这种场景下,会优先使用消息投递最小延迟策略。
2.3 顺序消息的投递方式
如果不用默认方式,可以自己选择MessageQueueSelector。
recketMq也提供了集中选择器实现,当然也可以自己实现。
生产者在消息投递的过程中,使用了 MessageQueueSelector 作为队列选择的策略接口,其定义如下:
package org.apache.rocketmq.client.producer;
import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
public interface MessageQueueSelector {
/**
* 根据消息体和参数,从一批消息队列中挑选出一个合适的消息队列
* @param mqs 待选择的MQ队列选择列表
* @param msg 待发送的消息体
* @param arg 附加参数
* @return 选择后的队列
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
投递策略 | 策略实现类 | 说明 |
---|---|---|
随机分配策略 | SelectMessageQueueByRandom | 使用了简单的随机数选择算法 |
基于Hash分配策略 | SelectMessageQueueByHash | 根据附加参数的Hash值,按照消息队列列表的大小取余数,得到消息队列的index |
基于机器机房位置分配策略 | SelectMessageQueueByMachineRoom | 开源的版本没有具体的实现,基本的目的应该是机器的就近原则分配 |
hash代码实现
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
3、根据选择出的路由,发送消息到broker
发送消息的三种方式
-
可靠同步发送
发送者执行发送消息api时,同步等待,直到消息服务器返回发送结果。 -
可靠异步发送
发送者执行发送消息api时,指定消息发送成功后的回调函数,然后立即返回,线程不阻塞,消息发送成功或失败,在回调函数(一个新的线程)中执行。 -
单向发送
发送者执行发送消息api时,直接返回,也没有回调函数。简单的说,就是只管发,不在乎消息是否成功存储在消息服务器上。
怎么控制是使用哪种方式??
在producer调用send函数的时候,有不同的send函数。
如何保证消息一定发送成功
rocketMq有重试机制。调用api的时候,会返回成功。如果返回不成功,则进行下一次投递,往下一个queue投。直到server端返回了成功。
如果在设置的重置次数用完了,还没成功。那就是真失败了。这个时候用户端就会感知了,会抛异常给用户端。