SpringBoot整合rabbitmq

2022-01-06  本文已影响0人  任未然

一. 概述

参考开源项目https://github.com/xkcoding/spring-boot-demo
本Demo简单集成rabbitmq的使用

二. 安装rabbitmq

用docker安装, 没安装docker的先安装docker

  1. 下载镜像:docker pull rabbitmq:3.7.7-management

  2. 运行容器:docker run -d -p 5671:5617 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 --name rabbit-3.7.7 rabbitmq:3.7.7-management

  3. 浏览器打开网址http://localhost:15672/

默认账号密码: guest/guest

image.png
  1. 新建交换机,命名为: topic.wpr
image.png
  1. 新建队列

队列名为:queue.2, 队列绑定交换机:topic.wpr,路由Key:queue.wpr

RoutingKey规则

  • 路由格式必须以 . 分隔,比如 user.email 或者 user.aaa.email
  • 通配符 * ,代表一个占位符,或者说一个单词,比如路由为 user.*,那么 user.email 可以匹配,但是 user.aaa.email 就匹配不了
  • 通配符 # ,代表一个或多个占位符,或者说一个或多个单词,比如路由为 user.#,那么 user.email 可以匹配,user.aaa.email 也可以匹配
image.png

三. SpringBoot项目

3.1 application.yml

server:
  port: 8080
  servlet:
    context-path: /demo
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        retry:
          #enabled:开启失败重试
          enabled: true
          #第一次重试的间隔时长
          initial-interval: 1000ms
          #最大重试次数
          max-attempts: 3
          #最长重试间隔,超过这个间隔将不再重试
          max-interval: 10000ms
          #下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
          multiplier: 1
        #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列
        default-requeue-rejected: true
        # 手动提交消息
        acknowledge-mode: manual
      direct:
        # 手动提交消息
        acknowledge-mode: manual

3.2 监听队列事件

import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import com.xkcoding.mq.rabbitmq.constants.RabbitConsts;
import com.xkcoding.mq.rabbitmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@RabbitListener(queues = "queue.2")
@Component
public class QueueTwoHandler {

    @RabbitHandler
    public void directHandlerManualAck(MessageStruct messageStruct, Message message, Channel channel) {
        //  如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("队列2,手动ACK,接收消息:{}", JSONUtil.toJsonStr(messageStruct));
            // 通知 MQ 消息已被成功消费,可以ACK了
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            try {
                // 处理失败,重新压入MQ
                channel.basicRecover();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }
}

3.3 消息发送

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootDemoMqRabbitmqApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendTopic1() {
        rabbitTemplate.convertAndSend("topic.wpr", "queue.wpr", new MessageStruct("topic message"));
    }
}
上一篇下一篇

猜你喜欢

热点阅读