Springboot集成RabbitMq

2018-08-08  本文已影响0人  Albert_Yu

spring-boot-starter-amqp项目对消息各种支持。

可以参阅官方文档
https://docs.spring.io/spring-boot/docs/1.5.15.RELEASE/reference/htmlsingle/#boot-features-rabbitmq

RabbitMQ是一个基于AMQP协议的轻量级,可靠,可扩展且可移植的消息代理。Spring使用RabbitMQ AMQP协议进行通信。
abbitMQ配置由外部配置属性控制 spring.rabbitmq.*。例如,您可以在以下部分声明以下部分 application.properties:

 spring.rabbitmq.host = localhost
 spring.rabbitmq.port = 5672
 spring.rabbitmq.username = admin
 spring.rabbitmq.password = secret

有关rabbitmq的属性相关配置可以参照 RabbitProperties 类中的相关源码
默认host是localhost ,port是5672,virtualHost是"/",用户名和密码是guest

一、 SpringBoot集成RabbitMq的最简框架的搭建
1、在pom.xml中添加必要的依赖

       <!-- spring boot amqp包 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

当SpringBoot项目中在pom.xml中引入了上面的这个依赖,那么久已经持有了该RabbitTemplate对象了
默认配置就是上述相关的描述了

二、 SpringBoot集成RabbitMQ快速入门
1、新建一个maven项目 springboot-rabbitmq


image.png

2、在pom.xml中引入必要的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.6.RELEASE</version>
        <relativePath/>
    </parent>

    <groupId>com.yubin.springboot</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.7</java.version>
    </properties>

    <dependencies>
        <!-- Spring boot web 集成包 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- spring boot 测试包 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <!-- spring boot amqp包 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3、在application.properties中配置rabbitmq相关的属性

# ==================== rabbitmq ===========
# rabbitmq 主机地址
spring.rabbitmq.host=127.0.0.1
# rabbitmq 主机端口号
spring.rabbitmq.port=5672
# rabbitmq 用户名
spring.rabbitmq.username=yubin
# rabbitmq 密码
spring.rabbitmq.password=yubin
# rabbitmq 虚拟主机
spring.rabbitmq.virtual-host=/yubin

4、创建启动类

/**
 * SpringBoot集成RabbitMq用例启动类
 *
 * @Author YUBIN
 */
@SpringBootApplication
public class RabbitMqDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqDemoApplication.class, args);
    }
}

5、定义队列信息

package com.yubin.springboot.rabbitmq.configuration;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMq的相关配置
 *
 * @Author YUBIN
 */
@Configuration // 相当于xml配置文件
public class RabbitMqConfiguration {

    @Bean
    public Queue helloQueue() {
        return new Queue("hello-queue");
    }

    @Bean
    public Queue userQueue() {
        return new Queue("user-queue");
    }
}

6、定义生产者

package com.yubin.springboot.rabbitmq.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * rabbitmq生产者示例
 *
 * @Author YUBIN
 */
@Component // 将该类交给Spring管理
public class RabbitMqProducerDemo {

    private static Logger logger = LoggerFactory.getLogger(RabbitMqProducerDemo.class);

    @Autowired // 注入rabbitmq 模板
    private AmqpTemplate rabbitTemplate;

    /**
     * 发送消息的方法 hello-queue
     */
    public void sendHelloMessage() {
        // 定义消息体
        String message = "RabbitMqProducerDemo hello queue send message";
        rabbitTemplate.convertAndSend("hello-queue",message);
        logger.info("==================RabbitMqProducerDemo hello queue send message success");
    }

    /**
     * 发送消息的方法 user-queue
     */
    public void sendUserMessage() {
        // 定义消息体
        String message = "RabbitMqProducerDemo user queue send message";
        rabbitTemplate.convertAndSend("user-queue",message);
        logger.info("===================RabbitMqProducerDemo user queue send message success");
    }
}

7、定义消费者

package com.yubin.springboot.rabbitmq.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * RabbitMq消费者示例
 *
 * @Author YUBIN
 * @create 2018-08-08
 */
@Component // 交给Spring管理
//@RabbitListener(queues = {"hello-queue","user-queue"})
public class RabbitMqConsumerDemo {

    private static Logger logger = LoggerFactory.getLogger(RabbitMqConsumerDemo.class);

    //@RabbitHandler
    @RabbitListener(queues = "hello-queue")
    public void executeHello(String message) {
        logger.info("executeHello================接收到的消息是:" + message);
    }

    @RabbitListener(queues = "user-queue")
    public void executeUser(String message) {
        logger.info("executeUser=================接收到的消息是:" + message);
    }
}

8、测试类

/**
 * SpringBoot集成RabbitMq测试类
 *
 * @Author YUBIN
 */
@RunWith(SpringRunner.class) //SpringRunner相当于 SpringJUnit4ClassRunner的别名类
@SpringBootTest(classes = RabbitMqDemoApplication.class)
public class RabbitMqDemoTest {

    @Autowired
    private RabbitMqProducerDemo producerDemo;

    @Test
    public void test1() {
        producerDemo.sendHelloMessage();
        producerDemo.sendUserMessage();
    }
}

三、 SpringBoot集成RabbitMq各种模式的案例
1、简单队列


image.png

如上述案例所示
2、工作模式(默认是多劳多得的)


image.png
在RabbitMqConfiguration类中增加work-queue
@Bean
public Queue workQueue() {
    return new Queue("work-queue");
}

在RabbitMqProducerDemo类中增加一个发送消息的方法

/**
 * 发送消息的方法 work-queue
 */
public void sendWorkMessage(String message) {
    // 定义消息体
    rabbitTemplate.convertAndSend("work-queue",message);
    logger.info("==================RabbitMqProducerDemo work queue send success:" + message);
}

在RabbitMqConsumerDemo类中增加两个消费者方法
@RabbitListener(queues = "work-queue")
public void executeWork1(String message) {
    logger.info("executeWork1================接收到的消息是:" + message);
}

@RabbitListener(queues = "work-queue")
public void executeWork2(String message) throws InterruptedException {
    Thread.sleep(100);
    logger.info("executeWork2================接收到的消息是:" + message);
}

测试类的书写

@Test
public void test2() throws InterruptedException {
    for (int i = 0; i < 100; i++) {
        producerDemo.sendWorkMessage(i + "");
        Thread.sleep(i*10);
    }
}

3、发布订阅模式 fanount


image.png

在RabbitMqConfiguration中定义队列,交换机以及将交换机和队列进行绑定,当然队列和交换机的绑定也是可以通过在客户端(浏览器上进行绑定)

@Bean
public Queue fanoutQueueA() {
    return new Queue("fanout-queue-A");
}

@Bean
public Queue fanoutQueueB() {
    return new Queue("fanout-queue-B");
}

@Bean
public Queue fanoutQueueC() {
    return new Queue("fanout-queue-C");
}

// 交换机的声明
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanout-exchange");
}

/**
 * 将交换机fanoutExchange与队列fanoutQueueA进行绑定 该种模式的路由key是无效的
 * @param fanoutQueueA
 * @param fanoutExchange
 * @return
 */
@Bean
public Binding bindingFanoutExchangeToFanoutQueueA(Queue fanoutQueueA, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
}

/**
 * 将交换机fanoutExchange与队列fanoutQueueA进行绑定 该种模式的路由key是无效的
 * @param fanoutQueueB
 * @param fanoutExchange
 * @return
 */
@Bean
public Binding bindingFanoutExchangeToFanoutQueueB(Queue fanoutQueueB, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
}

/**
 * 将交换机fanoutExchange与队列fanoutQueueA进行绑定 该种模式的路由key是无效的
 * @param fanoutQueueC
 * @param fanoutExchange
 * @return
 */
@Bean
public Binding bindingFanoutExchangeToFanoutQueueC(Queue fanoutQueueC, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(fanoutQueueC).to(fanoutExchange);
}

在生产者类中RabbitMqProducerDemo添加

/**
 * 发送消息的方法
 * @param message 消息体
 * @param exchangeName 交换机的名字
 * @param routingKey 路由key
 */
public void sendMessage(String message,String exchangeName,String routingKey) {
    // 发生消息  这里的 routingKey 是无效的
    rabbitTemplate.convertAndSend(exchangeName,routingKey,message);
    logger.info("==================RabbitMqProducerDemo " + exchangeName + ":" + routingKey + " send success:" + message);
}

在消费者类中添加

@RabbitListener(queues = "fanout-queue-A")
public void fanoutQueueA(String message) throws InterruptedException {
    logger.info("fanoutQueueA================接收到的消息是:" + message);
}

@RabbitListener(queues = "fanout-queue-B")
public void fanoutQueueB(String message) throws InterruptedException {
    logger.info("fanoutQueueB================接收到的消息是:" + message);
}

@RabbitListener(queues = "fanout-queue-C")
public void fanoutQueueC(String message) throws InterruptedException {
    logger.info("fanoutQueueC================接收到的消息是:" + message);
}

测试

@Test
public void test3() throws InterruptedException {
    producerDemo.sendMessage("哈哈A","fanout-exchange","fanout-queue-A");
    producerDemo.sendMessage("哈哈B","fanout-exchange","fanout-queue-B");
    producerDemo.sendMessage("哈哈C","fanout-exchange","fanout-queue-C");
}

4、路由模式


image.png

在RabbitMqConfiguration中定义队列,交换机以及将交换机和队列进行绑定,当然队列和交换机的绑定也是可以通过在客户端(浏览器上进行绑定)

// 定义队列
@Bean
public Queue directQueueA() {
    return new Queue("direct-queue-A");
}

@Bean
public Queue directQueueB() {
    return new Queue("direct-queue-B");
}

@Bean
public Queue directQueueC() {
    return new Queue("direct-queue-C");
}

// 声明交换机
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("direct-exchange");
}

// 交换机与队列进行绑定并定义routingKey
@Bean
public Binding bindingDirectQueueAToDirectExchangeU(Queue directQueueA, DirectExchange directExchange) {
    return BindingBuilder.bind(directQueueA).to(directExchange).with("update");
}
@Bean
public Binding bindingDirectQueueAToDirectExchangeI(Queue directQueueA, DirectExchange directExchange) {
    return BindingBuilder.bind(directQueueA).to(directExchange).with("insert");
}
@Bean
public Binding bindingDirectQueueAToDirectExchangeD(Queue directQueueA, DirectExchange directExchange) {
    return BindingBuilder.bind(directQueueA).to(directExchange).with("delete");
}
@Bean
public Binding bindingDirectQueueBToDirectExchangeD(Queue directQueueB, DirectExchange directExchange) {
    return BindingBuilder.bind(directQueueB).to(directExchange).with("delete");
}
@Bean
public Binding bindingDirectQueueCToDirectExchangeI(Queue directQueueC, DirectExchange directExchange) {
    return BindingBuilder.bind(directQueueC).to(directExchange).with("insert");
}
在消费者类中添加
@RabbitListener(queues = "direct-queue-A")
public void directQueueA(String message) throws InterruptedException {
    logger.info("directQueueA================接收到的消息是:" + message);
}

@RabbitListener(queues = "direct-queue-B")
public void directQueueB(String message) throws InterruptedException {
    logger.info("directQueueB================接收到的消息是:" + message);
}

@RabbitListener(queues = "direct-queue-C")
public void directQueueC(String message) throws InterruptedException {
    logger.info("directQueueC================接收到的消息是:" + message);
}

测试类

@Test
public void test4() {
    producerDemo.sendMessage("哈哈-update","direct-exchange","update");
    producerDemo.sendMessage("哈哈-insert","direct-exchange","insert");
    producerDemo.sendMessage("哈哈-delete","direct-exchange","delete");
}

5、通配符模式


在RabbitMqConfiguration中定义队列,交换机以及将交换机和队列进行绑定,当然队列和交换机的绑定也是可以通过在客户端(浏览器上进行绑定)
// 声明通配符模式的队列

@Bean
public Queue topicQueueA() {
    return new Queue("topic-queue-A");
}

@Bean
public Queue topicQueueB() {
    return new Queue("topic-queue-B");
}

// 声明通配符模式交换机
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topic-exchange");
}

// 通配符模式下交换机与队列进行绑定并定义routingKey
@Bean
public Binding bindingTopicQueueAToTopicExchange(Queue topicQueueA, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueueA).to(topicExchange).with("topic.update");
}
@Bean
public Binding bindingTopicQueueBToTopicExchangeI(Queue topicQueueB, TopicExchange topicExchange) {
    return BindingBuilder.bind(topicQueueB).to(topicExchange).with("topic.#");
}
在消费者类中添加

@RabbitListener(queues = "topic-queue-A")
public void topicQueueA(String message) throws InterruptedException {
    logger.info("topicQueueA================接收到的消息是:" + message);
}

@RabbitListener(queues = "topic-queue-B")
public void topicQueueB(String message) throws InterruptedException {
    logger.info("topicQueueB================接收到的消息是:" + message);
}

测试类

@Test
public void test5() {
    producerDemo.sendMessage("哈哈-update","topic-exchange","topic.update");
    producerDemo.sendMessage("哈哈-insert","topic-exchange","topic.insert");
    producerDemo.sendMessage("哈哈-delete","topic-exchange","topic.delete");
}

四、 SpringBoot集成RabbitMq扩展
当然如果你对这种形式的配置不习惯的话,你也可以使用外部的配置文件来使用
在启动类上使用@ImportResource注解来引入其他的xml配置文件


image.png

五、 SpringBoot配置多个RabbitMq
有时候由于业务的复杂性,需要配置多个RabbitMq
步骤
1、在application.properties中增加一个rabbitmq配置


image.png

2、在配置类中增加


image.png

3、新建一个生产者


image.png
4、新建一个消费者
image.png

5、测试类


image.png
上一篇下一篇

猜你喜欢

热点阅读