rabbitmq----rabbit相关概念及五种模式的实现

2019-05-21  本文已影响0人  不过意局bugyj

参考 引用 文章1

一、相关概念

信道(channel)

项目与rabbitmq之间的交流要建立在TCP连接的基础上,每次发布消息都要建立一次连接都要连接TCP无疑是对资源的一种浪费,所以rabbitmq加入了信道的概念,发消息的每一条线程都被包装成一条信道在tcp上传输!


队列(queue)

项目发布的消息都保存在队列中等待消费者来取,队列运行在服务器上,支持持久化和自动删除等功能,一条队列可以有多个消费者同时监听消息。


交换器(exchange)

交换器也是建立在服务器上,用于消息的分发,将具有routing-key的消息按一定规则发布到不同队列或者exchange上,exchange有五种:direct、fanout、topic、heades、default。(下面代码还有每一种的实现)

绑定(binding)

绑定可以理解成将队列与交换器连接起来的路由规则,交换器可以理解成绑定路由规则的集合。

消息(Message)

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

二、五种模式

先展示获得和关闭连接的工具类ConnectionUtil:

package basic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName: ConnectionUtil
 * @author: hsw
 * @date: 2019/5/18 19:33
 * @Description: TODO
 */
public class ConnectionUtil {
    public static Connection getConnection() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("***");
        factory.setPassword("***");
        try {
            return factory.newConnection();
        } catch (IOException e) {
            System.out.println("生成连接发生错误!");
            return null;
        } catch (TimeoutException e) {
            System.out.println("连接超时!");
            return null;
        }
    }

    public static void close(Connection connection, Channel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            System.out.println("关闭信道错误!");
        } catch (TimeoutException e) {
            System.out.println("关闭信道超时!");
        }
        try {
            connection.close();
        } catch (IOException e) {
            System.out.println("关闭connection错误!");
        }
    }
}

1. 单消费者单生产者队列即:


使用默认交换器,可以理解成不存在

代码示例(注释齐全):

生产者:

package basic.simple;

import basic.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
 * @ClassName: SimpleProducer
 * @author: hsw
 * @date: 2019/5/18 19:20
 * @Description: 基本步骤如下:

 * 3. 声明一个队列(一个生产者,一个队列,一个消费者,没有exchange)
 */
public class SimpleProducer {

    private static final String QUEUE_NAME = "hello_queue";
    private static final String MESSAGE = "HELLO CONSUMER!";

    public static void main(String[] args) throws IOException {
         // 1. 创建连接工厂,并设置其属性!
         // 2. 新建一个TCP连接,然后再其上新建一个信道!
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        // 3. 声明(创建)队列
        /*
         * 参数介绍: 
         * 参数1:队列名称
         * 参数2:durable(boolean): 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之
         *       后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库。
         * 参数3:exclusive : 是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;
         *        二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队
         *        列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,
         *        如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method:
     *            #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to
         *        locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景,
         *        然后这一个连接断开时直接删除此队列
         * 参数4:autoDelete: 是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除
         * 参数5:arguments:建立队列时的其他参数
         *           队列中的消息什么时候会自动被删除?
         *      Message TTL(x-message-ttl):设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 类似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过期时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
         *
         *      channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”));
         *
         *      Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp
         *
         *      Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉, 类似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim
         *
         *      Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B
         *
         *      Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX
         *
         *      Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK
         *
         *      Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费,
         *
         *      Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中
         *
         *      Master locator(x-queue-master-locator)
         */
        try {
            //千万记住,如果已经存在同名的队列,但是后面的参数不同就会报错IOException
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        } catch (IOException e) {
            System.out.println("声明队列发生错误!");
            return;
        }
        /*
         * 向server发布一条消息
         * 参数1:exchange name
         * 参数2:routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用
         * 参数3:mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。false:出现上述情形broker会直接将消息扔掉
         * 参数4:immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
         * 参数5:BasicProperties:一些基本属性,用到时再谷歌!
         * 参数6:body,消息体,消息内容!
         * RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,
         * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃
         */
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String message = scanner.nextLine();
            try {
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                System.out.println("向server发送消息发生错误!");
                break;
            }
        }
        scanner.close();
        ConnectionUtil.close(connection, channel);
    }
}

然后就是消费者

package basic.simple;

import basic.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: SimpleConsumer
 * @author: hsw
 * @date: 2019/5/18 20:13
 * @Description: TODO
 */
public class SimpleConsumer {

    private static final String QUEUE_NAME = "hello_queue";

    public static void main(String[] args) throws IOException {
        //创建连接和信道
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();

        // 声明队列(如果你已经明确的知道有这个队列,那么下面这句代码可以注释掉,如果不注释掉的话,也可以理解为消费者必须监听一个队列,如果没有就创建一个)
        assert channel != null;

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);


        while (true) {
            try {
                /*
                 * 监听队列
                 * 参数1: 队列名称
                 * 参数2:是否发送ack包,不发送ack消息会持续在服务端保存,直到收到ack。  可以通过channel.basicAck手动回复ack
                 * 参数3:消费者
                 */
                channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        // 获取消息路由键
                        System.out.println("routing key : " + envelope.getRoutingKey());
                        // 获取消息标签,随着消息的数目逐一递增
                        System.out.println("delivery tag : " + envelope.getDeliveryTag());
                        System.out.println("contentType : " + properties.getContentType());
                        System.out.println("content is : " + new String(body, "utf-8"));
                    }
                });
            } catch (IOException e) {
                System.out.println("消费发生错误!");
                break;
            }
        }

        ConnectionUtil.close(connection, channel);
    }
}

2. 单生产者,多消费者共享一条队列,即:


代码:
生产者:

package basic.work;

import basic.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @ClassName: WorkProducer
 * @author: hsw
 * @date: 2019/5/18 22:35
 * @Description: 即 生产者 → queue → 生产者
 *                                → 生产者
 */
public class WorkProducer {

    static final int MESSAGE_NUM = 10;
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConnection();
        if (connection == null) {
            return;
        }
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < MESSAGE_NUM; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("" + i).getBytes(StandardCharsets.UTF_8));
        }
        ConnectionUtil.close(connection, channel);
    }
}

消费者:

package basic.work;

import basic.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: WorkConsumer
 * @author: hsw
 * @date: 2019/5/18 22:35
 * @Description: sa
 */
public class WorkConsumer implements Runnable {
    public static void main(String[] args) {
        AtomicInteger i = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 10000, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), r -> new Thread(r, "thread-" + i.getAndIncrement()));

        pool.execute(new WorkConsumer());
        pool.execute(new WorkConsumer());

    }

    @Override
    public void run() {
        Connection connection = ConnectionUtil.getConnection();
        if (connection == null) {
            return;
        }
        Channel channel = null;
        try {
            channel = connection.createChannel();
            channel.queueDeclare(WorkProducer.QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            for (int i = 0; i < WorkProducer.MESSAGE_NUM / 2; i++) {
                Channel finalChannel = channel;
                channel.basicConsume(WorkProducer.QUEUE_NAME, false, new DefaultConsumer(finalChannel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println(Thread.currentThread().getName() + " get message : " + new String(body, StandardCharsets.UTF_8));

                        // 参数解释:应答的消息标签、是否批量应答(如果为true,则一次性应答所有tag前的消息!)
                        finalChannel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });
                Thread.sleep(100);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            //如果autoAck设置成false的话,在connection关闭之前,消费者消费了多少消息,Unacked就等于多少!connection关闭后,Unacked变为0,Ready又变回原值!
            assert channel != null;
            ConnectionUtil.close(connection, channel);
            System.out.println(Thread.currentThread().getName() + " is over!");
        }
    }
}

3. 发布订阅模式

使用多种exchange,按照相应的规则与路由键将消息发布到不同的绑定到这个交换器的队列上,然后监听队列的订阅者消费消息!
按照fanout、direct(headers差不多,就写direct吧)和topic三种路由规则实现代码:
fanout交换器是将消息发送到所有绑定到此交换器的队列上。代码略
direct(headers)代码示例:
publisher:消息发布到交换器中即可,不用关心队列!

package basic.publish;

import basic.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @ClassName: Publisher
 * @author: hsw
 * @date: 2019/5/19 15:33
 * @Description: 发布者 发布者 → fanout_exchange → queue → 订阅者1
 *                                       → queue → 订阅者2
 */
public class Publisher {
    static String EXCHANGE_NAME = "HELLO_EXCHANGE";
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", false);
        //fanout交换器,所以routingKey为空字符串!
        channel.basicPublish(EXCHANGE_NAME, "", false, false, null, "first!".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(EXCHANGE_NAME, "", false, false, null, "second!".getBytes(StandardCharsets.UTF_8));
        ConnectionUtil.close(connection, channel);
    }
}

subscriber:订阅者,完成队列的声明和队列与交换器的绑定!

package basic.publish;

import basic.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: Subscriber
 * @author: hsw
 * @date: 2019/5/19 15:34
 * @Description: 订阅者
 */
public class Subscriber implements Callable<String> {

    private static String QUEUE_NAME = "SUBSCRIBER_QUEUE";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        AtomicInteger i = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 10000, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), (r) -> new Thread(r, "Thread-" + i.getAndIncrement()));
        Future<String> f1 = pool.submit(new Subscriber());
        Future<String> f2  = pool.submit(new Subscriber());
        System.out.println("f1 " + f1.get());
        System.out.println("f2 " + f2.get());
    }

    @Override
    public String call() throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(Publisher.EXCHANGE_NAME, "fanout");

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, Publisher.EXCHANGE_NAME, "");
        channel.basicQos(1);
        for (int i = 0; i < 2; i++) {
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("content in " + Thread.currentThread().getName() + " is " + new String(body, StandardCharsets.UTF_8));
                    // 第一个参数表示的是消息的标签,消息队列没发出一个消息,标签值加一。
                    // 第二个参数表示是否批量回复,为true时,表现前面的所有消息都被ACK!
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
        Thread.sleep(5000);
        ConnectionUtil.close(connection, channel);
        return "successful";
    }
}

4. 然后就是topic,也基本属于发布订阅模式,分开讲

通配符路由消息,即*代表一个词,#代表多个词,比如:#.c能匹配到a112.b112.c、d12.e1231.c和a.c等。而*.*.c只能匹配到前两个!后面一个最后一个匹配不到。
代码:
producer:同样发到交换器中即可,记住指定routingKey!

package basic.topic;

import basic.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @ClassName: TopicProducer
 * @author: hsw
 * @date: 2019/5/20 10:30
 * @Description: TODO
 */
public class TopicProducer {
    static String TOPIC_EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, "topic", true);

        channel.basicPublish(TOPIC_EXCHANGE_NAME, "selectQueue.middleQueue.a", false, false, null, "select.middle.a Info".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(TOPIC_EXCHANGE_NAME, "selectQueue.left.cQueue", false, false, null, "select.left.c Info".getBytes(StandardCharsets.UTF_8));
        channel.basicPublish(TOPIC_EXCHANGE_NAME, "insert.middleQueue.cQueue", false, false, null, "insert.middle.c Info".getBytes(StandardCharsets.UTF_8));
        ConnectionUtil.close(connection, channel);
    }
}

Consumer:消费者,声明消息队列和绑定已经路由键!

package basic.topic;

import basic.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @ClassName: TopicConsumer
 * @author: hsw
 * @date: 2019/5/20 10:46
 * @Description: TODO
 */
@SuppressWarnings("all")
public class TopicConsumer implements Runnable {

    public static void main(String[] args) {
        //因为这里要设置线程的名字,而threadPoolExecutor线程的命名写死了!
        new Thread(new TopicConsumer(), "middleQueue").start();
        new Thread(new TopicConsumer(), "cQueue").start();
        new Thread(new TopicConsumer(), "selectQueue").start();
    }

    @Override
    public void run() {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = null;
        try {
            assert connection != null;
            channel = connection.createChannel();
        } catch (IOException e) {
            System.out.println("channel创建失败!");
            try {
                connection.close();
            } catch (IOException ex) {
                System.out.println("connection关闭发生错误!");
            }
            return;
        }
        assert channel != null;
        String queueName = Thread.currentThread().getName();
        try {
            channel.exchangeDeclare(TopicProducer.TOPIC_EXCHANGE_NAME, "topic", true);
            channel.queueDeclare(queueName, false, false, false, null);
            if ("cQueue".equals(queueName)) {
                channel.queueBind(queueName, TopicProducer.TOPIC_EXCHANGE_NAME, "*.*." + queueName);
            } else if ("selectQueue".equals(queueName)) {
                channel.queueBind(queueName, TopicProducer.TOPIC_EXCHANGE_NAME, queueName + ".*.*");
            } else {
                channel.queueBind(queueName, TopicProducer.TOPIC_EXCHANGE_NAME, "*." + queueName + ".*");
            }
        } catch (IOException e) {
            System.out.println("交换器或队列声明失败!");
            return;
        }

        try {
            while (true) {
                channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("queue name is " + queueName + " content is " + new String(body, StandardCharsets.UTF_8));
                    }
                });
            }
        } catch (IOException e) {
            System.out.println("读取消息错误!");
        }finally {
            ConnectionUtil.close(connection, channel);
        }
    }
}

到这里虽然只写了四种,但代码基本都差不多,所以像下面的4省略掉了,它和5就只有路由规则的细微差别,所以我只写了五。而6触及了我的知识盲区,先不写!!!


三、 spring整合rabbitmq

导入相关依赖:
        <!-- spring-rabbit -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>

        <!-- amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.0</version>
        </dependency>

        <!-- spring-amqp -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-amqp</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>

        <!-- spring-context -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.6.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>5.1.6.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbitmq="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--  消息队列的监听类,下面代码实现  -->
    <bean id="myMessageListener" class="spring.MyMessageListener"/>

    <!--  简历连接的工厂类  -->
    <rabbitmq:connection-factory id="connectionFactory" host="127.0.0.1" port="5672" username="***"
                                 password="***"/>

    <!--  当服务器上已经存在相关的交换器和消息队列时,这个可以不用配置。没有的话就需要配置这个新建队列和交换器等  -->
    <rabbitmq:admin connection-factory="connectionFactory" id="admin"/>

    <!--  类似于jdbcTemplate,可以使用其想队列或交换器发送消息!  -->
    <!--  这个万分记住喽,exchange不像别的东西在配置文件中填写的是对象的id,这里是交换器名字  -->
    <rabbitmq:template connection-factory="connectionFactory" id="rabbitTemplate" exchange="exchange_name"/>

    <!--  声明队列  -->
    <rabbitmq:queue id="queue_id" name="queue_name"/>

    <!--  声明交换器  -->
    <rabbitmq:direct-exchange name="exchange_name" id="exchange_id">
        <rabbitmq:bindings>
            <rabbitmq:binding queue="queue_id" key="queue_key"/>
        </rabbitmq:bindings>
    </rabbitmq:direct-exchange>

    <!--  声明监听器容器,在里面配置好监听器具体监听那个队列  -->
    <rabbitmq:listener-container connection-factory="connectionFactory">
        <rabbitmq:listener ref="myMessageListener" queues="queue_id"/>
    </rabbitmq:listener-container>
</beans>
编写监听类,实现MessageListener即可:
package spring;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;

/**
 * @ClassName: MessageListener
 * @author: hsw
 * @date: 2019/5/21 11:41
 * @Description: 监听队列消息
 */
public class MyMessageListener implements MessageListener {
    /**
     * 所监听的消息队列一有消息就会触发此方法的调用!
     * @param message 作为消息内容及其属性的封装!
     */
    @Override
    public void onMessage(Message message) {
        System.out.println("消息内容是:" + new String(message.getBody(), StandardCharsets.UTF_8));
        System.out.println("消息标签为:" + message.getMessageProperties().getDeliveryTag());
    }
}
编写个测试类想上面类所监听的消息队列中发送一个简单消息:
package spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * @ClassName: RabbitSenderTest
 * @author: hsw
 * @date: 2019/5/21 11:47
 * @Description: TODO
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("spring-mq.xml")
public class RabbitSenderTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessage() {
        // 发送一个简单的消息,routingKey是“queue_key”
        rabbitTemplate.convertAndSend("queue_key", "hello spring rabbitmq");
    }
}

自此就实现了一个简单的spring rabbitmq的整合。

以上!

上一篇下一篇

猜你喜欢

热点阅读