rabbitmq整合springboot

2021-10-10  本文已影响0人  念䋛

Springboot 版本为2.3.10.RELEASE
消费端
在整合springboot的时候,个人认为有两种方式来消息确认,一种是完全使用配置的方式,一种是部分使用配置的方式

  1. 完全使用springboot配置的方式
    application.yml
spring:
  rabbitmq:
    host: 192.168.137.141
    port: 5672
    username: duoduo
    password: duoduo
    virtual-host: /duoduo
    #三种方式 SIMPLE() CORRELATED(执行ConfirmCallback) NONE(发送失败直接丢弃)
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      type: direct #direct和simple
      direct:
        acknowledgeMode: manual #auto代表自动接收消息,manual手动确认消息
        prefetch: 1  #这个就是basicQos,同时处理多少条消息
        defaultRequeueRejected: true #消息拒绝是否重新入队
        retry:
          enabled: true
          maxAttempts: 3

监听


@Configuration
public class RabbitMqConsumer {
    public static AtomicInteger count = new AtomicInteger (0);
    public static AtomicInteger count1 = new AtomicInteger (0);

    /**
     * @RabbitListener(queues = {"spirngboot_queue"})可以直接监听队列,前提是服务端已经创建了队列,交换机也绑定了队列
     * 也可以创建交换机 队列 比如注释掉的代码
     */
    //@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spirngboot_queue", durable = "true"),
            //exchange = @Exchange(name = "spirngboot_queue_topic_exchange", durable = "true", type = "topic"),
            //key = "springboot.#"))
    @RabbitListener(queues = {"spirngboot_queue"})
    @RabbitHandler
    public void message(@Payload User user) throws IOException {
        //这里是完全使用springboot的注解的方式,并且 acknowledgeMode: auto 要配置成auto的方式,配置成manual消息不会确定接收,
        //但是这里有一个问题,如果使用了自定义序列化之后,配置成manual也可以正常接收
        //配置文件中的retry,如果在接收消息的时候发生了异常那么会重试3次,3次之后消息就会丢弃掉,虽然配置了defaultRequeueRejected为true
        //但是如果使用了自定义序列化之后,不管defaultRequeueRejected是否为true,消息拒绝之后会重新的放到队列中
        //本人对这一地方不太了解
        //实际生产中本人也不会经常使用纯配置的方式
 try {
    System.out.println (user.getName ());
} catch (Exception e) {
    //todo 代码的回退,如果try里面操作了数据库,可以通过事务自动的回退,但是如果操作了redis,那需要对redis 的回退等操作
    throw new IOException (e.getMessage ());
}

    }
    //配合@RabbitHandler注解实现了消息的序列化格式,这样可以直接传对象,而不用吧对象转为json字符串,而且Jackson2JsonMessageConverter序列化体积更小传输更快
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory ();
        simpleRabbitListenerContainerFactory.setConnectionFactory (connectionFactory);
        simpleRabbitListenerContainerFactory.setMessageConverter (new Jackson2JsonMessageConverter ());
        return simpleRabbitListenerContainerFactory;
    }
}

纯配置的方式,在监听的代码中只是接收消息就可以,不用手动的basicAck来确认消息,或者basicNack拒绝消息,纯配置的方式,对异常的捕获,之后是要继续手动抛出异常,默认重试3次,之后肯定不会重新放到队列中,本人没看过源码,看表象是这样的.而且acknowledgeMode:
Auto 才会接收消息
如果配置了@RabbitHandler 修改了序列化机制,上面的配置还是需要改变
本人对纯配置的方式理解不太透彻,希望大家给我意见.

  1. 使用手动的确认信息
@Configuration
public class RabbitMqConsumer {
    public static Map<String, AtomicInteger> map = new HashMap<> ();

    /**
     * @RabbitListener(queues = {"spirngboot_queue"})可以直接监听队列,前提是服务端已经创建了队列,交换机也绑定了队列
     * 也可以创建交换机 队列 比如注释掉的代码
     */
    //@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spirngboot_queue", durable = "true"),
            //exchange = @Exchange(name = "spirngboot_queue_topic_exchange", durable = "true", type = "topic"),
            //key = "springboot.#"))
    @RabbitListener(queues = {"spirngboot_queue"})
    @RabbitHandler
    public void message( Channel channel,Message message) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties ();
        if(null==map.get("spirngboot_queue"+messageProperties.getDeliveryTag ())){
            map.put ("spirngboot_queue" + messageProperties.getDeliveryTag (), new AtomicInteger ());
        }
        try {
            //todo 逻辑代码,这里要注意的是,执行这段代码是多线程的,要注意多线程安全
            System.out.println ("消息的消费"+new String(message.getBody ()));
            //模拟异常,如果是操作了sevice层的数据库,发生异常可以事务回滚,如果包含其他操作,需要在catch中将操作回退,因为是多线程,要考虑线程安全问题
            int i = 1 / 0;
            //如果消费成功,则确认消息
            channel.basicAck (messageProperties.getDeliveryTag (), false);
        } catch (Exception e) {
            //利用messageProperties.getDeliveryTag ()得到消息唯一的id,判断重复消息了几次,如果超过3次把最后的true改为false,将消息不重新放到队列中
            //因为可能是分布式系统,可以使用reids来判断消息消费了几次,
            //如果操过了3次,从redis中将messageProperties.getDeliveryTag ()删除即可
            if(map.get("spirngboot_queue"+messageProperties.getDeliveryTag ()).addAndGet (1)<3){//判断消息失败消费了几次,如果操过能容忍的最大次数后将消息丢弃,这里可以使用死心队列接收失败的消息
                System.out.println ("消息消费失败,重新放到队列中,失败消费次数"+map.get("spirngboot_queue"+messageProperties.getDeliveryTag ()).get ());
                channel.basicNack (messageProperties.getDeliveryTag (), false, true);
            }else{
                System.out.println ("消息消费失败,超过3次,丢弃消息,可以放到死心队列中");
                channel.basicNack (messageProperties.getDeliveryTag (), false, false);
            }
        }
    }
    //配合@RabbitHandler注解实现了消息的序列化格式,这样可以直接传对象,而不用吧对象转为json字符串,而且Jackson2JsonMessageConverter序列化体积更小传输更快
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory (ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory ();
        simpleRabbitListenerContainerFactory.setConnectionFactory (connectionFactory);
        simpleRabbitListenerContainerFactory.setMessageConverter (new Jackson2JsonMessageConverter ());
        return simpleRabbitListenerContainerFactory;
    }
}

生产者 这里使用的springboot 2.5.0版本
配置类


/**
 * 保证消息的确认发送有三种方式,事务,Confirms和异步监听的方式
 * 1.使用监听就是下面rabbitTemplateListener方式
 * 2.Confirms方式为Web类的rabbitMq方法,Confirms可以统一确认和单调确认,实例中为统一确认
 * 3.还有一种方式就是事务的方式,由于效率很低,一般很少使用,这里没有做介绍
 * 效率上监听的效率要高于Confirms,实际生产上也是建议使用监听的方式
 */
@Configuration
public class RabbitMqConfiguration {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 下面是项目启动的时候创建了队列 交换机 并将队列和交换机绑定在一起
     * 定义交换机的名字
     */
    public static final String EXCHANGE_NAME = "spirngboot_queue_topic_exchange";
    /**
     * 定义队列的名字
     */
    public static final String QUEUE_NAME = "spirngboot_queue";

    /**
     * 声明交换机
     */
    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange (EXCHANGE_NAME).durable (true).build ();
    }

    /**
     * 声明队列
     */
    @Bean("bootQueue")
    public Queue bootQueue() {
        //指定队列的同时也指定了队列的最大优先级,发送消息的时候也要指定消息的优先级,rabbitmq 的管理页面的queue会有一个pri的标识
        return QueueBuilder.durable (QUEUE_NAME).maxPriority (5).build ();
    }

    /**
     * 队列与交换机进行绑定
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder.bind (queue).to (exchange).with ("springboot.#").noargs ();
    }

    @PostConstruct
    public void rabbitTemplateListener() {
        //设置confirmeCallback 需要在配置文件中加publisher-confirm-type: correlated
        rabbitTemplate.setConfirmCallback (new RabbitTemplate.ConfirmCallback () {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                //ack 为  true表示 消息已经到达交换机,此时消息并没有到达队列
                System.out.println (correlationData);
                if (ack) {
                    //交换价接收消息成功 cause为null
                    System.out.println ("交换机接收成功消息");
                } else {
                    //接收失败
                    System.out.println ("交换机接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }
            }
        });
        //启动return机制的两种方式
        /**1.配置文件中 publisher-returns: true
         * 2.rabbitTemplate.setMandatory (true);
         */
        //定义回调,交换机是否到达队列,发生在ack成功之后
        rabbitTemplate.setReturnsCallback (new RabbitTemplate.ReturnsCallback () {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                //获取交换机
                System.out.println ("获取交换机" + returned.getExchange ());
                //获取消息对象
                System.out.println ("获取消息对象" + returned.getMessage ());
                //获取错误码
                System.out.println ("获取错误码" + returned.getReplyCode ());
                //获取错误信息
                System.out.println ("获取错误信息" + returned.getReplyText ());
                //获取路由key
                System.out.println ("获取路由key" + returned.getRoutingKey ());
            }
        });
    }
}

发送消息

@Controller
@ResponseBody
public class Web {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("rabbitMq")
    public void rabbitMq() {
        //直接发送消息
//        rabbitTemplate.convertAndSend ("spirngboot_queue_topic_exchange", "springboot1.HelloRabbitMq", "HelloRabbitMq" );
        //使用了Jackson2JsonMessageConverter序列化,发送消息可以直接发送对象,而不是字符串
//        rabbitTemplate.setMessageConverter (new Jackson2JsonMessageConverter ());
        //将消息设置优先级
        Boolean invoke = rabbitTemplate.invoke ((ops) -> {
            for (int i = 0; i < 2; i++) {
                User user = new User ("zhangsan"+i);
                //偶数为优先级高的消息,接收消息的时候会发现偶数的消息比奇数的消息更早的被消费
                if (i % 2 == 0) {
                    ops.convertAndSend ("spirngboot_queue_topic_exchange", "springboot.HelloRabbitMq", user.toString (),
                            message -> {
                                //设置优先级为5的消息,0最小 255最大
                                message.getMessageProperties ().setPriority (5);
                                return message;
                            });
                } else {
                    ops.convertAndSend ("spirngboot_queue_topic_exchange", "springboot.HelloRabbitMq", user.toString (),
                            message -> {
                                message.getMessageProperties ().setPriority (0);
                                return message;
                            });
                }
            }
            //如果2000毫秒之内没有收到服务端的确认消息,下面的invoke为false,此方法为阻塞方法
            return rabbitTemplate.waitForConfirms (2000);
        });
        if (invoke) {
            System.out.println ("消息发送成功-----");
        } else {
            System.out.println ("消息发送失败-----");
        }
    }
}

上一篇 下一篇

猜你喜欢

热点阅读