Rabbitmq使用方法及理解(一)

2020-05-24  本文已影响0人  还我爆漫

rabbitmq版本3.8.3 erlang版本22.2

image-20200524115051864.png

rabbitmq可以通过命令行的方式,也可通过可视化页面的方式进行操作。

命令行中输入

rabbitmqctl help
[root@s ~]# rabbitmqctl help

Usage

rabbitmqctl [--node <node>] [--timeout <timeout>] [--longnames] [--quiet] <command> [<command options>]

Available commands:

Help:

   help                          Displays usage information for a command
   version                       Displays CLI tools version

...................

可以查看rabbitmq的各种命令行操作,喜欢撸代码的小伙伴可以自行尝试。

这里主要结合idea,springboot,rabbitmq可视化控制页面讲解rabbitmq的使用。

还没安装rabbitmq的小伙伴可以去看:Linux_CentOS7 安装配置Rabbitmq3.8.3填坑之旅

使用springboot java操作rabbitmq

1)新建springboot项目

QQ截图20200524144037.png

因为是学习测试用的所以勾选一个spring web就好

QQ截图20200524144748.png

项目创建好后在pom.xml(Maven的配置文件)中添加rabbitmq依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

2)启动配置rabbitmq

启动操作页面插件

rabbitmq-plugins enable rabbitmq_management

启动rabbitmq

systemctl start rabbitmq-server

启动Linux外部访问端口

/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT

创建虚拟机

QQ截图20200524161556.png

创建用户

QQ截图20200524162122.png

用户绑定虚拟机

QQ截图20200524162330.png QQ截图20200524162435.png

前期工作完成开始上代码

3)rabbitmq第一种消息模型测试

QQ截图20200524150318.png

该种模型只有“生产者【p】”,“队列”,“消费者【c】”。是最简单的消息队列通信。注意:这里的消息指的是数据说具体点是服务器端通信的数据。虽然使用RabbitMQ Web STOMP可以使web前端作为消息的消费者(特定情况下,消息推送时使用),但是多数情况下消息还是在服务器端流转。消息以byte类型传输,生产和消费时可根据业务情况使用字符串或json格式数据。

以下是测试生产与消费的代码

package com.example.test.btest;

import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@SpringBootTest
class BtestApplicationTests {
    //获取springboot默认日志工具
    Logger logger = LoggerFactory.getLogger(getClass());
    
    
    //创建sender测试方法,用于向rabbitmq发送消息,即生产者生产消息
    @Test
    void sender() throws IOException, TimeoutException {
        //创建rabbitmq连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //输入rabbitmq服务器IP
        connectionFactory.setHost("192.168.31.230");
        /**
        *4369 -- erlang发现口
        *5672 --client端通信口
        *15672 -- 管理界面ui端口
        *25672 -- server间内部通信口
        *输入客户(外部)通讯端口
        */
        connectionFactory.setPort(5672);
         //输入要连接的虚拟主机
        connectionFactory.setVirtualHost("/test_one");
        //操作用户
        connectionFactory.setUsername("test_user");
        //操作密码
        connectionFactory.setPassword("123");
        //生成连接
        Connection connection = connectionFactory.newConnection();
        //生成通信管道
        Channel channel = connection.createChannel();
        //声明(创建)消息队列
        /**
        *参数1: queue 队列的名称
        *参数2: durable 是否持久化 
        *参数3: exclusive 是否排外的(独立队列)
        *参数4: autoDelete: 是否自动删除
        *参数5: arguments: 设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key         *、 x-rnax-priority 等
        *当durable = false时,队列非持久化。因为队列是存放在内存中的,所以当RabbitMQ重启或者服务器重        * 启时该队列就会丢失 ;
        *当durable = true时,队列持久化。当RabbitMQ重启后队列不会丢失。RabbitMQ退出时它会将队列信息        *保存到 Erlang自带的Mnesia数据库 中,当RabbitMQ重启之后会读取该数据库
        *当exclusive = true则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连         *接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时        *自动删除 ;当exclusive = false则设置队列为非排他的,此时不同连接(Connection)的管道           *Channel可以使用该队列
        *如果autoDelete = true,当所有消费者都与这个队列断开连接时,即消费完成消费者调用                   *channel.close()  connection.close()关闭连接时删除此列队
        *
        *参考:https://blog.csdn.net/AwayFuture/article/details/103405335
        */
        channel.queueDeclare("hello",false,false,false,null);
        //发送消息
        /**
        *参数1: exchange 交换机名称
        *参数2: queue 队列名称
        *参数3: basicProperties 附加参数
        *参数4: 消息数据
        */
        channel.basicPublish("","hello",null,"你好世界".getBytes());
        //关闭通道
        channel.close();
        //关闭连接
        connection.close();

    }
    
    
    //创建customer测试方法,用于rabbitmq处理消息,即消费者消费消息
    @Test
    void customer() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.31.230");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test_one");
        connectionFactory.setUsername("test_user");
        connectionFactory.setPassword("123");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello",false,false,false,null);
        /**
        *参数1:队列名称
        *参数2:autoAck 是否自动确认消息,true自动确认,false手动调用确认。
        *参数3:回调函数获取消费数据
        */
        channel.basicConsume("hello",true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                super.handleDelivery(consumerTag, envelope, properties, body);

                logger.warn("this body ==>" + new String(body));
            }
        });

    }

}
上一篇下一篇

猜你喜欢

热点阅读