RabbitMQ 消息队列

2020-05-12  本文已影响0人  潜心之力

一、Windows 安装

二、整合SpringBoot

<dependency> -> pom.xml
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

spring: -> application.yml
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

三、交换机模式

1、Direct模式是RabbitMQ默认的交换机模式,创建消息队列的时候绑定一个BindingKey,可以理解为消息队列的名称,生产者发送消息的时候绑定BindingKey,可以理解为将消息发送到指定的队列中,消费者监听队列的时候绑定BindingKey,可以理解为读取指定队列中的消息。直连模式是典型的一对一,当生产者发送一个消息到队列中,只要有任意一个消费者将消息从队列中取出,消息立即被消耗,其他的消费者不会获取到该消息。

@Component
public class RabbitSender { -> 消息生产者,用于发送消息到队列
    @Autowired
    private AmqpTemplate mAmqpTemplate;

    public void message(String msg){
        mAmqpTemplate.convertAndSend("DirectQueue",msg);
    }
}

@Component
@RabbitListener(queues = "DirectQueue")
public class RabbitReceiver { -> 消息消费者,用于接收处理队列中的消息
    @RabbitHandler
    public void message(String msg){
        System.out.println("收到消息:"+msg);
    }
}

@Configuration
public class RabbitConfig { -> 创建消息队列
    @Bean
    public Queue DirectQueue(){
        return new Queue("DirectQueue");
    }
}

2、Topic模式支持按指定规则匹配转发,路由键是一段带有“.”分隔的字符串。#代表匹配一个或多个任意字符,*代表匹配一个任意字符。主题模式是典型的一对多,生产者可以通过指定的规则模糊匹配相关的队列进行消息投递,灵活性高。

@Component
public class RabbitSender {

    @Autowired
    private AmqpTemplate mAmqpTemplate;

    public void message1(String msg){
        mAmqpTemplate.convertAndSend("exchange","topic.queue1",msg);
    }

    public void message2(String msg){
        mAmqpTemplate.convertAndSend("exchange","topic.queue2",msg);
    }
}

@Component
public class RabbitReceiver {

    @RabbitHandler
    @RabbitListener(queues = "topic.queue1")
    public void message1(String msg) {
        System.out.println("message1:" + msg);
    }

    @RabbitHandler
    @RabbitListener(queues = "topic.queue2")
    public void message2(String msg) {
        System.out.println("messages2:" + msg);
    }
}

@Configuration
public class RabbitConfig {

    @Bean
    public Queue topicQueue1() {
        return new Queue("topic.queue1");
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue("topic.queue2");
    }

    @Bean
    TopicExchange exchange() { -> 创建消息交换机
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingTopicQueue1(Queue topicQueue1, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.queue*");
    }

    @Bean
    Binding bindingTopicQueue2(Queue topicQueue2, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

3、Fanout模式是广播,所有绑定到该交换机下的队列都能接收到消息,也就意味着在该模式下BindingKey是会被忽略的。

@Component
public class RabbitSender {

    @Autowired
    private AmqpTemplate mAmqpTemplate;

    public void message3(String msg){
        mAmqpTemplate.convertAndSend("fanoutExchange","",msg); -> 参数2被忽略
    }
}

@Component
public class RabbitReceiver {

    @RabbitHandler
    @RabbitListener(queues = "fanout.queue1")
    public void message1(String msg) {
        System.out.println("message1:" + msg);
    }

    @RabbitHandler
    @RabbitListener(queues = "fanout.queue2")
    public void message2(String msg) {
        System.out.println("messages2:" + msg);
    }

    @RabbitHandler
    @RabbitListener(queues = "fanout.queue3")
    public void message3(String msg) {
        System.out.println("messages3:" + msg);
    }
}

@Configuration
public class RabbitConfig {

    @Bean(name="FanoutQueue1")
    public Queue AMessage() {
        return new Queue("fanout.queue1");
    }

    @Bean(name="FanoutQueue2")
    public Queue BMessage() {
        return new Queue("fanout.queue2");
    }

    @Bean(name="FanoutQueue3")
    public Queue CMessage() {
        return new Queue("fanout.queue3");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingFanoutQueue1(@Qualifier("FanoutQueue1") Queue queue,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    Binding bindingFanoutQueue2(@Qualifier("FanoutQueue2") Queue queue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    Binding bindingFanoutQueue3(@Qualifier("FanoutQueue3") Queue queue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

四、Windows 集群

简介:集群(cluster)就是一组计算机,它们作为一个整体向用户提供一组网络资源,这些单个的计算机系统就是集群的节点(node)。集群的特点如下:1、可扩展性,集群的性能不仅限于单一的服务实体,新的服务实体也可以动态地加入到集群。2、高可用性,当一台节点服务器发生故障的时候,这台服务器上所运行的应用程序将被另一节点的服务器自动接管。3、负载均衡,把任务均匀地分布到集群环境下的网络资源,以便提高数据吞吐量。

上一篇下一篇

猜你喜欢

热点阅读