RabbitMQ 在Spring Boot2.*中的使用
2019-06-19 本文已影响0人
CongCo
rabbit
docker地址
介绍
RabbitMQ is the most widely deployed open source message broker
官网地址链接
RabbitMQ 是开源消息代理软件(有时称为面向消息的中间件),实现了高级消息队列协议 (AMQP)。RabbitMQ 服务器以 Erlang 编程语言编写,并基于用于群集和故障转移的开放电信平台 (OTP) 框架构建。与代理交互的客户端库可用于所有主要编程语言。
安装方式
使用docker 安装
docker地址
# Publish the AMQP port (5672) for client applications and map the management UI (15672) to host port 8080.
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 8080:15672 rabbitmq:3-management
然后,您可以在浏览器中转到 http://localhost:8080 或 http://host-ip:8080
spring boot2
在pom中引入jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件application.yml
server:
  port: 8081 # 8080 is already in use
spring:
  application:
    name: bana-mq
  rabbitmq: # the defaults are sufficient
    # Use host/port; when `addresses` is set, Spring Boot ignores `host` and `port`.
    host: localhost
    port: 5672
    username: guest # default account; change here if you use a different one
    password: guest # same as above
队列
package com.congco.banamq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers the queue used by the hello-world example.
 *
 * <p>created on 19-6-19 2:25 PM
 *
 * @author congco
 */
@Configuration
public class RabbitmqConfig {

    /** Name of the queue declared by {@link #queue()}. */
    private static final String QUEUE_NAME = "hello";

    /**
     * Declares the {@code hello} queue as a Spring bean.
     *
     * @return a new {@link Queue} named {@code hello}
     */
    @Bean
    public Queue queue() {
        final Queue helloQueue = new Queue(QUEUE_NAME);
        return helloQueue;
    }
}
发送者
package com.congco.banamq.demo;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
 * Component that publishes a timestamped greeting through an {@link AmqpTemplate}.
 *
 * <p>created on 19-6-19 2:29 PM
 *
 * @author congco
 */
@Component
public class HelloSend {

    /** First argument handed to {@code convertAndSend}; the two-argument overload treats it as the routing key. */
    private static final String ROUTING_KEY = "hello";

    private final AmqpTemplate amqpTemplate;

    /**
     * Creates the sender.
     *
     * @param amqpTemplate template used to publish messages
     */
    public HelloSend(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }

    /**
     * Builds the text {@code "hello"} followed by the current local date-time, prints it to
     * standard output and hands it to the template.
     */
    public void send() {
        final String payload = "hello".concat(LocalDateTime.now().toString());
        System.out.println(String.format("Send '%s'", payload));
        amqpTemplate.convertAndSend(ROUTING_KEY, payload);
    }
}
接收者
package com.congco.banamq.receive;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener component bound to the {@code hello} queue.
 *
 * <p>created on 19-6-19 2:32 PM
 *
 * @author congco
 */
@Component
@RabbitListener(queues = "hello")
public class HelloReceive {

    /**
     * Handles a text message by printing it to standard output.
     *
     * @param hello the received message text
     */
    @RabbitHandler
    public void receive(String hello) {
        final String line = String.format("receive '%s'", hello);
        System.out.println(line);
    }
}
Test
BaseTest.java
package com.congco.banamq.base;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Abstract base class for tests: carries the {@code @RunWith(SpringRunner.class)} and
 * {@code @SpringBootTest} annotations so that subclasses inherit them.
 *
 * <p>created on 19-6-19 2:36 PM
 *
 * @author congco
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public abstract class BaseTest {
}
RabbitMqTest.java
package com.congco.banamq;
import com.congco.banamq.base.BaseTest;
import com.congco.banamq.demo.HelloSend;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Test class that triggers a single send through {@link HelloSend}.
 *
 * <p>created on 19-6-19 2:34 PM
 *
 * @author congco
 */
public class RabbitMqTest extends BaseTest {
// Sender under test, injected by Spring.
@Autowired
private HelloSend helloSend;
/**
 * Calls {@code send()} once. The test contains no assertions, so it only fails if the call throws.
 * NOTE(review): receipt of the message is not verified here - check the console output manually.
 */
@Test
public void testSend() {
helloSend.send();
}
}
以上是简单的使用
生产中使用
对象的发送和接收
可以在发送和接收中直接传递对象
TopicExchange
配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Topic-exchange example: declares two durable queues, one topic exchange and the bindings
 * that connect them.
 */
@Configuration
public class RabbitConfig {

    private static final String QUEUE_ONE = "hello.queue1";
    private static final String QUEUE_TWO = "hello.queue2";
    private static final String EXCHANGE_NAME = "topicExchange";

    /** Routing key bound to the first queue. */
    private static final String EXACT_KEY = "key.1";
    /** Routing pattern bound to the second queue, using the {@code #} wildcard. */
    private static final String WILDCARD_KEY = "key.#";

    // Queue declarations

    /**
     * @return the queue {@code hello.queue1}, created with the durable flag set to {@code true}
     */
    @Bean
    public Queue queue1() {
        final boolean durable = true;
        return new Queue(QUEUE_ONE, durable);
    }

    /**
     * @return the queue {@code hello.queue2}, created with the durable flag set to {@code true}
     */
    @Bean
    public Queue queue2() {
        final boolean durable = true;
        return new Queue(QUEUE_TWO, durable);
    }

    // Exchange declaration

    /**
     * @return the topic exchange named {@code topicExchange}
     */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    // Bindings

    /**
     * @return binding of {@link #queue1()} to the topic exchange with routing key {@code key.1}
     */
    @Bean
    public Binding binding1() {
        final Queue source = queue1();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(source).to(exchange).with(EXACT_KEY);
    }

    /**
     * @return binding of {@link #queue2()} to the topic exchange with the pattern {@code key.#}
     */
    @Bean
    public Binding binding2() {
        final Queue source = queue2();
        final TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(source).to(exchange).with(WILDCARD_KEY);
    }
}
Fanout Exchange
/**
 * Fanout-exchange example: declares three queues, one fanout exchange and one binding per queue.
 */
@Configuration
public class FanoutRabbitConfig {

    private static final String QUEUE_A = "fanout.A";
    private static final String QUEUE_B = "fanout.B";
    private static final String QUEUE_C = "fanout.C";
    private static final String EXCHANGE_NAME = "fanoutExchange";

    /** @return the queue {@code fanout.A} */
    @Bean
    public Queue AMessage() {
        final Queue queueA = new Queue(QUEUE_A);
        return queueA;
    }

    /** @return the queue {@code fanout.B} */
    @Bean
    public Queue BMessage() {
        final Queue queueB = new Queue(QUEUE_B);
        return queueB;
    }

    /** @return the queue {@code fanout.C} */
    @Bean
    public Queue CMessage() {
        final Queue queueC = new Queue(QUEUE_C);
        return queueC;
    }

    /** @return the fanout exchange named {@code fanoutExchange} */
    @Bean
    FanoutExchange fanoutExchange() {
        final FanoutExchange exchange = new FanoutExchange(EXCHANGE_NAME);
        return exchange;
    }

    /**
     * @param AMessage queue to bind
     * @param fanoutExchange exchange to bind the queue to
     * @return the binding between the two
     */
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(AMessage).to(fanoutExchange);
        return binding;
    }

    /**
     * @param BMessage queue to bind
     * @param fanoutExchange exchange to bind the queue to
     * @return the binding between the two
     */
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(BMessage).to(fanoutExchange);
        return binding;
    }

    /**
     * @param CMessage queue to bind
     * @param fanoutExchange exchange to bind the queue to
     * @return the binding between the two
     */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        final Binding binding = BindingBuilder.bind(CMessage).to(fanoutExchange);
        return binding;
    }
}
send
/**
 * Prints a fixed text to standard output and passes it to {@code rabbitTemplate.convertAndSend}
 * with {@code "fanoutExchange"} as the first argument and an empty string as the second.
 */
public void send() {
    final String payload = "hi, fanout msg ";
    final String logLine = "Sender : ".concat(payload);
    System.out.println(logLine);
    this.rabbitTemplate.convertAndSend("fanoutExchange", "", payload);
}
result
Sender : hi, fanout msg
...
fanout Receiver B: hi, fanout msg
fanout Receiver A : hi, fanout msg
fanout Receiver C: hi, fanout msg