1-5 交换机HelloWord

2020-07-09  本文已影响0人  Finlay_Li

交换机属性

  1. Name:交换机名称
  2. Type:交换机类型direct、topic、 fanout、 headers
  3. Durability:是否需要持久化,true为持久化
  4. Auto Delete:当最后一个绑定到Exchange.上的队列删除后,自动删除该Exchange
  5. Internal:当前Exchange是否用于RabbitMQ内部使用, 默认为False
  6. Arguments:扩展参数,用于扩展AMQP协议自制定化使用

Direct Exchange

简介

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue

注意: Direct模式可以使用RabbitMQ自带的Exchange: default Exchange,所以不需要将Exchange进行任何绑定(binding)操作。消息传 递时,RouteKey必须完全匹配才会被队列接收,消费者才能读取到消息,否则该消息会被抛弃

结构图

image.png

示例


package com.dodou.liwh.amqp.boot.direct;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    @Bean
    DirectExchange directExchange() {
        DirectExchange direct_ex = new DirectExchange("direct.ex");
        return direct_ex;
    }

    @Bean
    Queue directQueue() {
        Queue direct_que = new Queue("direct.que");
        return direct_que;
    }

    /*队列绑定交换机:Boot自动绑定
      1)每一个队列的名字不能相同
      2)并且转入的参数名要匹配*/
    @Bean
    Binding binding(DirectExchange directExchange,Queue directQueue) {
        Binding binding = BindingBuilder.bind(queue).to(directExchange).with("direct.rout.key");
        return binding;
    }
}

package com.dodou.liwh.amqp.boot.direct;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class DirectSender {

    @Autowired
    private AmqpTemplate amqpTemplate;
    private String RoutingKey = "direct.rout.key";
    private String EX_NAME = "direct.ex";
    private String msg = "交换机类型是:direct,用以支持RoutingKey";

    //发送消息到Exchange
    public void Send() {
        amqpTemplate.convertAndSend(EX_NAME, RoutingKey, msg);
        System.out.println(msg);
    }

}

package com.dodou.liwh.amqp.boot.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "direct.que")
public class DirectReceiver {

    @RabbitHandler
    public void consume(String msg) {
        System.out.println("Receiver消费:队列是:direct" + ":消息:" + msg);
    }
}

Topic Exchange

简介

所有发送到Topic Exchange的消息被转发到,模糊匹配RouteKey的Queue
如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。

匹配规则

示例


package com.dodou.liwh.amqp.boot.topic;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicConfig {

    @Bean
    public TopicExchange topicExchange() {
        // 参数1 name :交换机名称
        // 参数2 durable :是否持久化
        // 参数3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列
        TopicExchange exchange = new TopicExchange("topic.ex", false, false);
        return exchange;
    }

    @Bean
    public Queue topicQueue() {
        // 参数1 name :队列名
        // 参数2 durable :是否持久化
        // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
        // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
        Queue queue = new Queue("topic.que",false,false,false);
        return queue;
    }

    @Bean
    public Binding binding(TopicExchange topicExchange, Queue topicQueue) {
        //绑定消费规则
        Binding binding = BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.#");
        return binding;
    }
}
package com.quanwugou.mall.mq;


import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.math.BigDecimal;

@Component
public class TopicSender {

    @Autowired
    private AmqpTemplate amqpTemplate;
    private String TOPIC_EX = "topic.ex";
    //生产者发送RoutingKey
    private String RoutingKey = "topic.hello.fast";
    private String msg = "交换机类型是:topic,模糊匹配RoutingKey";

    public void send() throws Exception {
        Hehe hehe = new Hehe();
        hehe.setI(1);
        Hei hei = new Hei();
        hei.setNum(BigDecimal.ONE);
        hehe.setO(hei);
        //SimpleMessageConverter only supports String, byte[] and Serializable payloads : 实体类有没有序列化? 默认的转换器是SimpleMessageConverter,它适用于String、Serializable实例和字节数组。
        //仅推荐JSONString、Serializable实例
        amqpTemplate.convertAndSend(TOPIC_EX, RoutingKey, hehe);
        System.out.println(msg);
    }
}
package com.quanwugou.mall.mq;


import cn.hutool.core.util.ObjectUtil;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class TopicReceiver {

    /*msg : 消费的消息
     *channel : 当前操作通道
     *@Header : 可以获取到所有的头部信息
     * */
    @RabbitListener(queues = "topic.que")
    @RabbitHandler
    public void rec(Message msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            channel.basicQos(1);
            Hehe hehe = (Hehe) ObjectUtil.deserialize(msg.getBody());
            System.out.println("receiver: " + hehe.toString());
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // TODO 消费失败,那么我们可以进行容错处理,比如转移当前消息进入其它队列
//                channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息
//                而basicReject一次只能拒绝一条消息

//                tag:消息标识
//                false:是否批量.true:将一次性拒绝所有小于deliveryTag的消息
//                true:重新排队
            channel.basicNack(tag, false, true);//重排并不是放在最后
        }
    }
}

Fanout Exchange

简介

  1. 不关心路由键,只需要将队列绑定到交换机上即可,因此转发消息是最快的
  2. 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上

结构图

image.png

示例

放在2-1 RabbitTemplate实现

上一篇下一篇

猜你喜欢

热点阅读