Using RabbitMQ in Spring Boot 2.x

2019-06-19  CongCo

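Introduction

"RabbitMQ is the most widely deployed open source message broker" (from the official website).
RabbitMQ is open source message broker software (sometimes called message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP). The RabbitMQ server is written in the Erlang programming language and is built on the Open Telecom Platform (OTP) framework for clustering and failover. Client libraries for talking to the broker are available for all major programming languages.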

Installation

Installing with Docker

RabbitMQ image on Docker Hub
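docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 8080:15672 rabbitmq:3-management

The -p 5672:5672 mapping publishes the AMQP port so that a Spring Boot application running on the host can reach the broker; -p 8080:15672 exposes the management UI. You can then open http://localhost:8080 (or http://host-ip:8080) in a browser.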

Spring Boot 2

Add the dependency to pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

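Configuration file application.yml

server:
  port: 8081 # 8080 is already in use (mapped to the management UI above)
spring:
  application:
    name: bana-mq
  rabbitmq: # the defaults are sufficient
    host: localhost
    port: 5672
    username: guest # default account; change it here if you use a different one
    password: guest # same as above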

Queue

package com.congco.banamq.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * created on 19-6-19 2:25 PM
 *
 * @author congco
 */
@Configuration
public class RabbitmqConfig {

    @Bean
    public Queue queue(){
        return new Queue("hello");
    }
}

Sender

package com.congco.banamq.demo;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * created on 19-6-19 2:29 PM
 *
 * @author congco
 */
@Component
public class HelloSend {

    private final AmqpTemplate amqpTemplate;

    public HelloSend(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }


    public void send(){
        String message = "hello"+ LocalDateTime.now();
        System.out.println("Send '"+message+"'");
        this.amqpTemplate.convertAndSend("hello",message);
    }
}

Receiver

package com.congco.banamq.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * created on 19-6-19 2:32 PM
 *
 * @author congco
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceive {

    @RabbitHandler
    public void receive(String hello) {
        System.out.println("receive '" + hello + "'");
    }
}

Test

BaseTest.java

package com.congco.banamq.base;

import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * created on 19-6-19 2:36 PM
 *
 * @author congco
 */

@RunWith(SpringRunner.class)
@SpringBootTest
public abstract class BaseTest {
}

RabbitMqTest.java

package com.congco.banamq;

import com.congco.banamq.base.BaseTest;
import com.congco.banamq.demo.HelloSend;

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * created on 19-6-19 2:34 PM
 *
 * @author congco
 */
public class RabbitMqTest extends BaseTest {


    @Autowired
    private HelloSend helloSend;

    @Test
    public void testSend() {
        helloSend.send();
    }
}

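That covers basic usage.

Production usage

Sending and receiving objects

Objects can be passed directly when sending and receiving. With Spring AMQP's default SimpleMessageConverter, a payload that is not a String or byte[] is sent using Java serialization, so its class must implement java.io.Serializable.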
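To make the object-passing idea concrete, here is a minimal sketch of the serialize/deserialize round trip that a Serializable payload goes through, using only the JDK and no broker. OrderMessage and both helper methods are hypothetical names introduced for illustration; they are not part of the original project or of Spring AMQP.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableRoundTrip {

    /** Hypothetical payload class (an assumption, not from the article). */
    public static class OrderMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String id;
        public final int quantity;

        public OrderMessage(String id, int quantity) {
            this.id = id;
            this.quantity = quantity;
        }
    }

    /** Turns a payload into the bytes that would travel as the message body. */
    public static byte[] toBytes(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Rebuilds the payload on the receiving side. */
    public static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("payload class missing on the receiver", e);
        }
    }

    public static void main(String[] args) {
        byte[] body = toBytes(new OrderMessage("A-1", 3));
        OrderMessage received = (OrderMessage) fromBytes(body);
        System.out.println(received.id + " x" + received.quantity);
    }
}
```

In the application itself the conversion is done by the message converter: the sender would call amqpTemplate.convertAndSend("hello", payload) and the @RabbitHandler method would declare the payload type as its parameter. Both sides need the same class on the classpath.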

TopicExchange

Configuration class

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    
    // Declare the queues
    @Bean
    public Queue queue1() {
        return new Queue("hello.queue1", true); // true makes the queue durable
    }
    
    @Bean
    public Queue queue2() {
        return new Queue("hello.queue2", true);
    }
    
    // Declare the exchange
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    // Bind the queues to the exchange
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
    }
    // `#` wildcard: matches zero or more dot-separated words
    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
    }
}
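The two bindings above differ only in their binding key, so it helps to see how a topic exchange compares a routing key with a binding key: keys are split into dot-separated words, `*` stands for exactly one word and `#` for zero or more words. The following is a plain-Java sketch of that rule, not RabbitMQ's actual implementation; TopicMatchSketch, matches and queuesFor are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicMatchSketch {

    /** Returns true if routingKey would match bindingKey under topic rules. */
    public static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero words (move past it) or one more word (stay on it).
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal.
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && matches(pattern, p + 1, words, w + 1);
    }

    /** Queues from the configuration above that would receive a given routing key. */
    public static List<String> queuesFor(String routingKey) {
        List<String> result = new ArrayList<>();
        if (matches("key.1", routingKey)) {
            result.add("hello.queue1");
        }
        if (matches("key.#", routingKey)) {
            result.add("hello.queue2");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(queuesFor("key.1"));   // [hello.queue1, hello.queue2]
        System.out.println(queuesFor("key.2.x")); // [hello.queue2]
        System.out.println(queuesFor("other"));   // []
    }
}
```

So a message published to topicExchange with routing key key.1 lands in both queues, while any other key starting with the word key reaches only hello.queue2.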

Fanout Exchange

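import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration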
public class FanoutRabbitConfig {

    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

}

Sender

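// Assumes this method lives in a Spring bean with an injected RabbitTemplate field named rabbitTemplate.
public void send() {
    String context = "hi, fanout msg ";
    System.out.println("Sender : " + context);
    // A fanout exchange ignores the routing key, so an empty string is passed.
    this.rabbitTemplate.convertAndSend("fanoutExchange", "", context);
}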

Result

Sender : hi, fanout msg 
...
fanout Receiver B: hi, fanout msg 
fanout Receiver A  : hi, fanout msg 
fanout Receiver C: hi, fanout msg 
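The output reflects how a fanout exchange behaves: every bound queue gets its own copy of the message and the routing key plays no part. Here is a small in-memory model of that behaviour, written without Spring or a broker; FanoutSketch and its methods are hypothetical names introduced for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FanoutSketch {

    // Queue name -> messages delivered to that queue, in binding order.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange; fanout bindings carry no routing key. */
    public void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message into every bound queue; routingKey is deliberately unused. */
    public void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Messages currently held for a queue, or an empty list if it was never bound. */
    public List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.A");
        exchange.bind("fanout.B");
        exchange.bind("fanout.C");
        exchange.publish("", "hi, fanout msg ");
        for (String name : new String[] {"fanout.A", "fanout.B", "fanout.C"}) {
            System.out.println(name + " holds " + exchange.messagesIn(name).size() + " message(s)");
        }
    }
}
```

With a real broker each queue is drained by its own listener, which is why the receiver lines in the result above can appear in any order.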
