1-4 RabbitMQ-Boot整合

2020-07-08  本文已影响0人  Finlay_Li

概念介绍:队列、绑定、虚拟主机、消息

Binding基本概念

Queue基本概念

Message基本概念

Message的属性

private String contentType; 
private String contentEncoding;
private Map<String,Object> headers; // 自定义属性
private Integer deliveryMode;   // 消息的送达模式
private Integer priority;   // 消息的优先级
private String correlationId;   // 消息的唯一ID
private String replyTo; // 消息失败返回的队列
private String expiration;  // 消息的过期时间
private String messageId;   // 消息的ID
private Date timestamp; // 时间戳
private String type; 
private String userId;
private String appId;
private String clusterId;

依赖

BOOT 2.0.2RELEASE

<!--spring-rabbit和amqp-client不能同时存在,否则会出现class引用错误-->
<!--<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.7.12</version>
</dependency>-->

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置连接

#连接工厂
spring.rabbitmq.addresses=zzz:5677,xxxx:5678
spring.rabbitmq.username=guest
spring.rabbitmq.password=qwg-rabbitmq@guest
#虚拟机
spring.rabbitmq.virtual-host=qwg-app-dev
#------------生产端
# 开启:消息确认
spring.rabbitmq.publisherConfirms= true
# 开启:路由不可达的消息返回
spring.rabbitmq.publisher-returns= true
# 设置true 监听器会收到:路由不可达的消息,从而可对路由不可达的消息进行处理,保证消息的路由成功
spring.rabbitmq.template.mandatory= true
#------------消费端
# 手工签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual

声明队列

public interface MqQueue {
    String HELLO_MQ = "hello_mq";
}
@Configuration
public class MqQueueConfiguration {

    @Bean
    public Queue helloQueue() {
        //持久化队列
        return new Queue(MqQueue.HELLO_MQ, true);
    }
}

生产者

@Component
public class MqSender {

//使用spring amqp提供的模板,完成消息发送
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String msg = "Hello RabbitMq";
        rabbitTemplate.convertAndSend(MqQueue.HELLO_MQ, msg);
    }
}

消费者

//自动监听
@Component
public class MqReceiver {

 /*@RabbitListener
    声明监听者

    @RabbitHandler 
    声明消费消息的方法*/

    @RabbitHandler
    @RabbitListener(queues = MqQueue.HELLO_MQ)
    public void receiver(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) throws IOException {
        System.out.println("Receiver ----------------- 消费消息: " + msg);
        //手动确认消费完成
        channel.basicAck(tag, false);
    }
}

AMQP default

为什么我们没有声明交换机,却消费成功?RabbitMQ是必须要交换机的呀~

image.png
上一篇 下一篇

猜你喜欢

热点阅读