RabbitMQ 极速入门
2019-09-26 本文已影响0人
qyfl
角色:
-
ConnectionFactory
连接工厂 -
Connection
一个连接 -
Channel
数据通信通道,可用于发送和接收消息 -
Queue
具体的消息存储队列 -
Producer
&Consumer
生产者和消费者
依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
消费者
import com.rabbitmq.client.*;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1. 创建一个 ConnectionFactory,并配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
// 3. 通过 connection 创建 Channel
Channel channel = connection.createChannel();
// 4. 声明一个队列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
// 5. 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 6. 设置 Channel
channel.basicConsume(queueName, true, queueingConsumer);
// 7. 获取消息
while (true) {
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消费端:" + msg);
}
}
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建一个 ConnectionFactory,并配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通过连接工厂创建连接
Connection connection = connectionFactory.newConnection();
// 3. 通过 connection 创建 Channel
Channel channel = connection.createChannel();
// 4. 通过 Channel 发送数据
String msg = "Hello RabbitMQ";
for (int i = 0; i < 5; i++) {
channel.basicPublish("", "test001", null, msg.getBytes());
}
// 5. 关闭相关连接
channel.close();
connection.close();
}
}
注意事项
- 如果生产者没有指定 exechange 或者 exechange 为空,默认使用
(AMQP default)
-
(AMQP default)
的规则是向与 routing key 名相同的 queue 发送消息。 - 如果 exechange 与 routing key 都为空,那么消息就就会被
(AMQP default)
删除。