rocketmq集群消费以及广播消费
2022-09-01 本文已影响0人
小小的小帅
pom依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.0</version>
<scope>compile</scope>
</dependency>
生产者:
package com.hscs.cux;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * Minimal RocketMQ producer example: connects to a NameServer, sends ten
 * messages synchronously to a single topic and then shuts down.
 *
 * Created by hand on 2018/5/2.
 */
public class Producer {
    /** Topic the demo messages are published to. */
    private static final String TOPIC = "HSCS_DEV_TST_MSG";

    /** Number of messages sent by the demo. */
    private static final int MESSAGE_COUNT = 10;

    /** Maximum message size in bytes (4 MB, the documented client default). */
    private static final int MAX_MESSAGE_SIZE = 4 * 1024 * 1024;

    /**
     * Entry point of the demo.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or a send fails
     */
    public static void main(String[] args) throws Exception {
        // Create the producer; the constructor argument is the producer GROUP name
        // (not the topic name).
        DefaultMQProducer mqProducer = new DefaultMQProducer("hscs_producer-group-test");
        // NameServer address; separate multiple addresses with a semicolon for a cluster.
        mqProducer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
        // Maximum message size. The original value 4096 limited messages to 4 KB,
        // contradicting the stated "4M" default.
        mqProducer.setMaxMessageSize(MAX_MESSAGE_SIZE);
        // Send timeout in milliseconds.
        mqProducer.setSendMsgTimeout(3000);
        // Retry count for SYNCHRONOUS sends, which is what this demo uses below.
        // (setRetryTimesWhenSendAsyncFailed only affects asynchronous sends.)
        mqProducer.setRetryTimesWhenSendFailed(2);
        // Start the producer.
        mqProducer.start();
        try {
            for (int i = 1; i <= MESSAGE_COUNT; i++) {
                String msg = "hello, 这是第" + i + "条同步消息";
                // Build the message with its topic, tag (empty here) and body.
                Message message = new Message(TOPIC, "", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Synchronous send; the returned SendResult reports the delivery status.
                SendResult sendResult = mqProducer.send(message);
                System.out.println(sendResult);
            }
        } finally {
            // Always release client resources, even when a send throws.
            mqProducer.shutdown();
        }
    }
}
消费者:
package com.hscs.cux;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Minimal RocketMQ push-consumer example that subscribes to one topic in
 * broadcasting mode and prints every message it receives.
 */
public class Consumer {
    /**
     * Entry point of the demo.
     *
     * @param args unused
     * @throws Exception if the subscription is invalid or the consumer cannot start
     */
    public static void main(String[] args) throws Exception {
        // Create the push consumer; the constructor argument is the consumer group name.
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("hscs_consumer-group-test");
        // NameServer address; separate multiple addresses with a semicolon for a cluster.
        mqPushConsumer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
        // Where to start on the very first launch of this group; later launches
        // resume from the previously recorded offset.
        mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Message model: CLUSTERING is the default; this demo uses BROADCASTING.
        //mqPushConsumer.setMessageModel(MessageModel.CLUSTERING);
        mqPushConsumer.setMessageModel(MessageModel.BROADCASTING);
        // Minimum number of consumer threads.
        mqPushConsumer.setConsumeThreadMin(5);
        // Maximum number of consumer threads.
        mqPushConsumer.setConsumeThreadMax(10);
        // Maximum number of messages handed to the listener per call.
        mqPushConsumer.setConsumeMessageBatchMaxSize(1);
        // Subscribe to the topic; "*" matches every tag.
        mqPushConsumer.subscribe("HSCS_DEV_TST_MSG", "*");
        // Register the callback that handles messages pulled from the broker.
        mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // Handle EVERY message in the batch. Reading only msgList.get(0) would
                // silently drop the rest as soon as the batch size is raised above 1,
                // because CONSUME_SUCCESS acknowledges the whole batch.
                for (MessageExt messageExt : msgList) {
                    String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                    System.out.println("消费者接收到消息: " + messageExt.toString() + "---消息内容为:" + body);
                }
                // Mark the batch as successfully consumed.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer instance.
        mqPushConsumer.start();
    }
}
spring集成:
application.yml修改:
rocketmq:
  # rocketmq地址
  name-server: 192.168.101.11:9876
  producer:
    # 必须填写 group
    group: test-group
生产端:
package com.anker.scs.ankerinterface.infra.message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo REST endpoint that publishes a message to RocketMQ through
 * {@link RocketMQTemplate}.
 */
@RestController
public class TestController {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Delivers a plain message to the {@code add-bonus} destination.
     *
     * @param msg payload to deliver, taken from the request parameter
     * @return a confirmation string echoing the delivered payload
     */
    @GetMapping("/test")
    public String test(String msg) {
        // Send the caller-supplied payload. The original sent the fixed string
        // "testMessaging" while the response claimed that msg had been delivered.
        rocketMQTemplate.convertAndSend("add-bonus", msg);
        return "投递消息 => " + msg + " => 成功";
    }
}
消费端:
package com.anker.scs.ankerinterface.infra.message;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
/**
 * RocketMQ listener bound to topic {@code MDP_TOPIC_COMPANY_INFO_SIT} in
 * consumer group {@code TMS_GROUP_AMDP_COMPANY}, configured for orderly
 * consumption. Each received payload is logged.
 */
@Component
@Slf4j
@RocketMQMessageListener(
        consumerGroup = "TMS_GROUP_AMDP_COMPANY",
        topic = "MDP_TOPIC_COMPANY_INFO_SIT",
        consumeMode = ConsumeMode.ORDERLY,
        consumeThreadMax = 8)
public class AmdpCompanyListener implements RocketMQListener<JSONObject>, RocketMQPushConsumerLifecycleListener {

    /**
     * Logs the incoming payload.
     *
     * @param jsonObject message body converted to a JSON object
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void onMessage(JSONObject jsonObject) {
        log.info("MDP_TOPIC_COMPANY_INFO_SIT:{}", jsonObject);
        // Example of mapping the payload to DTOs (left disabled):
        //List<CompanyDTO> companyDTOList = JSON.parseArray(jsonObject.getString("data"), CompanyDTO.class);
    }

    /**
     * Lifecycle hook that receives the underlying consumer; intentionally
     * left empty, so no extra consumer settings are applied here.
     *
     * @param defaultMQPushConsumer the underlying push consumer
     */
    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
    }
}