3 发送消息并设置Confirm和Return

2020-07-15  本文已影响0人  Finlay_Li

Confirm

消息发送到Broker后的回调:Broker对是否成功接收该消息进行确认(ack/nack),与消费端是否已消费无关

Return

当消息路由不可达时,触发回调

配置

#--------------------------------------------rabbitmq--------------------------------------------
#连接工厂
spring.rabbitmq.addresses=xxxx:5677,xxx:5678
spring.rabbitmq.username=guest
spring.rabbitmq.password=qwg-rabbitmq@guest
#虚拟主机
spring.rabbitmq.virtual-host=qwg-app-dev
#------------生产端
# 开启:消息确认
spring.rabbitmq.publisher-confirms= true
# 开启:路由不可达的消息返回
spring.rabbitmq.publisher-returns= true
# 设置true 监听器会收到:路由不可达的消息,从而可对路由不可达的消息进行处理,保证消息的路由成功;如果为false,那么Broker会自动删除该消息
spring.rabbitmq.template.mandatory= true
#------------消费端
# 手工签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual

生产端

package com.finlay.scaffold.boot;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Publishes a demo String message to {@code boot.exchange} and prints the
 * outcome of the publisher-confirm and returned-message callbacks to stdout.
 */
@Component
public class RabbitSender {
    private static final String EXCHANGE_NAME = "boot.exchange";
    // To see ReturnCallback fire, publish with a key no queue is bound to,
    // e.g. "springboot.hexllo".
    // NOTE(review): a non-existent exchange is expected to surface as a nack in
    // ConfirmCallback rather than as a returned message — confirm on the broker in use.
    private static final String ROUTING_KEY = "springboot.hello";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publisher-confirm callback: reports whether the broker acked or nacked the
     * published message, together with the correlation id given at send time.
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // Only announce success when the broker actually acked the message.
            if (ack) {
                System.out.println("*************消息发送成功**************");
            } else {
                System.out.println("*************消息发送失败**************");
            }
            // correlationData is null when the message was sent without one.
            String correlationId = correlationData == null ? null : correlationData.getId();
            System.out.println("correlationId------------>:" + correlationId);
            System.out.println("ack签收结果------------>:" + ack);
            System.out.println("若发生异常,异常信息------------>:" + cause);
        }
    };

    /**
     * Returned-message callback: prints the reply code/text and the exchange and
     * routing key of a message handed back by the broker.
     */
    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {

        @Override
        public void returnedMessage(Message message, int replyCode, String replyText,
                                    String exchange, String routingKey) {
            System.out.println("**************Return********************");
            System.out.println("replyCode:" + replyCode);
            System.out.println("replyText:" + replyText);
            System.out.println("exchange:" + exchange);
            System.out.println("routingKey:" + routingKey);
        }
    };

    /**
     * Sends the fixed demo message to {@code EXCHANGE_NAME} with {@code ROUTING_KEY},
     * tagging it with a fresh correlation id so the confirm can be matched to it.
     */
    public void sendString() {
        String msg = "rabbitmq--------->springboot--------->hello";
        // The same callback instances are registered on every call.
        // NOTE(review): this could be done once at startup instead.
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        CorrelationData correlationData =
                new CorrelationData(java.util.UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, msg, correlationData);
    }
}

消费端

package com.finlay.scaffold.boot;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

Component
public class RabbitReceiver {

    private static final String EXCHANGE_NAME = "boot.exchange";
//    private static final String ROUTING_KEY = "springboot.#";
    private static final String ROUTING_KEY = "springboot.hello";
    private static final String QUEUE_NAME = "boot.queue";

    //直接通过@RabbitListener,完成QUEUE,EXCHANGE 的【声明、绑定】
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = QUEUE_NAME,
                    durable = "true"),
            exchange = @Exchange(value = EXCHANGE_NAME,
                    type = ExchangeTypes.TOPIC,
                    durable = "false"),
            key = ROUTING_KEY
    ))
    @RabbitHandler
    public void rec(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            channel.basicQos(1);
            System.out.println("receiver: " + msg);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            //不建议再放回队列,可以采用ConfirmCallback 机制进行处理
            throw new RuntimeException(e);
        }
    }
}


上一篇 下一篇

猜你喜欢

热点阅读