RabbitMQ官方教程1--hello world

2020-02-23  本文已影响0人  亼珏

介绍

RabbitMQ是一个消息代理:它接收并转发信息。

术语

生产者Producter
      发送消息的程序被称为生产者。

队列Queue
      RabbitMQ内部类似于邮箱的部分称为队列。虽然消息流经RabbitMQ和你的应用程序,但是它们只能被存储于队列中。队列仅受主机的存储器和磁盘大小的限制,它本质上是一个大的消息缓冲器。许多生产者可以向同一个队列发送消息,许多消费者也可以尝试从同一个队列接收消息。

消费者Consumer
      接收消息的程序被称为消费者。

\color{red}{【注意】}
  producter,consumer和broker可以不再同一台主机上;
  一个应用程序也可以同时是生产者和消费者。

Hello World教程(Java)

      这一部分的教程会写两个Java程序:一个生产者,用来发送一条信息;一个消费者,用来接收信息并将其打印出来。这条信息就是“Hello World”。

Java Client 库
 
RabbitMQ有多种协议。本教程中使用的是 AMQP 0-9-1,它是一种开放的通用信息传递协议。RabbitMQ有许多不同语言的客户端,我们将使用其提供的Java客户端。
 
下载客户端库及其依赖(SLF4J API and SLF4J Simple)。将这些jar包及Java教程文件复制到你的工作目录中。
 
注意:对于本教程而言SLF4J Simple已经足够了,但是在生产过程中应该使用成熟的日志记录库,例如Logback
 
RabbitMQ Java客户端也存在于Maven的中央仓库中:
其groupId为com.rabbitmq,artifactId为amqp-client

现在有了Java client及其依赖项,可以开始编写代码了。其中消息发送者(即生产者)称为Send;消息接收者(即消费者)称为Revc

发送

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Send.java

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Send {
  // 设置队列名
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
    // 创建到服务器的连接 
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
              channel.queueDeclare(QUEUE_NAME, false, false, false, null);
              String message = "Hello World!";
              channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
              System.out.println(" [x] Sent '" + message + "'");
     }
  }
} 

1.创建到服务器的连接。
      connection抽象了socket connection,负责协议版本协商和认证等工作。这里我们连接到本地计算机上的代理--localhost。如果想连接其他计算机上的代理只需指定其host或者IP地址。

2.创建一个通道。
      channel是用于完成任务的大多数API所在的位置。注意,此处使用的是try-with-resources语句。因为ConnectionChannel都实现了java.io.Closeable接口。这样我们就不用再代码中显示的关闭资源了。

3.在try-with-resources语句中将消息发送至队列。
      声明队列是幂等的,只有当队列不存在时才会被创建。消息内容是一个字节数组,所以可以在里面编码任何内容。

发送未成功
 
如果这是你第一次使用RabbitMQ并且没有看见已发送的消息,那么您可能会挠头想知道哪里可能出错。
 
可能是代理在没有足够的磁盘空间下启动了(默认情况下,至少需要200MB的可用空间),因此拒绝接收消息。可以通过检查代理的日志文件来确认,并且如果有必要的话可以减少限制。配置文件文档会教你如何设置disk_free_limit

接收

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java

与生产者不同,生产者只是发送单独的一条消息,消费者需要一直监听RabbitMQ的消息,所以需要保证它一直运行监听消息并打印出来。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {
    // 设置与生产者相同的队列名
    private final static String QUEUE_NAME = "hello";
    
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

1.打开一个连接和一个通道,声明消费消息的队列(与生产者设置的队列名相同)

\color{red}{【注意】}
  在此处声明队列是因为可能消费者在生产者之前启动,
  在尝试消费队列中的消息之前需要确保这个队列一定是存在的。

2.此处不使用try-with-resource语句自动关闭通道和链接:
      因为我们希望在消费者异步侦听消息到达时,进程保持活动状态。

3.需要告诉服务器将队列中的消息传递给我们
      由于它将异步的向我们推送消息,所以以对象的形式提供了一个回调,该回调会为我们缓冲消息,直到准备使用为止。上述就是DeliverCallback子类所做的。

发送&接收

1.编译--可以仅使用classpath上的RabbitMQ Java Client来编译这两个类:
javac -cp amqp-client-5.7.1.jar Send.java Recv.java

2.运行--在classpath中需要rabbitmq-client.jar及其依赖。在终端中运行他们:
java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Recv
java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Send

https://www.rabbitmq.com/getstarted.html

上一篇下一篇

猜你喜欢

热点阅读