
Integrating RabbitMQ with Spring Boot 2.0

2018-10-22  匆匆岁月

application.properties:

spring.rabbitmq.addresses=192.
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

RabbitMQ and Spring Boot integration, configuration in detail:

1. Core producer configuration


RabbitSender:

package com.pyy.springboot.producer;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData:" + correlationData);
            System.err.println("ack:" + ack);
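            if (!ack) {
                // The broker did not accept the message: handle the failure (retry, alert, ...)
                System.err.println("Handling failed confirm...");
            } else {
                // Update the message status in the database: sent
            }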
        }
    };


    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.err.println("return exchange:" + exchange + " , routingKey:" + routingKey + ", replyCode:" + replyCode + ", replyText:" + replyText);
        }
    };

    public void send(Object message, Map<String, Object> headerProperties) throws Exception {
        MessageHeaders messageHeaders = new MessageHeaders(headerProperties);
        Message msg = MessageBuilder.createMessage(message, messageHeaders);

        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

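        CorrelationData correlationData = new CorrelationData();
        // id + timestamp; the id should be globally unique, so in practice use the real message id
        correlationData.setId("userid" + System.currentTimeMillis());
        // Send with the routing key that matches the binding:
        //rabbitTemplate.convertAndSend("pyy.exchange", "springboot.hello", msg, correlationData);

        // Presumably an unroutable key, used here so the broker returns the message and returnCallback fires
        rabbitTemplate.convertAndSend("pyy.exchange", "fasdfsf.hello", msg, correlationData);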

    }
}
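The timestamp-based id in send() can repeat when two messages are sent within the same millisecond, and the confirm callback only hints at updating a message status. The following is a minimal sketch of both ideas using only the JDK. PendingConfirms, its Status enum, and the in-memory map are hypothetical stand-ins for illustration (the map plays the role of the database table mentioned in the callback); none of this is part of Spring AMQP.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper, not part of Spring AMQP: tracks messages that await a broker confirm. */
final class PendingConfirms {

    enum Status { SENDING, SENT, FAILED }

    // In-memory stand-in for the message-status table mentioned in the confirm callback.
    private final Map<String, Status> statusById = new ConcurrentHashMap<>();

    /** Creates a collision-resistant id (prefix + random UUID) and records the message as in flight. */
    String register(String prefix) {
        String id = prefix + "-" + UUID.randomUUID();
        statusById.put(id, Status.SENDING);
        return id;
    }

    /**
     * Mirrors the shape of ConfirmCallback.confirm: ack=true marks the message SENT,
     * ack=false marks it FAILED. Returns the new status, or null for an unknown id.
     */
    Status onConfirm(String id, boolean ack) {
        Status next = ack ? Status.SENT : Status.FAILED;
        // computeIfPresent leaves the map untouched when the id was never registered.
        return statusById.computeIfPresent(id, (key, old) -> next);
    }

    /** Returns the current status, or null if the id is unknown. */
    Status statusOf(String id) {
        return statusById.get(id);
    }

    public static void main(String[] args) {
        PendingConfirms pending = new PendingConfirms();
        String id = pending.register("userid");
        pending.onConfirm(id, true);
        System.out.println(id + " -> " + pending.statusOf(id));
    }
}
```

In RabbitSender this could be wired in by passing the result of register("userid") to correlationData.setId(...) and calling onConfirm(correlationData.getId(), ack) inside confirm(). A real application would likely persist the status rather than keep it in memory.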

2. Core consumer configuration


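# MANUAL: the container does not ack for you; the listener must ack each delivery itself (Channel.basicAck)
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL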
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5

Using the @RabbitListener annotation

package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

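/**
 * Message receiver.
 * @RabbitListener bindings: binds a queue to an exchange
 *   @QueueBinding  value: the queue to bind
 *                exchange: the exchange to bind to
 *                key: the routing key for the binding
 *
 *     @Queue value: the queue name
 *        autoDelete: whether this is a temporary queue that is deleted automatically
 *
 *     @Exchange value: the exchange name
 *           type: the exchange type
 */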
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.info.routing.key}"
        )
)
public class InfoReceiver {

    /**
     * Handles each received message; invoked by the queue listener.
     * @param msg the message body
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Info receiver:" + msg);
    }
}

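If the exchange and queue declared in the @RabbitListener bindings do not exist yet, they are created automatically at startup (Spring Boot auto-configures the RabbitAdmin that performs the declaration).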
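The InfoReceiver annotations resolve three placeholders that the listing never defines. A minimal sketch of the matching application.properties entries follows; the keys come from the annotations above, while the values are hypothetical and only illustrate the shape.

```properties
# Exchange name referenced by @Exchange (hypothetical value)
mq.config.exchange=log.direct
# Queue name referenced by @Queue (hypothetical value)
mq.config.queue.info=log.info
# Routing key referenced by @QueueBinding key (hypothetical value)
mq.config.queue.info.routing.key=log.info.routing.key
```

Because the exchange type is DIRECT, a message reaches this queue only when the producer sends it with exactly the routing key configured here.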

Full example code: https://github.com/pyygithub/springboot-rabbitmq
