MQrabbit mq

MQ之RabbitMQ

2017-06-08  本文已影响145人  inke

简介

MQ为Message Queue,消息队列是应用程和应用程序之间的通信方法。
RabbitMQ是一个开源的,在AMQP基础上完整的,可复用的企业消息系统。
支持主流的操作系统,Linux、Windows、MacOX等。
多种开发语言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等

AMQP

AMQP是消息队列的一个协议。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

RabbitMQ官网地址http://www.rabbitmq.com/

Linux安装RabbitMQ

安装Erlang

添加yum支持

在线安装:

cd /usr/local/src/
mkdir rabbitmq
cd rabbitmq

wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm

rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc

sudo yum install erlang

离线安装:

centos相关资源:
http://omj2w5bt7.bkt.clouddn.com/rabbitmq-server-3.4.1-1.noarch.rpm
http://omj2w5bt7.bkt.clouddn.com/esl-erlang-compat-R14B-1.el6.noarch.rpm
http://omj2w5bt7.bkt.clouddn.com/esl-erlang_17.3-1centos6_amd64.rpm
http://omj2w5bt7.bkt.clouddn.com/esl-erlang-17.3-1.x86_64.rpm

windows相关资源:
http://omj2w5bt7.bkt.clouddn.com/rabbitmq-server-3.4.1.exe
http://omj2w5bt7.bkt.clouddn.com/otp_win32_17.4.exe
http://omj2w5bt7.bkt.clouddn.com/otp_win64_17.3.exe


上传esl-erlang_17.3-1~centos~6_amd64.rpm
执行 yum install esl-erlang_17.3-1~centos~6_amd64.rpm

上传:esl-erlang-compat-R14B-1.el6.noarch.rpm
yum install esl-erlang-compat-R14B-1.el6.noarch.rpm

安装RabbitMQ

上传rabbitmq-server-3.4.1-1.noarch.rpm文件到/usr/local/src/rabbitmq/
安装:
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

启动、停止

service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart

设置开机启动

chkconfig rabbitmq-server on

设置配置文件

cd /etc/rabbitmq
cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/
mv rabbitmq.config.example rabbitmq.config

开启用户远程访问

vi /etc/rabbitmq/rabbitmq.config

开启web界面管理工具

rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart

防火墙开放15672端口

/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT
/etc/rc.d/init.d/iptables save


MAC安装RabbitMQ

安装:brew install rabbitmq

配置环境变量:

RabbitMQ Config

需要在 bash 或者 zsh 的环境变量中添加:

export PATH=$PATH:/usr/local/sbin

不然下面的指令,就必须到/usr/local/sbin目录下,才能直接执行。
启动:rabbitmq-server
停止:rabbitmqctl stop
状态:rabbitmqctl status

访问:http://localhost:15672/
默认的用户名和密码是 guestguest


基本使用

访问界面管理:


使用 guest/guest 进行登录。

添加用户

界面介绍:


创建虚拟主机

用户授权虚拟主机




通过交换机绑定队列或者通过队列绑定交换机都可以。



队列

简单队列


P:消息的生产者
C:消息的消费者
红色:队列

生产者将消息发送到队列,消费者从队列中获取消息。

代码测试:

导入下面测试代码,运行完 Send.java 消息队列中就有"Hello World! -- 1"字符串,在运行Recv.java,就会取出字符串。

启动接收者Recv.java,就可以在 rabbitMQ 管理平台看到如下的一些信息:



启动接收者Send.java,就可以在 rabbitMQ 看到队列中的信息:


Recv.java

public class Recv {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }
}

Send.java

public class Send {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();

        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 消息内容
        String message = "Hello World! -- 1";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

ConnectionUtil.java

public class ConnectionUtil {

    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("192.168.18.130");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/inke-test");
        factory.setUsername("inke");
        factory.setPassword("inke");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }

}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitmq-demo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.4.0.RELEASE</version>
        </dependency>
    </dependencies>
</project>

Work模式

一个生产者、2个消费者。
一个消息只能被一个消费者获取。

Send.java

public class Send {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

Recv.java

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        // channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

Recv2.java

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        // channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成状态
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            // 休眠1秒
            Thread.sleep(1000);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

测试结果:

1、消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
2、消费者1和消费者2获取到的消息的数量是相同的,一个是奇数一个是偶数。
其实,这样是不合理的,应该是消费者1要比消费者2获取到的消息多才对。

Work模式的“能者多劳”

消费者需要打开这个开关


测试结果:

消费者1比消费者2获取的消息更多。


消息的确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

手动模式:

手动模式:

解读:
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的

代码演示:

需求是:一个后台服务提供给前台和搜索展示数据,如何保证数据的统一性。

向交换机中发送消息。


注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。

消费者1(看作是前台系统)


消费者2(看作是搜索系统)


测试结果:

同一个消息被多个消费者获取。

在管理工具中查看队列和交换机的绑定关系:



使用订阅模式能否实现商品数据的同步?

答案:可以的。

后台系统就是消息的生产者。
前台系统和搜索系统是消息的消费者。
后台系统将消息发送到交换机中,前台系统和搜索系统都创建自己的队列,然后将队列绑定到交换机,即可实现。

消息,新增商品、修改商品、删除商品。

前台系统:修改商品、删除商品。
搜索系统:新增商品、修改商品、删除商品。

所以使用订阅模式实现商品数据的同步并不合理。


路由模式


如图所示:c1 队列绑定了 error ,c2 队列绑定了 info、error、warning。
如果发送 error 的消息,那么 c1 和 c2 都可以收到,发送 info、warning 只有c2可以收到,可以随意组合,更加自由。
例如:前台系统只需要收取“更新“”删除”的消息,而搜索系统还需要收取“新增”的消息。





测试结果:
当发送的key是 insert 的时候,只有Recv2队列的test_queue_direct_2才能接收到消息,
当发送的key是 delete、update的时候,Recv1和Recv2都能接收到消息。


通配符模式

将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。*
因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
topic交换机是如何工作的:


生产者(后台系统)Send.java

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息内容
        String message = "商品删除,id=1003";
        channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
        System.out.println(" 后台系统: '" + message + "'");

        channel.close();
        connection.close();
    }
}

消费者1(前台系统)Recv.java

public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_1";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 前台系统: '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

消费者2(搜索系统)Recv2.java

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" 搜索系统: '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

测试结果:item.# 匹配所有前缀是 item 的消息。


Spring-Rabbit

Spring项目
http://spring.io/projects

The project consists of two parts; spring-amqp is the base abstraction, and spring-rabbit is the RabbitMQ implementation.
该项目由两部分组成: spring-amqp是基础抽象,spring-rabbit是RabbitMQ实现。

代码演示:

public class SpringMain {
    
    public static void main(final String... args) throws Exception {
        AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                "classpath:spring/rabbitmq-context.xml");
        //RabbitMQ模板
        RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
        //发送消息
        template.convertAndSend("Hello, world!");
        Thread.sleep(1000);// 休眠1秒
        ctx.destroy(); //容器销毁
    }
}
/**
 * 消费者
 */
public class Foo {

    //具体执行业务的方法
    public void listen(String foo) {
        System.out.println("消费者: " + foo);
    }
}

rabbitmq-context.xml

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

    <!-- 定义RabbitMQ的连接工厂 -->
    <rabbit:connection-factory id="connectionFactory"
        host="192.168.18.130" port="5672" username="inke" password="inke" virtual-host="/inke-test" />

    <!-- 定义Rabbit模板,指定连接工厂以及定义exchange -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
    <!-- <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
        exchange="fanoutExchange" routing-key="foo.bar" /> -->

    <!-- MQ的管理,包括队列、交换器等 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- 定义队列,自动声明 -->
    <rabbit:queue name="myQueue" auto-declare="true" durable="false"/>

    <!-- 定义交换器,自动声明 -->
    <rabbit:fanout-exchange name="fanoutExchange" auto-declare="true" durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue"/>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

<!--    <rabbit:topic-exchange name="myExchange">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue" pattern="foo.*" />
        </rabbit:bindings>
    </rabbit:topic-exchange> -->

    <!-- 队列监听 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="foo" method="listen" queue-names="myQueue" />
    </rabbit:listener-container>

    <bean id="foo" class="com.example.rabbitmq.spring.Foo" />

</beans>


<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="fanoutExchange" />
这里可以直接选择发送到哪个队列,而不是交换机。
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" queue="myQueue"/>


持久化交换机和队列

持久化:将交换机或队列的数据保存到磁盘,服务器宕机或重启之后依然存在。
非持久化:将交换机或队列的数据保存到内存,服务器宕机或重启之后将不存在。

非持久化的性能高于持久化。
如何选择持久化?非持久化?-- 看需求。



参考自:
某培训机构
CSDN
spring 官网

上一篇下一篇

猜你喜欢

热点阅读