RocketMq订阅与发送消息
2022-06-08 本文已影响0人
CodeFarmerYang
使用阿里云的 RocketMQ 订阅与发送消息,直接上代码:
1.引入pom依赖
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.8.Final</version>
</dependency>
2.新建RocketMqUtil类
package com.**.***.***.utils;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.***.***.***.listener.MqMessageListener;
import com.***.***.***.listener.MqTimeMessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that declares the Aliyun ONS (RocketMQ) producer and consumer beans.
 *
 * <p>All connection settings are injected from the {@code rocketMq.*} configuration properties.
 */
@Slf4j
@Configuration
public class RocketMqUtil {

    @Value("${rocketMq.groupId}")
    private String groupId;

    @Value("${rocketMq.accessKey}")
    private String accessKey;

    @Value("${rocketMq.secretKey}")
    private String secretKey;

    @Value("${rocketMq.nameSrvAddr}")
    private String nameSrvAddr;

    @Value("${rocketMq.topic}")
    private String topic;

    // Injected but not read anywhere in this class.
    @Value("${rocketMq.startApprovalTag}")
    private String startApprovalTag;

    @Value("${rocketMq.timeout}")
    private String timeout;

    @Autowired
    private MqMessageListener messageListener;

    @Autowired
    private MqTimeMessageListener timeMessageListener;

    /**
     * Creates the message producer bean.
     *
     * <p>The bean declares {@code start} as its init method and {@code shutdown} as its
     * destroy method, so the container drives the producer lifecycle.
     *
     * @return a {@link ProducerBean} configured with the connection properties and send timeout
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer producer() {
        Properties properties = connectionProperties();
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeout);

        ProducerBean producer = new ProducerBean();
        producer.setProperties(properties);
        log.info("rocketMq创建生产者成功");
        return producer;
    }

    /**
     * Creates the message consumer bean and registers its subscriptions.
     *
     * <p>The destroy method is declared explicitly so the consumer lifecycle is stated the same
     * way as the producer's.
     *
     * @return a {@link ConsumerBean} configured with the connection properties and subscription table
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean consumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        consumerBean.setProperties(connectionProperties());

        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();

        // Subscription for normal messages.
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression("start_approve||process_approve");
        subscriptionTable.put(subscription, messageListener);

        // Subscription for scheduled/delayed messages.
        // NOTE(review): this subscription uses the same topic and expression as the one above.
        // If Subscription equality is keyed on the topic (verify against the ons-client version
        // in use), this put replaces the listener registered above and messageListener is never
        // invoked. Distinct topics or consumer groups are probably needed - confirm.
        Subscription subscriptionTime = new Subscription();
        subscriptionTime.setTopic(topic);
        subscriptionTime.setExpression("start_approve||process_approve");
        subscriptionTable.put(subscriptionTime, timeMessageListener);

        consumerBean.setSubscriptionTable(subscriptionTable);
        log.info("rocketMq订阅成功");
        return consumerBean;
    }

    /** Builds the properties shared by producer and consumer: name server, credentials and group id. */
    private Properties connectionProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        return properties;
    }
}
3.普通消息监听类MqMessageListener
package com.***.***.***.listener;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
 * Listener for normal MQ messages.
 */
@Component
public class MqMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(MqMessageListener.class);

    /**
     * Logs the received message, runs the business handling and reports the consume result.
     *
     * @param message the delivered message
     * @param context the consume context passed in by the client
     * @return {@link Action#CommitMessage} when handling completes,
     *         {@link Action#ReconsumeLater} when handling throws
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Decode with an explicit charset instead of the platform default.
        // NOTE(review): assumes message bodies are UTF-8 encoded - confirm against the sender.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        logger.info("接收到MQ普通消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
            message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), body);
        try {
            // Business handling goes here.
            return Action.CommitMessage;
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged too.
            logger.error("消费MQ消息失败! msgId:{}----ExceptionMsg:{}",
                message.getMsgID(), e.getMessage(), e);
            return Action.ReconsumeLater;
        }
    }
}
4.异步/定时/延时消息监听类MqTimeMessageListener
package com.***.***.***.listener;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Listener for scheduled/delayed MQ messages.
 */
@Component
public class MqTimeMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(MqTimeMessageListener.class);

    // Injected but not read anywhere in this class.
    @Value("${rocketMq.startedApprovalTag}")
    private String startedApprovalTag;

    // Injected but not read anywhere in this class.
    @Value("${rocketMq.processedApprove}")
    private String processedApprove;

    /**
     * Logs the received message and runs the business handling.
     *
     * @param message the delivered message
     * @param context the consume context passed in by the client
     * @return always {@link Action#CommitMessage}, including when handling throws
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Decode with an explicit charset instead of the platform default.
        // NOTE(review): assumes message bodies are UTF-8 encoded - confirm against the sender.
        String body = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        logger.info("接收到MQ定时/延时消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
            message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), body);
        try {
            // Business handling goes here.
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged too.
            logger.error("消费MQ消息失败! msgId:{}----ExceptionMsg:{}",
                message.getMsgID(), e.getMessage(), e);
            // NOTE(review): the message is still committed after a failure, so it is not
            // redelivered; MqMessageListener returns ReconsumeLater here - confirm this
            // difference is intended.
        }
        return Action.CommitMessage;
    }
}
5.消息发送RocketMqProducer
package com.***.***.***.utils;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
import java.util.Properties;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RocketMqProducer {
@Value("${rocketMq.groupId}")
private String groupId;
@Value("${rocketMq.sendMegGroup}")
private String sendMegGroup;
@Value("${rocketMq.accessKey}")
private String accessKey;
@Value("${rocketMq.secretKey}")
private String secretKey;
@Value("${rocketMq.nameSrvAddr}")
private String nameSrvAddr;
@Value("${rocketMq.topic}")
private String topic;
@Value("${rocketMq.startApprovalTag}")
private String startApprovalTag;
@Value("${rocketMq.timeout}")
private String timeout;
@Resource
private ProducerBean producer;
/**
* 发送异步消息
* @param tag
* @param msgKey
* @param messageBody
*/
public void sendAsyncMsg(String tag, String msgKey, byte[] messageBody) {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, sendMegGroup);
properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey);
properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeout);
Message msg = new Message(topic, tag, msgKey, messageBody);
try {
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
log.info("rocketMq发送成功,msg:{}", JSON.toJSONString(msg));
}
@Override
public void onException(final OnExceptionContext context) {
log.info("rocketMq发送失败:tag:{},topic:{},body:{}", tag, context.getTopic(), new String(messageBody), context.getException());
// todo 持久化失败消息,定时补偿
}
});
} catch (ONSClientException e) {
log.info("rocketMq发送异常:", e);
}
}
}
6.调用消息发送
@Autowired
private RocketMqProducer rocketMqProducer;

// sendAsyncMsg takes the payload as byte[], so encode the text explicitly.
rocketMqProducer.sendAsyncMsg("tag", "msgKey", "msg".getBytes(java.nio.charset.StandardCharsets.UTF_8));