消息队列实践(三)—— Exchange和Route

2019-08-08  本文已影响0人  瑞瑞余之

RabbitMQ官网有一篇文章用来介绍AMQP模型。AMQP是什么呢,它是RabbitMQ所支持的队列协议,即为:Advanced Message Queue Protocol,基于这个协议producer和consumer才可以和broker进行交互。
在我们之前的文章里介绍了producer - queue - consumer这样的结构,而在实际的AMQP中还有一个实例——Exchange,我们不妨看一下官网给出来的架构图:


AMQP 模型

这个是RabbitMQ完全的架构模型,我们看到Publisher将消息发布到Exchange,而非Queue,完全由Exchange决定将message发布到哪一个Queue,分发的依据来自route rules。这样做有什么好处呢?

  1. Publisher单纯的进行发布工作,它不用担心具体推送到哪一个队列;
  2. 在实际项目中,根据接受的数据类型不同可能存在多个队列,比如一个媒体中心的新闻数据有体育新闻、时事新闻、民生新闻等等,这些新闻类型是并行而不交叉的,所以每个类型都存在自己的Queue。Publisher发布的新闻到Exchange中会根据route rule找到对应的队列,而不会污染其它的队列数据。

从上面的例子我们可以看出Exchange在接受Publisher的message后,会有不同类型的分发,这涉及到Exchange的不同类型:Direct Exchange、Fanout Exchange、Topic Exchange、Headers Exchange、Default Exchange五类

1. Direct Exchange

从名字可以看出,它是一种简单粗暴的分派方式,publisher在发布message的时候,会给message一个routing key,比如说字符串“key”,当message到Exchange后,会被转发给routing key也为“key”的队列(可能有多个)。

2. Fanout Exchange

这种方式就是消息广播,所有与该类型Exchange绑定的Queue都会收到message。有多少个queue,Exchange就会复制多少分,然后转发。比如我们在腾讯体育看NBA直播,上面的比分数据时时变化,那么腾讯体育就可以将比分message发布给Fanout Exchange,这样让所有终端都可以收到。

3. Topic Exchange

它实际上是对Direct Exchange的强化,在Direct X 中routing key是绝对相等式匹配,那么Topic X提供模糊匹配的方式。

4. Header Exchange

以上Exchange方式都是通过Routing Key进行匹配转发,Header Exchange放弃使用Routing Key,而是采用Header Properties的方式。在创建Queue时会定义一个map1,这个map1中除了普通的key-value,还会有一个熟悉key:x-match,value:any/all;Producer在创建message的时候也会产生一个map2,Header X会将message中的map2和Queue中的map1进行比对,如果map1定义的x-match:any则代表只要map1与map2中的key-value有一个匹配,该message就会转发到该Queue中;如果map1定义的x-match:all则代表map1与map2中的key-value必须全部匹配,该message才会转发到该Queue中。

5. Default Exchange

Default X其实就是Direct X的简易版,如果Queue在定义Routing Key的时候设为空字符串,则默认这个Queue的Routing Key就是它的名字。其它过程和Direct X一致。
下面我们用Topic Exchange和Header Exchange作为实战样例,来看看Exchange的使用。

//Rev.java
package com.otof.rabbitmq.receive;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Recv {
    private final String QUEUE_TOPIC_1 = "QUEUE_TOPIC1";
    private final String QUEUE_TOPIC_2 = "QUEUE_TOPIC2";
    private final String QUEUE_TOPIC_3 = "QUEUE_TOPIC3";
    private final String QUEUE_HEADER = "QUEUE_HEADER";
    private final String EXCHANGE_TOPIC = "TOPICX";

    public void receiveMessage() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //创建一个Topic Exchange
        channel.exchangeDeclare(EXCHANGE_TOPIC, "topic");
        //声明3个队列
        channel.queueDeclare(QUEUE_TOPIC_1, false, false, false, null);
        channel.queueDeclare(QUEUE_TOPIC_2, false, false, false, null);
        channel.queueDeclare(QUEUE_TOPIC_3, false, false, false, null);

        //将三个队列都与Topic X关联,同时定义各自的Routing Key

        //支持类似于topic.one  *代表一个单词
        channel.queueBind(QUEUE_TOPIC_1, EXCHANGE_TOPIC, "topic.*");
        //支持类似于topic.one或topic.one.two  #代表零个或多个以.分隔的单词
        channel.queueBind(QUEUE_TOPIC_2, EXCHANGE_TOPIC, "topic.#");
        //仅支持Routing Key=topic_not_pass的message
        channel.queueBind(QUEUE_TOPIC_3, EXCHANGE_TOPIC, "topic_not_pass");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        //定义成功入队后的回调函数
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("This is Queue 1 Received '" + message + "'");
        };
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("This is Queue 2 Received '" + message + "'");
        };
        DeliverCallback deliverCallback3 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("This is Queue 3 Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_TOPIC_1, true, deliverCallback1, consumerTag -> {});
        channel.basicConsume(QUEUE_TOPIC_2, true, deliverCallback2, consumerTag -> {});
        channel.basicConsume(QUEUE_TOPIC_3, true, deliverCallback3, consumerTag -> {});
    }
}

//Send.java
package com.otof.rabbitmq.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Send {
    private final String EXCHANGE_TOPIC = "TOPICX";
    private final String QUEUE_TOPIC = "QUEUE_TOPIC";

    public void sendMessage(String message, String key) {
        ConnectionFactory factory = new ConnectionFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //声明Exchange Topic,名字与Rev.java中一致
            channel.exchangeDeclare(EXCHANGE_TOPIC, "topic");

            //发布带Routing Key的message信息
            channel.basicPublish(EXCHANGE_TOPIC, key, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
//RabbitmqController.java
package com.otof.rabbitmq.controllers;

import com.otof.rabbitmq.receive.Recv;
import com.otof.rabbitmq.send.Send;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@RestController
public class RabbitmqController {

    @PostMapping(path = "addToQueue")
    public void addMessageToQueue(@RequestParam String message, @RequestParam String key) {
        new Send().sendMessage(message, key);
    }

    @GetMapping(path = "getFromQueue")
    public void getMessageFromQueue() throws IOException, TimeoutException {
        new Recv().receiveMessage();
    }
}

用postman请求getFromQueue,进行注册Exchange和Queue,这个过程叫做Subscription;然后请求addToQueue,将message插入队列。我们来看一下结果:

  1. 请求:当Key=topic


    Key=topic

    结果:匹配topic.#,#代表零个或多个以.分隔的字符串


    key=topic结果
  2. 请求:Key=topic.one


    key=topic.one结果
  3. 请求:Key=topic.one.two.three


    Key=topic.one.two.three
  4. 请求:Key= topic_not_pass


    Key= topic_not_pass结果

以上就是Topic X的实践,把握以下几条关键点:

  1. 先注册后发布,Subscription-Public
  2. Public只针对Exchange,与Queue无关
  3. Exchange与Queue的绑定过程中需要定义Routing Key

//Rev.java
package com.otof.rabbitmq.receive;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class Recv {
    private final String QUEUE_HEADER_1 = "QUEUE_HEADER1";
    private final String QUEUE_HEADER_2 = "QUEUE_HEADER2";
    private final String EXCHANGE_HEADER = "HEADERX";

    public void receiveMessage() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //声明Header X
        channel.exchangeDeclare(EXCHANGE_HEADER, "headers");

        //任意满足key1=>aaa或key2=>bbb都通过
        Map<String, Object> map1 = new HashMap(){{
            put("key1", "aaa");
            put("key2", "bbb");
            put("x-match", "any");
        }};
        channel.queueDeclare(QUEUE_HEADER_1, false, false,false, null);
        channel.queueBind(QUEUE_HEADER_1, EXCHANGE_HEADER, "", map1);

        //全部满足key1=>aaa或key2=>bbb则通过
        Map<String, Object> map2 = new HashMap(){{
            put("key1", "aaa");
            put("key2", "ccc");
            put("x-match", "all");
        }};
        channel.queueDeclare(QUEUE_HEADER_2, false, false,false, null);
        channel.queueBind(QUEUE_HEADER_2, EXCHANGE_HEADER, "", map2);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("This is Queue 1 Received '" + message + "'");
        };
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("This is Queue 2 Received '" + message + "'");
        };

        channel.basicConsume(QUEUE_HEADER_1, true, deliverCallback1, consumerTag -> {});
        channel.basicConsume(QUEUE_HEADER_2, true, deliverCallback2, consumerTag -> {});
    }
}
//Send.java
package com.otof.rabbitmq.send;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;


public class Send {
    private final String EXCHANGE_TOPIC = "TOPICX";
    private final String EXCHANGE_HEADER = "HEADERX";

    public void sendMessage(Map<String, Object> map, String message) {
        ConnectionFactory factory = new ConnectionFactory();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_HEADER, "headers");
            BasicProperties basicProperties = new BasicProperties();
            basicProperties = basicProperties.builder().headers(map).build();
            channel.basicPublish(EXCHANGE_HEADER, "", basicProperties, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
//RequestData.java
package com.otof.rabbitmq.entity;

import java.util.Map;

public class RequestData {
    String message;
    Map<String, Object> map;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Map<String, Object> getMap() {
        return map;
    }

    public void setMap(Map<String, Object> map) {
        this.map = map;
    }
}
//RabbitmqController.java
package com.otof.rabbitmq.controllers;

import com.otof.rabbitmq.entity.RequestData;
import com.otof.rabbitmq.receive.Recv;
import com.otof.rabbitmq.send.Send;
import org.springframework.web.bind.annotation.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

@RestController
public class RabbitmqController {

    @PostMapping(path = "addToQueue")
    public void addMessageToQueue(@RequestBody RequestData requestData) {
        new Send().sendMessage(requestData.getMap(), requestData.getMessage());
    }

    @GetMapping(path = "getFromQueue")
    public void getMessageFromQueue() throws IOException, TimeoutException {
        new Recv().receiveMessage();
    }
}

我们看一下postman发送message和Key Routing


Key Routing

结果:


结果
上一篇下一篇

猜你喜欢

热点阅读