收藏RabbitMQ

RabbitMQ高级整合应用-5、RabbitMQ与Spring

2021-12-27  本文已影响0人  那钱有着落吗

这个帖子我们就可以使用springboot经典的配置文件的方式来实现消息的确认和失败回调


image.png

消息确认和失败回调 代码示例

配置文件

spring.application.name=rabbit-sample
server.port=63001


#rabbitMQ连接字符串
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host = /
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

消息发送类,我们在消息发送类中定义了消息的确认方法,消息的失败回调方法:

@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("correlationData:"+correlationData);
            System.out.println("ack:"+ack);
            if(!ack){
                System.out.println("异常处理机制:-----");
            }
        }
    };

    final RabbitTemplate.ReturnsCallback returnsCallback = new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            System.out.println("return:"+returned);
        }
    };

    public void send(Object message, Map<String,Object> properties) throws Exception{
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        Message<Object> msg = MessageBuilder.createMessage(message,messageHeaders);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnsCallback(returnsCallback);

        // CorrelationData cd = new CorrelationData();
        // cd.setId("444555666");//全局唯一id,可以加时间戳
        rabbitTemplate.convertAndSend("exchange-boot","rabbit.a",msg);
    }

}

测试类:

@Test
    public void send1() throws Exception{
        Map<String,Object> messageProperties = new HashMap<>();
        messageProperties.put("number",1234);
        messageProperties.put("sendTime",new Date());
        rabbitSender.send("this is a msg!",messageProperties);
    }

消费端配置


image.png

消费端@RabbitListener注解使用

在前面的基础上加上下面配置:

spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency = 5
spring.rabbitmq.listener.simple.max-concurrency = 10

创建一个消息接收类:


@Component
public class RabbitReceiver {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(
             bindings = @QueueBinding(
                     value = @Queue(value = "queue-boot",durable = "true"),
                     exchange = @Exchange(value="exchange-boot",durable = "true",
                             type="topic",ignoreDeclarationExceptions ="true"),
                     key = "spring.*"
             )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception{
        System.out.println("-------");
        System.out.println("消费端payload:"+message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手动ACK
        channel.basicAck(deliveryTag,false);
    }
}

可以看到我们使用注解的方式创建了一个exchange,一个队列,然后还建立了绑定关系,然后注解的方法就监听这个队列,接收消息并手动ack。

使用配置文件配置exchange以及队列等信息

直接在代码中写死是不灵活的方式,所以我们在配置文件写好,然后以变量的方式注入到rabbit的注解中以达到创建队列的方式。

在接收端我们也可以直接接收java对象,这种方式更加便捷:

上一篇下一篇

猜你喜欢

热点阅读