RabbitMQ-4 快速入门 消息的生产与消费

2020-04-01  本文已影响0人  巴巴11

关键点:
ConnectionFactory: 获取连接工厂
Connection : 一个连接
Channel : 数据通信信道,可发送和接收消息

Queue : 具体的消息存储队列
Producer & Consumer : 生产者和消费者

简单实践:

// rabbit client
<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
// producer
package com.example.demo.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Quick-start producer: publishes five copies of a fixed text message to the
 * default exchange ("") with routing key "test001".
 *
 * <p>This class does not declare the queue itself; it only publishes with the
 * routing key "test001".
 * NOTE(review): with the default exchange, messages are presumably dropped if no
 * queue with that name exists yet, so start the consumer first - confirm against
 * the broker documentation.
 */
public class Producer {
    /**
     * Connects to the broker on localhost:5672 (virtual host "/"), publishes the
     * messages, and releases the channel and connection.
     *
     * @param args unused
     * @throws IOException      if connecting, publishing, or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create and configure the connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // 2. Open the connection.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();
            // 4. Publish through the channel. The payload is loop-invariant, so it is
            //    encoded once, with an explicit charset so the bytes on the wire do
            //    not depend on the platform default.
            byte[] body = "hello, rabbit...".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            for (int i = 0; i < 5; i++) {
                // Arguments: exchange, routing key, message properties, message body.
                channel.basicPublish("", "test001", null, body);
            }
            // 5. Close the channel on the normal path.
            channel.close();
        } finally {
            // Always release the connection, even if creating the channel or
            // publishing failed. The isOpen() guard avoids masking the original
            // exception with a second failure when the connection is already down.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}

// consumer
package com.example.demo.quickstart;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Quick-start consumer: declares the queue "test001", subscribes to it with
 * automatic acknowledgement, and prints every received message body to stdout.
 *
 * <p>NOTE(review): {@code QueueingConsumer} is believed to be deprecated/removed in
 * newer amqp-client major versions; it is kept here because the article pins
 * version 3.6.5 - confirm before upgrading the client.
 */
public class Consumer {
    /**
     * Connects to the broker on localhost:5672 (virtual host "/") and consumes
     * messages in an endless loop. The method only returns by throwing.
     *
     * @param args unused
     * @throws IOException          if connecting, declaring, or subscribing fails
     * @throws TimeoutException     if opening the connection times out
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Create and configure the connection factory.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        // 2. Open the connection.
        Connection connection = connectionFactory.newConnection();
        try {
            // 3. Open a channel on the connection.
            Channel channel = connection.createChannel();
            // 4. Declare the queue.
            //    Arguments: name, durable, exclusive, autoDelete, extra arguments.
            String queueName = "test001";
            channel.queueDeclare(queueName, true, false, false, null);
            // 5. Create the consumer bound to this channel.
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // 6. Subscribe the consumer to the queue; the second argument enables auto-ack.
            channel.basicConsume(queueName, true, consumer);
            // 7. Receive messages until an exception ends the loop.
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                // Decode with an explicit charset so the result does not depend on
                // the platform default.
                String msg = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println(msg);
            }
        } finally {
            // The loop only exits via an exception; release the connection in that
            // case. The isOpen() guard avoids masking the original exception when
            // the connection has already been shut down.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}

1 Exchange 交换机

image.png image.png image.png

2 Direct Exchange

image.png
image.png

3 Topic Exchange

image.png
image.png
image.png

4 Fanout Exchange

image.png
image.png

5 Binding -- 绑定

image.png

6 Queue - 消息队列

image.png

7 Message --消息

image.png
image.png

8 Virtual Host --虚拟主机

image.png
上一篇下一篇

猜你喜欢

热点阅读