腾讯云TDMQ(RocketMQ)版介绍及入门
一、入门介绍
1.1 TDMQ简介
TDMQ总共有4个版本,分别是RocketMQ版、Pulsar版、RabbitMQ版、CMQ版;其中Pulsar是一款基于 Apache 顶级开源项目 Pulsar 自研的金融级分布式消息中间件,具备跨城高一致、高可靠、高并发的特性,RabbitMQ是Erlang编写的,CMQ是腾讯自研的,RocketMQ也是腾讯基于开源项目自研的;这里面除了Pulsar之外,其它目前都是公测阶段,不收费,但是也意味着可能不稳定。
由于我们公司主要使用的就是开源RocketMQ,而且中台项目都是基于Spring Cloud Stream的,所以要无缝低成本切换的话,只有RocketMQ版本比较适合,本文就此展开介绍。
TDMQ for RocketMQ是一款腾讯自主研发的消息队列服务,兼容 Apache RocketMQ 的各个组件与概念,支持 RocketMQ 4.6.1及以上版本的客户端零改造接入,同时具备计算存储分离、灵活扩缩容的底层优势。能较好地应对各类营销活动带来的流量冲击,非常适用于顺序性以及事务性要求较高的场景,在电商交易、金融结算等领域有着十分广泛的应用。
1.2 产品优势
- 兼容开源4.3.0及以上版本,对于 4.6.1及以上版本的客户端支持零改造接入;
- 服务(Broker)和存储(Bookie)分离,整体架构采用云原生无状态设计,用户可以按量使用和按需扩展,体验更加 Serverless 化,用户对底层资源无感知。
- 多层级的资源结构,不仅基于命名空间做了虚拟隔离,也可以在集群维度做物理隔离。支持在命名空间维度为客户端配置权限校验,区分不同环境的客户端,方便灵活。
- 对于消息数据采用分片的方式进行持久化,不容易产生数据倾斜等问题。当由于扩容、机器故障等导致的节点新增、删除时,不会触发重平衡而导致整个集群的吞吐急剧下降。
- 支持普通消息、顺序消息、延时消息等多种消息类型,支持消息重试和死信机制,满足各类业务场景。
- 单机最高可支持上万级别的生产消费吞吐,分布式架构,无状态服务,可以横向扩容来增强整个集群的吞吐。
1.3 应用场景
- 异步解耦,实现高效的异步通信和应用解耦,确保主站业务的连续性。
- 削峰填谷,承担一个缓冲器的角色,将上游突增的请求集中收集,下游可以根据自己的实际处理能力来消费请求消息。
- 顺序收发,提供一种专门应对需要严格按照顺序执行消息的功能,即保证消息 FIFO。
- 分布式事务一致性,自动重推和海量堆积能力来实现事务补偿,实现最终一致性。
- 分布式缓存同步,广播消费模式会被所有节点消费一次,相当于把信息同步到需要的每台机器上,可以取代缓存的作用。
- 大数据分析, 与流式计算引擎相结合,可以很方便地实现业务数据的实时分析。
1.4 使用限制
- 同一地域内集群数量上限5个;
- 同一集群内命名空间数量上限10个;
- 单个命名空间TPS上限8000,生产和消费带宽上限400Mbps;
- 单个命名空间内Topic上限1000;
- 单个Topic生产者数量上限1000个,消费者数量上限500个;
- 单个命名空间内Group上限10000个;
- 消息最大保留时长15天;
- 消息最大延迟时间40天;
- 消息大小上限5MB;
- 消费位点重置最长15天;
1.5 收费标准
2022年5月之前处于公测阶段,申请资格成功后,可以免费使用。
1.6 资源介绍
- 集群,用户手动创建一个可伸缩、免运维、零配置的RocketMQ集群,接入地址目前只支持内网接入,公网接入需要等到公测结束后才支持;通常建议不同环境创建不同的集群,达到物理隔离的目的;
![](https://img.haomeiwen.com/i5673257/12e4d6c01edcf0ef.png)
- 命名空间,在集群内部可以创建不同的命名空间,来逻辑隔离不同的消息,通常建议不同的项目群使用不同的命名空间;允许设置TTL时长、消息持久化策略、时长、空间等;
![](https://img.haomeiwen.com/i5673257/612c815d953c17eb.png)
- 角色与授权,在角色管理里面先创建需要的角色,然后在命名空间里面给角色赋权生产或者消费的权限;
![](https://img.haomeiwen.com/i5673257/c9aa3936b2b7f6b0.png)
![](https://img.haomeiwen.com/i5673257/2afc8edb64ec5d0f.png)
- Topic,消息的Topic需要手动创建,允许设置Topic中消息的类型、分区数量;
![](https://img.haomeiwen.com/i5673257/aa6494096f6f789f.png)
- Group,消费者组也需要手动创建,可以设置消费组消费开关和广播开关;
![](https://img.haomeiwen.com/i5673257/a607a0d45efddeb4.png)
1.7 消息与轨迹查询
当一条消息从生产者发送到 TDMQ RocketMQ 版服务端,再由消费者进行消费,TDMQ RocketMQ 版会完整记录这条消息中间的流转过程,并以消息轨迹的形式呈现在控制台。消息轨迹记录了消息从生产端到 TDMQ RocketMQ 版服务端,最后到消费端的整个过程,包括各阶段的时间(精确到微秒)、执行结果、生产者 IP、消费者 IP 等。
![](https://img.haomeiwen.com/i5673257/ab43be3104f46253.jpg)
使用限制如下:
- 消息查询最多可以查询近3天的消息;
- 一次性最多可以查询65536条消息;
1.8 配置告警
- 指标
- 消息生产速率
- 消息生产流量
- 生产者数据量
- 消息堆积数量
- 告警频次
- 分钟级别
- 小时级别
- 天级别
- 通知模板
- 接收对象可以是用户或者用户组;
- 接收渠道可以是邮件、短信、微信、电话;
1.9 TDMQ与开源RocketMQ对比
![](https://img.haomeiwen.com/i5673257/85cc0145ac5c0df8.png)
二、开发指南
2.1 Java SDK
首先需要导入RocketMQ的依赖:
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.6.1</version>
</dependency>
然后,我们创建一个生产者:
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(
namespace,
groupName,
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
);
// 设置NameServer的地址
producer.setNamesrvAddr(nameserver);
// 启动Producer实例
producer.start();
然后发送消息分为如下几种方式:
-
同步发送
同一个线程同步阻塞式地进行消息的发送,比较适合需要根据上一次发送结果来做逻辑判断,以决定下一次是否发送的场景。
for (int i = 0; i < 10; i++) { // 创建消息实例,设置topic和消息内容 Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); }
-
异步发送
不同的线程同时并发地进行消息的发送,比较适合各个消息之间无依赖的场景,发送效率得到了提升。
// 设置发送失败后不重试 producer.setRetryTimesWhenSendAsyncFailed(0); // 设置发送消息的数量 int messageCount = 10; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; // 创建消息实体,设置topic和消息内容 Message msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 消息发送成功逻辑 countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { // 消息发送失败逻辑 countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS);
-
单向发送
同一个线程同步阻塞式地进行消息的发送,但是发送者不用等待broker的消息确认返回,即开始下一个消息的发送,比较快速,适合于对消息少量丢失不敏感的场景。
for (int i = 0; i < 10; i++) { // 创建消息实例,设置topic和消息内容 Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送单向消息 producer.sendOneway(msg); }
然后再设置消费者,分为如下两种:
-
push
// 实例化消费者 DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer( namespace, groupName, new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限 // 设置NameServer的地址 pushConsumer.setNamesrvAddr(nameserver); // 订阅topic pushConsumer.subscribe(topic_name, "*"); // 设置消费模式:CLUSTERING-集群负载消费(默认);BROADCASTING-广播消费 //consumer.setMessageModel(MessageModel.CLUSTERING); // 注册回调实现类来处理从broker拉取回来的消息 pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { // 消息处理逻辑 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费, 根据消费情况,返回处理状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 启动消费者实例 pushConsumer.start();
-
pull
// 实例化消费者 DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer( namespace, groupName, new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // 设置NameServer的地址 pullConsumer.setNamesrvAddr(nameserver); // 设置从第一个偏移量开始消费 pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 订阅topic pullConsumer.subscribe(topic_name, "*"); // 设置消费模式:CLUSTERING-集群负载消费(默认);BROADCASTING-广播消费 //consumer.setMessageModel(MessageModel.CLUSTERING); // 启动消费者实例 pullConsumer.start(); try { System.out.printf("Consumer Started.%n"); while (true) { // 拉取消息 List<MessageExt> messageExts = pullConsumer.poll(); System.out.printf("%s%n", messageExts); } } finally { pullConsumer.shutdown(); }
2.2 Spring Boot Starter
首先引入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
然后增加配置信息如下:
server:
port: 8082
#rocketmq配置信息
rocketmq:
# tdmq-rocketmq服务接入地址
name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
# 生产者配置
producer:
# 生产者组名
group: group111
# 角色密钥
access-key: eyJrZXlJZC....
# 已授权的角色名称
secret-key: admin
# 消费者公共配置
consumer:
# 角色密钥
access-key: eyJrZXlJZC....
# 已授权的角色名称
secret-key: admin
# 用户自定义配置
namespace: rocketmq-xxx|namespace1
producer1:
topic: testdev1
consumer1:
group: group111
topic: testdev1
subExpression: TAG1
consumer2:
group: group222
topic: testdev1
subExpression: TAG2
然后如下是一个发送消息的示例:
/**
* Description: 消息生产者
*/
@Service
public class SendMessage {
// 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置 格式:namespace全称%topic名称
@Value("${rocketmq.namespace}%${rocketmq.producer1.topic}")
private String topic;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 同步发送
*
* @param message 消息内容
* @param tags 订阅tags
*/
public void syncSend(String message, String tags) {
// springboot不支持使用header传递tags,根据要求,需要在topic后进行拼接 formats: `topicName:tags`,不拼接标识无tag
String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult = rocketMQTemplate.syncSend(destination,
MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_KEYS, "yourKey") // 指定业务key
.build());
System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
}
}
如下是一个消费者示例:
@Service
@RocketMQMessageListener(
// 消费组,格式:namespace全称%group名称
consumerGroup = "${rocketmq.namespace}%${rocketmq.consumer1.group}",
// 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置 格式:namespace全称%topic名称
topic = "${rocketmq.namespace}%${rocketmq.consumer1.topic}",
selectorExpression = "${rocketmq.consumer1.subExpression}" // 订阅表达式, 不配置表示订阅所有消息
)
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Tag1Consumer receive message:" + message);
}
}
2.3 Spring Cloud Stream
如果不熟悉Stream使用的可以先参考文章Stream使用入门 - 简书 (jianshu.com)
首先引入依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.7.1</version>
</dependency>
<!--spring-cloud-starter-stream-rocketmq 里面的 RocketMQ 版本较老,需要排除掉,然后单独引用新的版本-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2.2.5-RocketMQ-RC1</version>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
</exclusions>
</dependency>
然后是stream的配置:
spring:
cloud:
stream:
rocketmq:
bindings:
# channel名称, 与spring.cloud.stream.bindings下的channel名称对应
Topic-test1:
consumer:
# 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)
subscription: TAG1
# channel名称
Topic-test2:
consumer:
subscription: TAG2
binder:
# 服务地址全称
name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
# 角色名称
secret-key: admin
# 角色密钥
access-key: eyJrZXlJZ...
# namespace全称
namespace: rocketmq-xxx|namespace1
# 生成者group名称
group: group1
bindings:
# channel名称
Topic-send:
# 指定topic, 对应创建的topic名称
destination: topic1
content-type: application/json
# 要使用group名称
group: group1
# channel名称
Topic-test1:
destination: topic1
content-type: application/json
group: group1
# channel名称
Topic-test2:
destination: topic1
content-type: application/json
group: group2
如下是channel的示例:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* 自定义通道 Binder
*/
public interface CustomChannelBinder {
/**
* 发送消息(消息生产者)
* 绑定配置中的channel名称
*/
@Output("Topic-send")
MessageChannel sendChannel();
/**
* 接收消息1(消费者1)
* 绑定配置中的channel名称
*/
@Input("Topic-test1")
MessageChannel testInputChannel1();
/**
* 接收消息2(消费者2)
* 绑定配置中的channel名称
*/
@Input("Topic-test2")
MessageChannel testInputChannel2();
}
在配置类或启动类上添加相应注解,如果有多个binder配置,都要在此注解中进行指定。
@EnableBinding({CustomChannelBinder.class})
发送消息示例:
@Autowired
private CustomChannelBinder channelBinder;
Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
channelBinder.sendChannel().send(message);
接收消息示例:
@Service
public class TestStreamConsumer {
private final Logger logger = LoggerFactory.getLogger(DemoApplication.class);
/**
* 监听channel (配置中的channel名称)
*
* @param messageBody 消息内容
*/
@StreamListener("Topic-test1")
public void receive(String messageBody) {
logger.info("Receive1: 通过stream收到消息,messageBody = {}", messageBody);
}
/**
* 监听channel (配置中的channel名称)
*
* @param messageBody 消息内容
*/
@StreamListener("Topic-test2")
public void receive2(String messageBody) {
logger.info("Receive2: 通过stream收到消息,messageBody = {}", messageBody);
}
}