SpringBoot

SpirngBoot使用RocketMQ

2020-12-17  本文已影响0人  HachiLin

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,日志记录,流量削锋、分布式事务等问题,实现高性能,高可用,可伸缩和最终一致性架构。

1. Maven添加rocketmq依赖

<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>${rocketmq.version}</version>
</dependency>

2. 设置配置项信息

rocketmq:
    enable: true
    namesrvAddr: host:port
    instanceName: XX  # 消息名
    defaultProducer:
        enable: true
        producerGroup: XXProducer
    defaultConsumer:
        enable: true
        producerGroup: XXConsumer
        topic: topic1,topic2,...
        batchSize: 10
        minThread: 4
        maxThread: 16

3. 消息生产者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.Charset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component
public class XXProducer {
    private static final Logger logger = LoggerFactory.getLogger(XXProducer.class);
    
    private String bodyCharset = "UTF-8";
    
    @Autowired
    private String assetsModifyTopic;
    
    public boolean sendMessage(String body, topicName) {
        Message message = new Message();
        try {
            message.setTopic(topicName);
            message.setBody(body.getBytes(Charset.forName(bodyCharset)));
            SendResult sendResult = defaultMqProducer.send(message);
            if (sendResult == null) {
                logger.warn("消息发送失败. topicName: {}, body: {}, err: {}", topicName, body);
                return false;
            } else {
                return ture;
            }
        } catch (Exception e) {
            logger.warn("消息发送异常. topicName: {}, body: {}, err: {}", topicName, body);
            return false;
        }
    }
}

4. 消息消费者

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Repository("messageListenerConcurrently")
public class XXConsumer implements MessageListenerConcurrently {
    private static final Logger logger = LoggerFactory.getLogger(XXConsumer.class);
    
    @Oveerider
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : msgs) {
            try {
                // TODO 业务逻辑
            } catch (Exception e) {
                logger.warn(~);
                return ConsumeConcurrentlyStatus.CONSUME_LATER; 
            }    
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

5. 参考

上一篇下一篇

猜你喜欢

热点阅读