RabbitMQ笔记-4

2019-10-14  本文已影响0人  后来猫走丢了

layout: post
title: RabbitMQ笔记-4
categories: RabbitMQ
tags:


这里记录官方的第3个Demo,这个Demo主要讲了交换器在MQ中的使用

[TOC]

To-do List


0X00 概要,以及记录完成情况


0X01 交换器的类型

默认交换器,或者叫不使用交换器

//消息通过routingKey指定的名称路由到队列(如果存在,是直接路由到队列)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

fanout类型的交换器

fanout示意图

生产者代码编写
package demo03;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 官方的第三个Demo 生产者
 * 主要介绍了以下功能:
 * 1.Fanout交换器的使用
 */
public class FanoutSend {

    /**
     * 定义交换器名称
     */
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {

        //配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123456");

        //try-with-resources
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();
        ) {

            //声明交换器类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            //定义消息
            String msg = "你好";

            //通过通道把消息发送给交换器
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));

            System.out.println(" [X] Sent '" + msg + "'");

        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }

}


消费者代码编写
package demo03;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 对应官方Demo03的客户端
 */
public class FanoutRecv {

    /**
     * 交换器名称
     */
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        //连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //通道声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //通道声明一个不为持久的队,独占的(仅限此连接),自动删除的(服务器不在使用或者下线将此队列删除)队列
        String queueName = channel.queueDeclare().getQueue();

        //将队列绑定到交换器
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] 正在等待消息,退出请按 CTRL+C");

        //消息确认,关闭自动确认

//        DeliverCallback xx = new DeliverCallback() {
//            @Override
//            public void handle(String s, Delivery delivery) throws IOException {
//                String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
//                System.out.println(" [X] 收到消息'" + msg + "',我的工作编号为 FanoutRecv");
//                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//            }
//        };
//        channel.basicConsume(queueName, false, xx, consumerTag -> {});

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [X] 收到消息'" + msg + ",我的工作编号为 FanoutRecv'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
    }

}


代码中的一些API介绍(基于5.6.0 AP)

生产者的代码

channel.exchangeDeclare()

channel.exchangeDeclare(java.lang.String exchange, java.lang.String type)

主动声明一个非自动删除,非持久的交换,没有额外的参数

参数名 参数类型 参数意义
exchange String 交换器的名称(比如上面生产者代码中的"logs")
type String 定义交换器的类型(代码中的"fanout"类型交换器)

channel.basicPublish()

channel.basicPublishjava.lang.String exchange, java.lang.String routingKey, AMQP.BasicProperties props, byte[] body)

参数名 参数类型 参数意义
exchange String 消息需要发送到的交换器名称(比如上面生产者代码中的"logs")
routingKek String 路由器密钥
props AMQP.BasicProperties 消息的其他属性 - 路由头等
body byte[] 消息正文

消费者的代码

channel.basicAck()

basicAck(long deliveryTag, boolean multiple)

参数名 参数类型 参数意义
deliveryTag long RabbitMQ推送消息给消费者时,会附带一个DeliveryTag,以便消费者可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增
multiple boolean 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认。(批量确认针对的是整个信道,参考gordon.study.rabbitmq.ack.TestBatchAckInOneChannel.java。) 对同一消息的重复确认,或者对不存在的消息的确认,会产生 IO 异常,导致信道关闭。

channel.basicConsume()

basicConsume(java.lang.String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)

参数名 参数类型 参数意义
queue String 队列名称
autoAck boolean 是否自动确认消息(手动确认和自动确认,在开发中一般为手动确认,即值为false,详细说明参考Consumer Acknowledgements and Publisher Confirms)
deliverCallback deliverCallback 传递消息时的回调
cancelCallback CancelCallback 取消消费者时的回调(这个是个功能性接口可以用作lambda表达式或方法引用的赋值目标)

direct类型的交换器

direct交换器原理图
相同路由key绑定多个队列
生产者代码编写
package demo04;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 官方的第4个Demo
 * 主要实现了以下内容:
 * 1.direct类型交换器的使用
 */
public class DirectSend {

    /**
     * 定义交换器的名称
     */
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {

        //配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123456");

        //try-with-resources
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()
        ) {

            //声明交换器类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            //定义Info消息(消费者只有在routingKek相同时才能接收到消息)
            String infoMsg = "你好 direct";

            //定义Error消息
            String errorMsg = "错误 direct";

            //消息发送
            channel.basicPublish(EXCHANGE_NAME, "info", null, infoMsg.getBytes(StandardCharsets.UTF_8));

            System.out.println(" [X] Sent '" + infoMsg + "'");

            //错误消息发送
            channel.basicPublish(EXCHANGE_NAME, "error", null, errorMsg.getBytes(StandardCharsets.UTF_8));

            System.out.println(" [X] Sent '" + errorMsg + "'");

        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }

    }

}

消费者代码编写
负责接收Info路由key
package demo04;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 对应官方Demo04的客户端
 * 负责接收Info
 */
public class DirectRecvForInfo {

    /**
     * 交换器名称
     */
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        //配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //通过通道声明一个不为持久的队,独占的(仅限此连接),自动删除的(服务器不在使用或者下线将此队列删除)队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定到交换器
        channel.queueBind(queueName, EXCHANGE_NAME, "info");

        System.out.println(" [*] 正在等待消息,退出请按 CTRL+C");

        DeliverCallback deliverCallback = (consumerTar, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [X] 收到消息'" + msg + ",我的工作编号为 DirectRecv,负责接收 Info 消息'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
    }
}


负责接收Error路由key
package demo04;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * 对应官方Demo04的客户端
 * 负责接error
 */
public class DirectRecvForError {

    /**
     * 交换器名称
     */
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        //配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //通过通道声明一个不为持久的队,独占的(仅限此连接),自动删除的(服务器不在使用或者下线将此队列删除)队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定到交换器
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println(" [*] 正在等待消息,退出请按 CTRL+C");

        DeliverCallback deliverCallback = (consumerTar, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [X] 收到消息'" + msg + ",我的工作编号为 DirectRecv,负责接收 Error 消息'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
    }
}

代码效果展示
Direct类型交换器生产者发送消息 Direct类型交换器Info消费者接受消息 Direct类型交换器消费者接受Error消息
代码中的一些API介绍(基于5.6.0 AP)
消费者的代码

channel.queueBind()

channel.queueBind(java.lang.String queue, java.lang.String exchange, java.lang.String routingKey)

主动声明一个非自动删除,非持久的交换,没有额外的参数

参数名 参数类型 参数意义
queue String 通道名称
exchange String 交换器名称
routingKey String 路由Key

topic类型的交换器

topic原理图

表达式 匹配意义
* (星号)可以替代一个单词(或者标识符)
# (hash)可以替换零个或多个单词(或者标识符)

生产者代码编写
package demo05;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;


public class TopicSend {

    //定义交换器名称
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) {

        //配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123456");

        //tyr-witch-resources
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel();)
        {

            //声明交换器类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            //定义消息
            String msg = "# 匹配";

            //消息发送
            channel.basicPublish(EXCHANGE_NAME, "anonymous.info", null, msg.getBytes(StandardCharsets.UTF_8));

            System.out.println(" [X] Sent '" + msg + "'");

        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }

}

消费者代码编写
package demo05;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TopicRecvForJinHao {

    /**
     * 交换器名称
     */
    public static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {

        //配置连接信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("admin");
        factory.setPassword("123456");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明交换器
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        //通过通道声明一个不为持久的队,独占的(仅限此连接),自动删除的(服务器不在使用或者下线将此队列删除)队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定到交换器,配置匹配key为 # 接收所有类型消息
        channel.queueBind(queueName, EXCHANGE_NAME, "#");

        System.out.println(" [*] 正在等待消息,退出请按 CTRL+C");

        DeliverCallback deliverCallback = (consumerTar, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [X] 收到消息'" + msg + ",我的路由Key为 # ,负责接收所有消息'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(queueName, false, deliverCallback, consumerTag -> {});
    }

}

代码效果展示
其它说明
上一篇 下一篇

猜你喜欢

热点阅读