快速入门RabbitMQ核心概念

2020-11-23  本文已影响0人  端碗吹水

哪些互联网大厂在使用RabbitMQ,为什么?

初识RabbitMQ:

哪些互联网大厂在使用RabbitMQ:

为什么使用RabbitMQ:


RabbitMQ高性能的原因


AMQP高级消息队列协议与模型

什么是AMQP高级消息队列协议:

AMQP协议模型:


image.png

AMQP核心概念


RabbitMQ整体架构与消息流转

RabbitMQ整体架构图:


image.png

RabbitMQ消息流转图:


image.png

RabbitMQ环境安装

官方下载地址:

我们知道RabbitMQ是基于Erlang编写的,所以在安装RabbitMQ之前需要确保安装了Erlang环境。RabbitMQ与Erlang是有版本对应关系的,可以参考官方列举的版本对应关系:

例如,我这里要安装3.8.9版本的RabbitMQ,那么按官方的说明,我需要安装 22.3 ~ 23.x 版本的Erlang环境,我这里选择23.1.3版本的Erlang。使用如下命令下载RPM安装包:

[root@rabbitmq01 ~]# cd /usr/local/src
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/erlang-rpm/releases/download/v23.1.3/erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.9/rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]# ls
erlang-23.1.3-1.el7.x86_64.rpm  rabbitmq-server-3.8.9-1.el7.noarch.rpm
[root@rabbitmq01 /usr/local/src]# 

使用yum命令进行安装,因为yum可自动解决依赖关系:

[root@rabbitmq01 /usr/local/src]# yum install -y erlang-23.1.3-1.el7.x86_64.rpm
[root@rabbitmq01 /usr/local/src]# yum install -y rabbitmq-server-3.8.9-1.el7.noarch.rpm

RabbitMQ新版本没有提供配置文件的示例,需要自己去Github上下载:

将配置文件放到/etc/rabbitmq目录下:

[root@rabbitmq01 /usr/local/src]# mv rabbitmq.conf.example /etc/rabbitmq/rabbitmq.conf

修改配置文件:

[root@rabbitmq01 ~]# vim /etc/rabbitmq/rabbitmq.conf
# 允许默认用户被外部网络访问
loopback_users.guest = false

完成配置后,启动RabbitMQ Server:

[root@rabbitmq01 ~]# rabbitmq-server start &

检查端口是否正常监听,5672是RabbitMQ的默认端口号:

[root@rabbitmq01 ~]# netstat -lntp |grep 5672
tcp        0      0 0.0.0.0:25672           0.0.0.0:*           LISTEN      1922/beam.smp       
tcp6       0      0 :::5672                 :::*                LISTEN      1922/beam.smp       
[root@rabbitmq01 ~]# 

启用RabbitMQ的管控台插件,我们可以在管控台中查看RabbitMQ的基础监控信息,以及对RabbitMQ进行管理:

[root@rabbitmq01 ~]# rabbitmq-plugins enable rabbitmq_management

使用浏览器访问管控台的15672端口,进入到登录界面,默认用户名密码均为guest

image.png

登录成功,进入到管控台首页:


image.png

rabbitmqctl命令行操作

rabbitmqctl基础操作命令:

# 关闭应用
rabbitmqctl stop_app

# 启动应用
rabbitmqctl start_app

# 节点状态
rabbitmqctl status

# 添加用户
rabbitmqctl add user username password

# 列出所有用户
rabbitmqctl list users

# 删除用户
rabbitmqctl delete_user username

# 清除用户权限
rabbitmqctl clear_permissions -p vhostpath username

# 列出用户权限
rabbitmqctl list_user_permissions username

# 修改密码
rabbitmqctl change_password username newpassword

# 设置用户权限
rabbitmqctl set permissions -p vhostpath username ".*" ".*" ".*"

# 创建虚拟主机
rabbitmqctl add vhost vhostpath

# 列出所有虚拟主机
rabbitmqctl list_vhosts

# 列出虚拟主机上所有权限
rabbitmqctl list_permissions -p vhostpath

# 删除虚拟主机
rabbitmqctl delete_vhost vhostpath

# 查看所有队列信息
rabbitmqctl list_queues

# 清除队列里的消息
rabbitmqctl -p vhostpath purge_queue blue

rabbitmqctl高级操作命令:

# 移除所有数据,要在rabbitmqctIl stop_app之后使用
rabbitmqctl reset

# 组成集群命令
rabbitmqctl join_cluster <clusternode> [--ram]

# 查看集群状态
rabbitmqctl cluster_status

# 修改集群节点的存储形式
rabbitmqctl change_cluster_node_type disc | ram

# 忘记节点(摘除节点)
rabbitmqctl forget cluster_node [--offline]

# 修改节点名称
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2..] 

生产者消费者代码示例

创建一个Maven工程,在pom文件中添加如下依赖:

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

生产者代码示例:

package com.zj.rabbitmq.learn.basic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

/**
 * 生产者
 *
 * @author 01
 * @date 2020-11-23
 **/
public class MyProducer {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            for (int i = 0; i < 5; i++) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ!";
                // 不设置Exchange默认走default direct exchange,此时routingKey就是队列名称
                channel.basicPublish("", "test001", null, msg.getBytes());
            }
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.basic;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

/**
 * 消费者
 *
 * @author 01
 * @date 2020-11-23
 **/
public class MyConsumer {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare("test001", true, false, false, null);
            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true){
                channel.basicConsume("test001", true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

先运行消费者,再运行生产者,此时消费者控制台输出如下:


image.png

关于交换机

Exchange(交换机)用于接收消息,并根据路由键转发消息所绑定的队列:


image.png

交换机属性:

Direct Exchange

image.png

生产者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class ProducerOfDirectExchange {
    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            // 通过Channel发送数据
            String msg = "Hello RabbitMQ of Direct Exchange!";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

public class ConsumerOfDirectExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";

            // 声明一个direct类型的Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到指定的Exchange上
            channel.queueBind(queueName, exchangeName, routingKey);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true) {
                channel.basicConsume(queueName, true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

Topic Exchange

image.png

生产者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class ProducerOfTopicExchange {
    
    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.abc";

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            // 通过Channel发送数据
            String msg = "Hello RabbitMQ of Topic Exchange!";
            channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
            channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

public class ConsumerOfTopicExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            //String routingKey = "user.*";
            String routingKey = "user.#";

            // 声明一个topic类型的Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到指定的Exchange上
            channel.queueBind(queueName, exchangeName, routingKey);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true) {
                channel.basicConsume(queueName, true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

Fanout Exchange

image.png

生产者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

public class ProducerOfFanoutExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        String exchangeName = "test_fanout_exchange";

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            for (int i = 0; i < 10; i++) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ of Fanout Exchange!";
                channel.basicPublish(exchangeName, "", null, msg.getBytes());
            }
        }
    }
}

消费者代码示例:

package com.zj.rabbitmq.learn.exchange;

import com.rabbitmq.client.*;
import lombok.SneakyThrows;

public class ConsumerOfFanoutExchange {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {

            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            // 不设置routingKey
            String routingKey = "";

            // 声明一个fanout类型的Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明一个队列,队列不存在会自动创建
            channel.queueDeclare(queueName, true, false, false, null);
            // 将队列绑定到指定的Exchange上
            channel.queueBind(queueName, exchangeName, routingKey);

            // 创建消费者
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    String message = new String(body);
                    System.out.println("Received: " + message);
                }
            };

            // 持续监听,消费消息
            while (true) {
                channel.basicConsume(queueName, true, consumer);
                Thread.sleep(1000);
            }
        }
    }
}

绑定、队列、消息、虚拟主机

Binding - 绑定:

Queue - 消息队列:

Message - 消息:

设置Message属性代码示例:

package com.zj.rabbitmq.learn.message;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.SneakyThrows;

import java.util.HashMap;
import java.util.Map;

class MyProducer {

    @SneakyThrows
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.243.164");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Map<String, Object> headers = new HashMap<>();
        headers.put("a", "1");
        headers.put("b", "2");
        // 自定义Message的一些属性
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                // 持久化模式
                .deliveryMode(2)
                // 消息的编码格式
                .contentEncoding("UTF-8")
                // 消息过期时间
                .expiration("15000")
                // 设置消息的头
                .headers(headers)
                .build();

        // 通过连接工厂创建连接
        try (Connection connection = factory.newConnection();
             // 通过连接创建一个Channel
             Channel channel = connection.createChannel()) {
            for (int i = 0; i < 5; i++) {
                // 通过Channel发送数据
                String msg = "Hello RabbitMQ!";
                // 不设置Exchange默认走direct exchange,此时routingKey就是队列名称
                channel.basicPublish("", "test001", properties, msg.getBytes());
            }
        }
    }
}

Virtual host - 虚拟主机:

上一篇下一篇

猜你喜欢

热点阅读