RocketMq订阅与发送消息

2022-06-08  本文已影响0人  CodeFarmerYang

使用阿里云的rocketMq订阅与发送消息,直接上代码:

1.引入pom依赖

<!-- Aliyun ONS (RocketMQ) client; the Java classes below import com.aliyun.openservices.ons.api.* -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>ons-client</artifactId>
    <version>1.8.8.Final</version>
</dependency>

2.新建RocketMqUtil类

package com.**.***.***.utils;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.***.***.***.listener.MqMessageListener;
import com.***.***.***.listener.MqTimeMessageListener;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Spring configuration that declares the Aliyun ONS (RocketMQ) producer and consumer beans.
 *
 * <p>All connection settings are injected from the {@code rocketMq.*} properties. The two
 * listener beans are injected and registered on the consumer's subscription table.
 */
@Slf4j
@Configuration
public class RocketMqUtil {

    /** Tag filter expression applied to every subscription registered by {@link #consumer()}. */
    private static final String TAG_EXPRESSION = "start_approve||process_approve";

    @Value("${rocketMq.groupId}")
    private String groupId;

    @Value("${rocketMq.accessKey}")
    private String accessKey;

    @Value("${rocketMq.secretKey}")
    private String secretKey;

    @Value("${rocketMq.nameSrvAddr}")
    private String nameSrvAddr;

    @Value("${rocketMq.topic}")
    private String topic;

    @Value("${rocketMq.startApprovalTag}")
    private String startApprovalTag;

    @Value("${rocketMq.timeout}")
    private String timeout;

    @Autowired
    private MqMessageListener messageListener;

    @Autowired
    private MqTimeMessageListener timeMessageListener;

    /**
     * Creates the message producer.
     *
     * <p>Spring calls {@code start} after construction and {@code shutdown} on context close.
     *
     * @return the configured, not yet started, producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer producer() {
        Properties properties = connectionProperties();
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeout);

        ProducerBean producer = new ProducerBean();
        producer.setProperties(properties);

        log.info("rocketMq创建生产者成功");
        return producer;
    }

    /**
     * Creates the message consumer and registers both listeners on the configured topic.
     *
     * <p>The destroy method is declared explicitly so the consumer's lifecycle is stated the
     * same way as the producer's.
     *
     * @return the configured, not yet started, consumer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean consumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        consumerBean.setProperties(connectionProperties());

        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        // Ordinary messages.
        subscribe(subscriptionTable, messageListener);
        // Timed / delayed messages.
        // NOTE(review): this uses the same topic and tag expression as the subscription above.
        // If Subscription equality is topic-based in the ons-client version in use (verify),
        // this entry replaces the previous listener; subscribe() logs a warning when it does.
        subscribe(subscriptionTable, timeMessageListener);

        consumerBean.setSubscriptionTable(subscriptionTable);
        log.info("rocketMq订阅成功");
        return consumerBean;
    }

    /** Builds the connection properties shared by the producer and the consumer. */
    private Properties connectionProperties() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
        properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        return properties;
    }

    /** Registers {@code listener} for the configured topic and warns if it displaced another listener. */
    private void subscribe(Map<Subscription, MessageListener> subscriptionTable, MessageListener listener) {
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        subscription.setExpression(TAG_EXPRESSION);

        // Map.put returns the previously mapped value, i.e. the listener that was just overwritten.
        MessageListener replaced = subscriptionTable.put(subscription, listener);
        if (replaced != null) {
            log.warn("rocketMq订阅冲突, topic:{}, 监听器{}已被{}覆盖",
                    topic, replaced.getClass().getSimpleName(), listener.getClass().getSimpleName());
        }
    }
}

3.普通消息监听类MqMessageListener

package com.***.***.***.listener;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;


/**
 * Listener for ordinary (non-timed) MQ messages.
 *
 * <p>Logs every received message, then acknowledges it. If handling throws, the message is
 * handed back to the broker for redelivery.
 */
@Component
public class MqMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(MqMessageListener.class);

    /**
     * Consumes one message.
     *
     * @param message the received message
     * @param context the consume context supplied by the client
     * @return {@link Action#CommitMessage} on success, {@link Action#ReconsumeLater} if handling failed
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Decode with an explicit charset so the logged body does not depend on the JVM default.
        logger.info("接收到MQ普通消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                    message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(),
                    new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        try {
            // Business handling goes here.
            return Action.CommitMessage;
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged too.
            logger.error("消费MQ消息失败! msgId:{}----ExceptionMsg:{}", message.getMsgID(), e.getMessage(), e);
            return Action.ReconsumeLater;
        }
    }

}

4.异步/定时/延时消息监听类MqTimeMessageListener

package com.***.***.***.listener;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


/**
 * Listener for async / timed / delayed MQ messages.
 *
 * <p>Logs every received message, then acknowledges it. If handling throws, the message is
 * handed back to the broker for redelivery instead of being acknowledged, which matches the
 * failure handling of {@code MqMessageListener}.
 */
@Component
public class MqTimeMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(MqTimeMessageListener.class);

    @Value("${rocketMq.startedApprovalTag}")
    private String startedApprovalTag;

    @Value("${rocketMq.processedApprove}")
    private String processedApprove;

    /**
     * Consumes one message.
     *
     * @param message the received message
     * @param context the consume context supplied by the client
     * @return {@link Action#CommitMessage} on success, {@link Action#ReconsumeLater} if handling failed
     */
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Decode with an explicit charset so the logged body does not depend on the JVM default.
        logger.info("接收到MQ定时/延时消息 -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                    message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(),
                    new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));

        try {
            // Business handling goes here.
            return Action.CommitMessage;
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged too.
            logger.error("消费MQ消息失败! msgId:{}----ExceptionMsg:{}", message.getMsgID(), e.getMessage(), e);
            // Do not acknowledge a message whose handling failed; let the broker redeliver it.
            return Action.ReconsumeLater;
        }
    }

}

5.消息发送RocketMqProducer

package com.***.***.***.utils;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
import java.util.Properties;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class RocketMqProducer {

    @Value("${rocketMq.groupId}")
    private String groupId;

    @Value("${rocketMq.sendMegGroup}")
    private String sendMegGroup;

    @Value("${rocketMq.accessKey}")
    private String accessKey;

    @Value("${rocketMq.secretKey}")
    private String secretKey;

    @Value("${rocketMq.nameSrvAddr}")
    private String nameSrvAddr;

    @Value("${rocketMq.topic}")
    private String topic;

    @Value("${rocketMq.startApprovalTag}")
    private String startApprovalTag;

    @Value("${rocketMq.timeout}")
    private String timeout;

    @Resource
    private ProducerBean producer;


    /**
     * 发送异步消息
     * @param tag
     * @param msgKey
     * @param messageBody
     */
    public void sendAsyncMsg(String tag, String msgKey, byte[] messageBody) {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, sendMegGroup);
        properties.put(PropertyKeyConst.AccessKey, accessKey);
        properties.put(PropertyKeyConst.SecretKey, secretKey);
        properties.put(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, timeout);

        Message msg = new Message(topic, tag, msgKey, messageBody);
        try {
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    log.info("rocketMq发送成功,msg:{}", JSON.toJSONString(msg));
                }

                @Override
                public void onException(final OnExceptionContext context) {
                    log.info("rocketMq发送失败:tag:{},topic:{},body:{}", tag, context.getTopic(), new String(messageBody), context.getException());
                    // todo 持久化失败消息,定时补偿
                }
            });
        } catch (ONSClientException e) {
            log.info("rocketMq发送异常:", e);
        }
    }
}


6.调用消息发送

@Autowired
private RocketMqProducer rocketMqProducer;
// Call from inside a method of the bean that declares the field above.
// sendAsyncMsg takes the payload as byte[], so encode the text explicitly.
rocketMqProducer.sendAsyncMsg("tag", "msgKey", "msg".getBytes(java.nio.charset.StandardCharsets.UTF_8));
上一篇 下一篇

猜你喜欢

热点阅读