开源项目

RabbitMQ——基础知识与模拟体验

2018-10-16  本文已影响11人  莫问以

1、什么是RabbitMQ?
前面讲过一篇文件,关于ActiveMQ的,可以对照看一下,链接:https://www.jianshu.com/p/187d5c2a898d
那么,RabbitMQ又是什么呢?
RabbitMQ 是一个消息代理:它接收并转发消息。你可以将其视为邮局:当你将要发布的邮件放在邮箱中时,您可以确信 Postman 先生最终会将邮件发送给收件人。在这个比喻中,RabbitMQ 是一个邮箱,邮局和邮递员。
RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块的消息。

常见术语
1)生产者:一个发送消息的程序是一个生产者。
2)队列:队列类似于邮箱。虽然消息通过 RabbitMQ 在你的应用中传递,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制的限制,它本质上是一个大的消息缓冲区。不同的生产者可以通过同一个队列发送消息,此外,不同的消费者也可以从同一个队列上接收消息。
3)消费者:一个等待接收消息的程序是一个消费者。


rabbitmq.png

上图所示:“P”是我们的生产者,“C”是我们的消费者。中间的框是队列 - RabbitMQ 代表消费者的消息缓冲区。

整个过程非常简单,生产者创建消息,消费者接收这些消息。你的应用程序既可以作为生产者向其他应用程序发送消息,也可以作为消费者,等待接收其他应用程序的消息。其中,存储消息的是消息队列,它类似于邮箱,消息通过消息队列进行投递。

2、下载与安装(windows环境)
参考链接:https://blog.csdn.net/weixin_39735923/article/details/79288578

3、初步体验
引入依赖:

<!-- 引入RabbitMQ 依赖 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.3</version>
        </dependency>

创建生产者,生产者连接到 RabbitMQ,发送一条数据:

package com.guxf.demo.rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接 
        Connection connection = factory.newConnection();
        // 创建一个通道 
        Channel channel = connection.createChannel();    
        // 指定一个队列
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        // 参数1 queue :队列名
        // 参数2 durable :是否持久化
        // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
        // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
        // 参数5 arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 发送消息
        String message = "Hello World!";
        // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
        // 参数1 exchange :交换器
        // 参数2 routingKey : 路由键
        // 参数3 props : 消息的其他参数
        // 参数4 body : 消息体
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("生产者发送的消息是[" + message + "]");
        // 关闭频道和连接  
        channel.close();
        connection.close();
    }
}

消息接收者:

package com.guxf.demo.rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Receive {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        factory.setHost("localhost");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("我在等待接收消息——");
        // 创建队列消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消费者收到的消息是[" + message + "]");
            }
        };
        // basicConsume(String queue, boolean autoAck, Consumer callback)
        // 参数1 queue :队列名
        // 参数2 autoAck : 是否自动ACK
        // 参数3 callback : 消费者对象的一个接口,用来配置回调
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

先运行Send,发出消息,再运行Receive接收消息,即可收到HelloWord,而且可以看到队列名(前提是得启动RabbitMQ):


看到了hello队列.png

4、任务队列
1)任务队列:饭店高峰期时,顾客单子不得不按照下单顺序一个个放在厨房,进行先后炒菜处理,这一堆的单子就是任务队列。
2)消息队列:消息队列(MQ)可以理解成两个应用程序间(生产者消费者间)的通信,例如短信发送模块,因为模块的发送速度跟不上,这时候需要有一个容器,暂存一下,缓解下压力,那么“消息队列”就是在消息的传输过程中保存消息的“容器”。然后短信模块就可以淡定的去消息队列取出要发出的短信内容,进行发送处理。
创建多消息的发送端:

package com.guxf.demo.rabbit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class NewTask {
     private final static String QUEUE_NAME = "ningmo";
     
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建连接
            ConnectionFactory factory = new ConnectionFactory();
            // 设置 RabbitMQ 的主机名
            factory.setHost("localhost");
            // 创建一个连接 
            Connection connection = factory.newConnection();
            // 创建一个通道 
            Channel channel = connection.createChannel();    
            // 指定一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 发送消息 
            for (int i = 0; i < 10; i++) {  
                String message = "Task:" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
                System.out.println("发送的消息为[" + message + "]");  
            }  
            // 关闭频道和连接  
            channel.close();
            connection.close();
        }
}

接收消息:

package com.guxf.demo.rabbit;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Worker {
    private final static String QUEUE_NAME = "ningmo";
     
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 指定一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 创建队列消费者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
 
                System.out.println("收到的消息为[" + message + "]");
                try {
                    doWork(message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 
            }
        };
        // acknowledgment is covered below
        boolean autoAck = true; 
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
 
    private static void doWork(String task) throws InterruptedException {
        String[] taskArr = task.split(":");
        TimeUnit.SECONDS.sleep(Long.valueOf(taskArr[1]));
    }
}
上一篇下一篇

猜你喜欢

热点阅读