(翻译)RabbitMQ官网教程(1)HelloWorld

2020-08-06  本文已影响0人  zhangyaqi

原文链接:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

简介


RabbitMQ是一个消息中间件:负责接收消息以及转发消息。可以将RabbitMQ想象为一个邮局:当你将想要寄的信投入邮箱时,你确信邮递员最终会将信派发给你的收件人。大同小异,RabbitMQ就像是邮箱、邮局、邮递员。

RabbitMQ与邮局最大的不同就是,RabbitMQ不处理纸张,而是接收、存储、转发二进制消息数据。

RabbitMQ及消息传递通用的一些术语:


注意:Producer、Consumer、Broker可以部署在不同的主机上;同一个应用程序既可以是生产者也可以是消费者。

HelloWorld(Java版)


这里我们会写两个Java程序:一个用于发送消息的Producer,一个用于接收并打印消息的Consumer。我们不会关注JavaAPI的细节,而是专注于"HelloWorld"这个简单程序。

下图中,"P"表示Producer,"C"表示Consumer,中间的方格表示Queue。

Java客户端依赖包

RabbitMQ支持多种协议。我们这里采用AMQP协议(一个用于消息传递的开放通用协议)。RabbitMQ提供了多种语言的客户端,这里我们使用RabbitMQ提供的Java客户端。

下载客户端依赖包, 然后将依赖包添加到你的程序文件下。该客户端会依赖(SLF4J APISLF4J Simple).

需要注意的是,SLF4J Simple对于本教程来说已经足够了,但是您应该在生产环境中使用像Logback这样成熟的日志库。

RabbitMQ的Java客户端依赖包也收录在Maven的中央仓库中,也可以用下边的方式引入该依赖包。

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.9.0</version>
</dependency>

发送消息


我们将消息的Producer的类命名为Send,消息的Consumer的类命名为Recv。Producer会连接到RabbitMQ并发送一条消息,然后退出。

在Send.java中,需要引入的下面的依赖类:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

设置Java类并声明队列名:

public class Send {
  private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception {
      ...
  }
}

创建与RabbitMQ的连接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {

}

这里的Connection是一个抽象的Socket Connection,主要解决协议版本处理、授权等问题。我们连接到的是本地RabbitMQ节点。如果想连接其他的节点,只需要指定主机名或IP地址即可。

然后我们创建了一个channel,我们用到的大部分API都在channel中。由于Connection和Channel都实现了Closeable接口,所以我们可以使用try-catch-with的写法,这样就不用再显式关闭资源。

发送消息前,我们需要先声明一个消息队列;然后再将消息发布到消息队列中。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

发送和接收消息时定义的消息队列名必须保持一致。如果队列不存在,则创建新的队列。消息体是一个byte数组,所以你可以编码任何类型的内容。

Send.java完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Receiving


Consumer监听来自RabbitMQ的消息。与上边仅仅发送单条消息的Producer不同,Consumer将一直运行以此来监听并打印消息。

Recv与Send的代码相似:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

我们使用一个特殊的接口DeliverCallback来缓存服务端推送过来的消息。

设置方法与Producer类似:打开一个Connection、Channel,然后声明将要消费的queue。需要注意的是这里的queue需要与Producer中声明的queue保持一致。

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

注意我们这里也声明了queue。那是因为我们可能会在启动Producer之前就先启动Consumer,这样我们可以确保从queue中消费消息时该queue已经存在。

为什么这里我们没有用try-catch-with的方式来自动关闭channel和connection呢?如果采用这种方式的话就可以让程序简单的运行、关闭资源然后退出。但真这样的做的话就会很尴尬,因为我们希望的是Consumer异步监听消息到来时进程一直保持活跃。

我们将告诉服务端把Queu中的消息传给我们。由于服务端会异步将消息推送给我们,所以我们需要提供一个对象形式的回调,该对象会将消息缓存起来直到我们使用消息。这就是DeliverCallback的作用。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

Recv.java完整代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Receiver {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });

    }
}

将Send和Recv一块运行

在类路径下引用RabbiMQ的客户端依赖包编译这两个Java文件:

javac -cp amqp-client-5.7.1.jar Send.java Recv.java

运行程序的时候需要应用类路径下的rabbitmq-client.jar 、slf4j-api-1.7.26.jar、slf4j-simple-1.7.26.jar ,在终端运行Consumer:

java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Recv

之后再运行Producer:

java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Send

在WIndows系统中运行的话,需要将冒号『:』替换为分号『;』

当程序全部运行的时候,Consumer会把从RabbitMQ中获取到的Producer的消息打印出来。Consumer将会一直等待消息。

Queue清单

你或许希望看到RabbitMQ都有哪些queue以及queue中有多少消息。你可以 使用rabbitmqctl来查看这些信息:

 sudo rabbitmqctl list_queues

在Windows中对应的命令:

rabbitmqctl.bat list_queues

在第二章中,我们将创建一个简单的工作队列。

提示
为了方便输入,你可以给用到的依赖包设置一个环境变量,比如:

 export CP=.:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar
 java -cp $CP Send

Windows系统的话:

set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
java -cp %CP% Send
上一篇下一篇

猜你喜欢

热点阅读