Spring boot spring boot分布式相关技术

RabbitMQ与SpringBoot整合实战

2019-04-08  本文已影响118人  若兮缘

SpringBoot整合RabbitMQ

SpringBoot与RabbitMQ集成非常筒単,不需要做任何的额外设置只需要两步即可:
step1:引入相关依赖:spring-boot-starter-amqp
step2:対application.properties迸行配置

生产端核心配置
消费端核心配置

SpringBoot整合RabbitMQ实战

1.首先创建一个Spring Boot工程,这里使用Spring Tool Suite工具,选择导航菜单File --> New --> Spring Starter Project

2.添加依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

然后复制一份工程,重命名为rabbitmq-springboot-consumer,修改pom相关artifactId,name及description

生产端工程

添加生产端配置

# rabbitmq连接基本配置
spring.rabbitmq.addresses=192.168.0.113:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 开启confirm机制
spring.rabbitmq.publisher-confirms=true
# 开启return模式
spring.rabbitmq.publisher-returns=true
# 配合return机制使用,表示接收路由不可达的消息
spring.rabbitmq.template.mandatory=true

创建配置类

@Configuration
@ComponentScan({"com.rxy.springboot.*"})
public class MainConfig {

}
消息的confirm和return机制

创建生产端处理类

import java.util.Map;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {

    //自动注入RabbitTemplate模板类
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //确认机制
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         * correlationData: 回调的相关数据,包含了消息ID
         * ack: ack结果,true代表ack,false代表nack
         * cause: 如果为nack,返回原因,否则为null
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                //做一些补偿机制等
                System.err.println("异常处理....");
            }
        }
    };
    //返回机制
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, 
                                    String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: " 
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };
    
    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders messageHeaders = new MessageHeaders(properties);
        //注意导包
        Message msg = MessageBuilder.createMessage(message, messageHeaders);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 ,保证全局唯一 ,这个是实际消息的ID
        //在做补偿性机制的时候通过ID来获取到这条消息进行重发
        String id = "1234567890";
        CorrelationData correlationData = new CorrelationData(id);
        //exchange, routingKey, object, correlationData
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }
}

在管控台创建topic交换机exchange-1和队列queue-1,并建立绑定关系为springboot.#
测试方法

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqSpringbootProducerApplicationTests {

    @Test
    public void contextLoads() {
    }

    @Autowired
    private RabbitSender rabbitSender;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
    @Test
    public void testSender1() throws Exception {
         Map<String, Object> properties = new HashMap<>();
         properties.put("number", "12345");
         properties.put("send_time", simpleDateFormat.format(new Date()));
         rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties);
    }
}

运行测试方法,打印如下内容

correlationData: CorrelationData [id=1234567890]
ack: true

将发送消息convertAndSend的routingKey修改为spring.hello,再次运行测试方法,打印如下内容

return exchange: exchange-1, routingKey: spring.abc, replyCode: 312, replyText: NO_ROUTE
correlationData: CorrelationData [id=1234567890]
ack: true
消费端工程
@RabbitMQListener注解

消费端配置

spring.rabbitmq.addresses=192.168.0.113:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 设置签收模式:AUTO(自动签收)、MANUAL(手工签收)、NONE(不签收,没有任何操作)
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
# 设置当前消费者数量(线程数)
spring.rabbitmq.listener.simple.concurrency=5
# 设置消费者最大并发数量
spring.rabbitmq.listener.simple.max-concurrency=10

消费端消息处理类

import java.util.Map;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;
import com.rxy.springboot.entity.Order;

@Component
public class RabbitReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", 
            durable="true"),
            exchange = @Exchange(value = "exchange-1", 
            durable="true", 
            type= "topic", 
            ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
            )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }
}

前面生产端已经发送了一条消息到queue-1队列,可以再运行发送一条,然后运行消费端工程启动类Application,打印如下

--------------------------------------
消费端Payload: Hello RabbitMQ For Spring Boot!
--------------------------------------
消费端Payload: Hello RabbitMQ For Spring Boot!

其实@RabbitListener会自动声明队列、交换机及绑定关系,可以在管控台删除对应的队列和交换机,然后重新运行进行测试

使用配置方式

将队列、交换机、绑定关系使用配置方式,并且消息体内容使用java对象

  1. 首先增加相关的配置
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

另外两个工程都增加实体类Order,注意要发送java对象,必须实现序列化接口

public class Order implements Serializable {
    private String id;
    private String name;
}
  1. 消费类增加Order对象消息的处理方法
    @Payload: 接收消息的消息体对象
    @Headers: 接收消息的属性
    AmqpHeaders: 抽象类,里面包含了消息的常用属性key
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
            durable="${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
            durable="${spring.rabbitmq.listener.order.exchange.durable}", 
            type= "${spring.rabbitmq.listener.order.exchange.type}", 
            ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
            )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, 
            Channel channel, 
            @Headers Map<String, Object> headers) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端order: " + order.getId());
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }

生产端发送方法,直接发送java对象

    //发送消息方法调用: 构建自定义对象消息
    public void sendOrder(Order order) throws Exception {
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局唯一 
        CorrelationData correlationData = new CorrelationData("0987654321");
        rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
    }

生产端测试方法

    @Test
    public void testSender2() throws Exception {
         Order order = new Order("001", "第一个订单");
         rabbitSender.sendOrder(order);
    }
运行说明

先启动消费端,然后启动生产端

# 生产端打印
correlationData: CorrelationData [id=0987654321]
ack: true
# 消费端打印
--------------------------------------
消费端order: 001
上一篇下一篇

猜你喜欢

热点阅读