工具推荐

RabbitMQ入门使用

2020-03-01  本文已影响0人  依弗布德甘

RabbitMQ

RabbitMQ模型架构

大部分操作都是在Channel这个接口中完成的,包括定义队列的声明queueDeclare、交换机的声明exchangeDeclare、队列的绑定queueBind、发布消息basicPublish、消费消息basicConsume等

RabbitMQ运转流程

业务运转流程 架构运转流程

RabbitMQ 安装和使用

一、安装依赖环境

  1. http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本

  2. https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本,erlang-*.centos.x86_64.rpm就是centos版本的。

  3. 复制下载地址后,使用wget命令下载

    wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
    
  4. 安装 Erlang

    sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
    
  5. 安装 socat

    sudo yum install -y socat
    

二、安装RabbitMQ

  1. 官方下载页面找到CentOS7版本的下载链接,下载rpm安装包

    wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
    

    提示:可以在https://github.com/rabbitmq/rabbitmq-server/tags下载历史版本

  2. 安装RabbitMQ

    sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
    

三、启动和关闭

四、开启Web管理插件

  1. 开启插件

    rabbitmq-plugins enable rabbitmq_management
    

    说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。

  2. 添加用户

    rabbitmqctl add_user admin admin
    
  3. 为用户分配操作权限

    rabbitmqctl set_user_tags admin administrator
    
  4. 为用户分配资源权限

    rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
    

五、防火墙添加端口

  1. 添加端口

    sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
    
  2. 重启防火墙

    sudo firewall-cmd --reload
    

多机多节点集群部署

一、 环境准备

二、修改配置文件

  1. 修改10.10.1.41机器上的/etc/hosts文件

    sudo vim /etc/hosts
    
  2. 添加IP和节点名

    10.10.1.41 node1
    10.10.1.42 node2
    10.10.1.43 node3
    
  3. 修改对应主机的hostname

hostname node1
hostname node2
hostname node3
  1. 10.10.1.41上的hosts文件复制到另外两台机器上
    sudo scp /etc/hosts root@node2:/etc/
    sudo scp /etc/hosts root@node3:/etc/
    
    说明:命令中的root是目标机器的用户名,命令执行后,可能会提示需要输入密码,输入对应用户的密码就行了
  2. 10.10.1.41上的/var/lib/rabbitmq/.erlang.cookie文件复制到另外两台机器上
    scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/
    scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/
    
    提示:如果是通过克隆的VM,可以省略这一步

三、防火墙添加端口

  1. 添加端口

    sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
    
  2. 重启防火墙

    sudo firewall-cmd --reload
    

四、启动RabbitMQ

  1. 启动每台机器的RabbitMQ

    sudo systemctl start rabbitmq-server
    

    或者

    rabbitmq-server -detached
    
  2. 10.10.1.42加入到集群

    # 停止RabbitMQ 应用
    rabbitmqctl stop_app
    # 重置RabbitMQ 设置
    rabbitmqctl reset
    # 加入到集群
    rabbitmqctl join_cluster rabbit@node1 --ram
    # 启动RabbitMQ 应用
    rabbitmqctl start_app
    
  3. 查看集群状态,看到running_nodes,[rabbit@node1,rabbit@node2]表示节点启动成功

    rabbitmqctl cluster_status
    

    提示:在管理界面可以更直观的看到集群信息

  4. 10.10.1.43加入到集群

    # 停止 RabbitMQ 应用
    rabbitmqctl stop_app
    # 重置 RabbitMQ 设置
    rabbitmqctl reset
    # 节点加入到集群
    rabbitmqctl join_cluster rabbit@node1 --ram
    # 启动 RabbitMQ 应用
    rabbitmqctl start_app
    
  5. 重复地3步,查看集群状态


单机多节点部署

一、环境准备

二、启动RabbitMQ

  1. 在启动前,先修改RabbitMQ 的默认节点名(非必要),在/etc/rabbitmq/rabbitmq-env.conf增加以下内容

    # RabbitMQ 默认节点名,默认是rabbit
    NODENAME=rabbit1
    
  2. RabbitMQ 默认是使用服务的启动的,单机多节点时需要改为手动启动,先停止运行中的RabbitMQ 服务

    sudo systemctl stop rabbitmq-server
    
  3. 启动第一个节点

    rabbitmq-server -detached
    
  4. 启动第二个节点

    RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server -detached
    
  5. 启动第三个节点

    RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit3 rabbitmq-server -detached
    
  6. 将rabbit2加入到集群

    # 停止 rabbit2 的应用
    rabbitmqctl -n rabbit2 stop_app
    # 重置 rabbit2 的设置
    rabbitmqctl -n rabbit2 reset
    # rabbit2 节点加入到 rabbit1的集群中
    rabbitmqctl -n rabbit2 join_cluster rabbit1 --ram
    # 启动 rabbit2 节点
    rabbitmqctl -n rabbit2 start_app
    
  7. 将rabbit3加入到集群

    # 停止 rabbit3 的应用
    rabbitmqctl -n rabbit3 stop_app
    # 重置 rabbit3 的设置
    rabbitmqctl -n rabbit3 reset
    # rabbit3 节点加入到 rabbit1的集群中
    rabbitmqctl -n rabbit3 join_cluster rabbit1 --ram
    # 启动 rabbit3 节点
    rabbitmqctl -n rabbit3 start_app
    
  8. 查看集群状态,看到{running_nodes,[rabbit3@node1,rabbit2@node1,rabbit1@node1]}说明节点已启动成功。

    rabbitmqctl cluster_status
    

    提示:在管理界面可以更直观的看到集群信息

三、防火墙添加端口

  1. 添加端口

    sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5673/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25673/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15673/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=5674/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=25674/tcp --permanent
    sudo firewall-cmd --zone=public --add-port=15674/tcp --permanent
    
  2. 重启防火墙

    sudo firewall-cmd --reload
    

镜像队列模式集群


简单代码示例

JAVA依赖

  <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>5.5.1</version>
  </dependency>

Spring中依赖

  <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
       <version>2.1.1.RELEASE</version>
  </dependency>
  <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
       <version>2.1.1.RELEASE</version>
  </dependency>

生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 简单队列生产者
 * 使用RabbitMQ的默认交换器发送消息
 */
public class Producer {

    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接属性
        factory.setHost("192.168.100.242");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、从连接工厂获取连接
            connection = factory.newConnection("生产者");

            // 4、从链接中创建通道
            channel = connection.createChannel();

            /**
             * 5、声明(创建)队列
             * 如果队列不存在,才会创建
             * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
             *
             * queueDeclare参数说明:
             * @param queue 队列名称
             * @param durable 队列是否持久化
             * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
             * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
             * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
             */
            channel.queueDeclare("queue1", false, false, false, null);

            // 消息内容
            String message = "Hello World!";
            // 6、发送消息
            channel.basicPublish("", "queue1", null, message.getBytes());
            System.out.println("消息已发送!");

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 8、关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 简单队列消费者
 */
public class Consumer {

    public static void main(String[] args) {
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置连接属性
        factory.setHost("192.168.100.242");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、从连接工厂获取连接
            connection = factory.newConnection("消费者");

            // 4、从链接中创建通道
            channel = connection.createChannel();

            /**
             * 5、声明(创建)队列
             * 如果队列不存在,才会创建
             * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
             *
             * queueDeclare参数说明:
             * @param queue 队列名称
             * @param durable 队列是否持久化
             * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
             *                  并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
             *                  一般在队列和交换器绑定时使用
             * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
             * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
             */
            channel.queueDeclare("queue1", false, false, false, null);

            // 6、定义收到消息后的回调
            DeliverCallback callback = new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                }
            };
            // 7、监听队列
            channel.basicConsume("queue1", true, callback, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                }
            });

            System.out.println("开始接收消息");
            System.in.read();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 8、关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 9、关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Spring中使用RabbitMQ

创建队列
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfiguration {

    @Bean
    public Queue helloSpring() {
        return new Queue("spring.cluster");
    }
}

生产者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@EnableAutoConfiguration
@Import(AppConfiguration.class)
public class ProducerApp {

    private static final Logger logger = LoggerFactory.getLogger(ProducerApp.class);

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Queue helloSpring;

    @Bean
    CommandLineRunner runner() {
        return args -> {
            template.convertAndSend(helloSpring.getName(), "Hello Spring");
            logger.info("消息已发送");
        };
    }

    public static void main(String[] args) {
        SpringApplication.run(ProducerApp.class, args);
    }

}

消费者
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.messaging.handler.annotation.Payload;

@Configuration
@EnableAutoConfiguration
@RabbitListener(queues = "spring.cluster")
@Import(AppConfiguration.class)
public class ConsumerApp {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerApp.class);

    @RabbitHandler
    public void receive(@Payload String msg) {
        logger.info("收到消息:" + msg);
    }

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApp.class, args);
    }
}

上一篇 下一篇

猜你喜欢

热点阅读