rabbitMq-简单模式-java

2019-04-10  本文已影响0人  ssttIsme

简单模式



P:代表生产者,向队列中发送消息。
队列:保存生产者提供的数据。(队列的特点,先进先出)
C:代表消费者,负责将对列中的消息全部读取,并且完成特定的任务。
工作原理:生产者源源不断地向队列中发送消息。消费者只能从一个队列中读取消息,并处理。

[root@bogon rabbitmq-server-3.6.1]# cd /etc/rabbitmq

[root@bogon rabbitmq]# service rabbitmq-server start
Starting rabbitmq-server: SUCCESS
rabbitmq-server.er.

新增用户名密码,并设定管理员权限



本次密码为guest
点击Add usser
点击保存按钮,出现信息用户信息

添加虚拟主机


点击新增的这个student
点击Virtual Hosts
点击

绑定虚拟主机与用户
点击
往下滚动

上方出现studnet
再次点击虚拟主机

到此用户和虚拟主机绑定了




右键生成

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.rabbit</groupId>
    <artifactId>schoolmanage</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
    </dependencies>
</project>
package schoolmanage;

import java.io.IOException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TestStudentMsgDirectSend {
    private Connection connection=null;
    private String queueName="simple";
    @Before
    public void init() throws IOException{
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.6.130");
        factory.setPort(5672);
        factory.setVirtualHost("/school");
        factory.setUsername("student");
        factory.setPassword("student");
        //创建连接
        connection=factory.newConnection();
    }
    //消息生产者
    @Test
    public void provider() throws IOException{
        //创建通道
        Channel channel=connection.createChannel();
        String msg="我要上学!!!——简单模式";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.basicPublish("", queueName, null, msg.getBytes());
        channel.close();
        System.out.println("消息发送成功!!!");
    }
    @After
    public void close() throws IOException{
        connection.close();
    }
}


消息发送成功!!!

加入消费者代码后

package schoolmanage;

import java.io.IOException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TestStudentMsgDirectSend {
    private Connection connection=null;
    private String queueName="simple";
    @Before
    public void init() throws IOException{
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //为工厂对象添加数据 
        //远程主机
        factory.setHost("192.168.6.130");
        //端口号
        factory.setPort(5672);
        //虚拟主机
        factory.setVirtualHost("/school");
        //用户名
        factory.setUsername("student");
        //密码
        factory.setPassword("student");
        //创建连接
        connection=factory.newConnection();
    }
    //消息生产者
    @Test
    public void provider() throws IOException{
        //创建通道
        Channel channel=connection.createChannel();
        String msg="我要上学!!!——简单模式";
        //创建队列
        /**
         * String queue, 队列的名称
         * boolean durable,持久化 false表示不持久化, true表示rabbitmq重启后会恢复队列的内容
         * boolean exclusive,服务器所独有,如果设置为true则消费者不能使用
         * boolean autoDelete,是否自动删除  当队列中没有消息时,该队列是否自动删除
         * Map<String, Object> arguments 额外的参数
         */
        channel.queueDeclare(queueName, false, false, false, null);
        /**
         * String exchange, 交换机的名称,如果需要交换机则添加名称 如果没有交换机 则为""串
         * String routingKey, 路由key 寻址的关键字,如果需要使用路由key定义特定的关键字(orderKey.xxx)
         * 如果不需要路由key,在简单模式中的添加队列的名称
         * BasicProperties props, 其他的配置,一般为null
         * byte[] body,表示需要发送的消息-字节码文件
         */
        channel.basicPublish("", queueName, null, msg.getBytes());
        //将流关闭
        channel.close();
        System.out.println("消息发送成功!!!");
    }
    //消费者
    @Test
    public void consumer() throws Exception{
        //获取通道
        Channel channel=connection.createChannel();
        //定义消息队列
        channel.queueDeclare(queueName,false,false,false,null);
        //创建消费者对象
        QueueingConsumer consumer=new QueueingConsumer(channel);
        //将消费者与队列进行绑定
        /**
         * String queue 队列名称,
         * boolean autoAck 是否自动回复 true自动回复 false手动回复, 
         * Consumer callback 一般消费者对象
         */
        channel.basicConsume(queueName,true,consumer);
        //消费者从队列中获取数据
        //通过迭代的方式遍历队列
        QueueingConsumer.Delivery nextDelivery = consumer.nextDelivery();
        String msg="消费者收到 :"+new String(nextDelivery.getBody());
        System.out.println(msg);
        channel.close();
    }
    @After
    public void close() throws IOException{
        connection.close();
    }
}



用student登录管理端可看到对列的名字
消费者收到 :我要上学!!!——简单模式
上一篇 下一篇

猜你喜欢

热点阅读