腾讯云TDMQ(RocketMQ)版介绍及入门

2022-01-05  本文已影响0人  文景大大

一、入门介绍

1.1 TDMQ简介

TDMQ总共有4个版本,分别是RocketMQ版、Pulsar版、RabbitMQ版、CMQ版;其中Pulsar是一款基于 Apache 顶级开源项目 Pulsar 自研的金融级分布式消息中间件,具备跨城高一致、高可靠、高并发的特性,RabbitMQ是Erlang编写的,CMQ是腾讯自研的,RocketMQ也是腾讯基于开源项目自研的;这里面除了Pulsar之外,其它目前都是公测阶段,不收费,但是也意味着可能不稳定。

由于我们公司主要使用的就是开源RocketMQ,而且中台项目都是基于Spring Cloud Stream的,所以要无缝低成本切换的话,只有RocketMQ版本比较适合,本文就此展开介绍。

TDMQ for RocketMQ是一款腾讯自主研发的消息队列服务,兼容 Apache RocketMQ 的各个组件与概念,支持 RocketMQ 4.6.1及以上版本的客户端零改造接入,同时具备计算存储分离灵活扩缩容的底层优势。能较好地应对各类营销活动带来的流量冲击,非常适用于顺序性以及事务性要求较高的场景,在电商交易、金融结算等领域有着十分广泛的应用。

1.2 产品优势

1.3 应用场景

1.4 使用限制

1.5 收费标准

2022年5月之前处于公测阶段,申请资格成功后,可以免费使用。

1.6 资源介绍

集群创建 命名空间创建 角色创建 角色赋权 Topic创建 Group创建

1.7 消息与轨迹查询

当一条消息从生产者发送到 TDMQ RocketMQ 版服务端,再由消费者进行消费,TDMQ RocketMQ 版会完整记录这条消息中间的流转过程,并以消息轨迹的形式呈现在控制台。消息轨迹记录了消息从生产端到 TDMQ RocketMQ 版服务端,最后到消费端的整个过程,包括各阶段的时间(精确到微秒)、执行结果、生产者 IP、消费者 IP 等。

消息轨迹查询示意图

使用限制如下:

1.8 配置告警

1.9 TDMQ与开源RocketMQ对比

对比图

二、开发指南

2.1 Java SDK

首先需要导入RocketMQ的依赖:

<!-- in your <dependencies> block -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.6.1</version>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.6.1</version>
</dependency>

然后,我们创建一个生产者:

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(
    namespace, 
    groupName,
    new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
);
// 设置NameServer的地址
producer.setNamesrvAddr(nameserver);
// 启动Producer实例
producer.start();

然后发送消息分为如下几种方式:

然后再设置消费者,分为如下两种:

2.2 Spring Boot Starter

首先引入依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

然后增加配置信息如下:

server:
  port: 8082

#rocketmq配置信息
rocketmq:
  # tdmq-rocketmq服务接入地址
  name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
  # 生产者配置
  producer:
    # 生产者组名
    group: group111
    # 角色密钥
    access-key: eyJrZXlJZC....
    # 已授权的角色名称
    secret-key: admin
  # 消费者公共配置
  consumer:
    # 角色密钥
    access-key: eyJrZXlJZC....
    # 已授权的角色名称
    secret-key: admin

  # 用户自定义配置
  namespace: rocketmq-xxx|namespace1
  producer1:
    topic: testdev1
  consumer1:
    group: group111
    topic: testdev1
    subExpression: TAG1
  consumer2:
    group: group222
    topic: testdev1
    subExpression: TAG2

然后如下是一个发送消息的示例:

/**
 * Description: 消息生产者
 */
@Service
public class SendMessage {
    // 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置  格式:namespace全称%topic名称
    @Value("${rocketmq.namespace}%${rocketmq.producer1.topic}")
    private String topic;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 同步发送
     *
     * @param message 消息内容
     * @param tags    订阅tags
     */
    public void syncSend(String message, String tags) {
        // springboot不支持使用header传递tags,根据要求,需要在topic后进行拼接 formats: `topicName:tags`,不拼接标识无tag
        String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
        SendResult sendResult = rocketMQTemplate.syncSend(destination,
                MessageBuilder.withPayload(message)
                        .setHeader(MessageConst.PROPERTY_KEYS, "yourKey")   // 指定业务key
                        .build());
        System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
    }
}

如下是一个消费者示例:

@Service
@RocketMQMessageListener(
        // 消费组,格式:namespace全称%group名称
        consumerGroup = "${rocketmq.namespace}%${rocketmq.consumer1.group}",  
        // 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置  格式:namespace全称%topic名称
        topic = "${rocketmq.namespace}%${rocketmq.consumer1.topic}",
        selectorExpression = "${rocketmq.consumer1.subExpression}" // 订阅表达式, 不配置表示订阅所有消息
)
public class MessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        System.out.println("Tag1Consumer receive message:" + message);
    }
}

2.3 Spring Cloud Stream

如果不熟悉Stream使用的可以先参考文章Stream使用入门 - 简书 (jianshu.com)

首先引入依赖:

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.1</version>
</dependency>
<dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-acl</artifactId>
     <version>4.7.1</version>
</dependency>

<!--spring-cloud-starter-stream-rocketmq 里面的 RocketMQ 版本较老,需要排除掉,然后单独引用新的版本-->
<dependency>
     <groupId>com.alibaba.cloud</groupId>
     <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
     <version>2.2.5-RocketMQ-RC1</version>
     <exclusions>
             <exclusion>
                     <groupId>org.apache.rocketmq</groupId>
                     <artifactId>rocketmq-client</artifactId>
             </exclusion>
             <exclusion>
                     <groupId>org.apache.rocketmq</groupId>
                     <artifactId>rocketmq-acl</artifactId>
             </exclusion>
     </exclusions>
</dependency>

然后是stream的配置:

spring:
 cloud:
     stream:
         rocketmq:
             bindings:
                 # channel名称, 与spring.cloud.stream.bindings下的channel名称对应
                 Topic-test1:
                     consumer:
                         # 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)
                         subscription: TAG1
                 # channel名称
                 Topic-test2:
                     consumer:
                         subscription: TAG2
             binder:
                 # 服务地址全称
                 name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
                 # 角色名称
                 secret-key: admin
                 # 角色密钥
                 access-key: eyJrZXlJZ...
                 # namespace全称
                 namespace: rocketmq-xxx|namespace1
                 # 生成者group名称
                 group: group1
         bindings:
             # channel名称
             Topic-send:
                 # 指定topic, 对应创建的topic名称
                 destination: topic1
                 content-type: application/json
                 # 要使用group名称
                 group: group1
             # channel名称
             Topic-test1:
                 destination: topic1
                 content-type: application/json
                 group: group1
             # channel名称
             Topic-test2:
                 destination: topic1
                 content-type: application/json
                 group: group2

如下是channel的示例:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
* 自定义通道 Binder
*/
public interface CustomChannelBinder {

     /**
        * 发送消息(消息生产者)
        * 绑定配置中的channel名称
        */
     @Output("Topic-send")
     MessageChannel sendChannel();


     /**
        * 接收消息1(消费者1)
        * 绑定配置中的channel名称
        */
     @Input("Topic-test1")
     MessageChannel testInputChannel1();

     /**
        * 接收消息2(消费者2)
        * 绑定配置中的channel名称
        */
     @Input("Topic-test2")
     MessageChannel testInputChannel2();
}

在配置类或启动类上添加相应注解,如果有多个binder配置,都要在此注解中进行指定。

@EnableBinding({CustomChannelBinder.class})

发送消息示例:

@Autowired
private CustomChannelBinder channelBinder;

Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
channelBinder.sendChannel().send(message);

接收消息示例:

@Service
public class TestStreamConsumer {
     private final Logger logger = LoggerFactory.getLogger(DemoApplication.class);
         /**
        * 监听channel (配置中的channel名称)
        *
        * @param messageBody 消息内容
        */
     @StreamListener("Topic-test1")
     public void receive(String messageBody) {
             logger.info("Receive1: 通过stream收到消息,messageBody = {}", messageBody);
     }
         /**
        * 监听channel (配置中的channel名称)
        *
        * @param messageBody 消息内容
        */
     @StreamListener("Topic-test2")
     public void receive2(String messageBody) {
             logger.info("Receive2: 通过stream收到消息,messageBody = {}", messageBody);
     }
}

三、参考文档

消息队列 RocketMQ 版 产品概述 - 产品简介 - 文档中心 - 腾讯云 (tencent.com)

RocketMQ开发者使用指南入门 - 简书 (jianshu.com)

上一篇 下一篇

猜你喜欢

热点阅读