2019-08-26/SpringBoot 之RabbitMQ
2019-08-26 本文已影响0人
呼噜噜睡
rabbitmq似乎用的挺多的,在消息可靠性要求比较高的项目上,首选它。为什么是它,我也不太清楚,如果有知道的小伙伴,要告诉我哟~
好,进入正题,首先要安装rabbitmq,光这一步就够折腾你半天的。一般有三种方式:
一、linux或者linux虚拟机上安装rabbitmq,那么首先你要安装erlang。对于下载,安装,配置,让你怀疑人生。具体怎么怀疑人生,百度吧。
二、使用docker安装。windows7 使用docker toolbox 安装docker,如果你能顺利安装docker,后面畅通无阻。难就难在顺利地安装docker。如果你是win10 专业版(或企业版),那么在docker官网下载docker安装包,妥妥的搞定,爽哉。如果你是win10家庭版,对不起,我没有听清,请再说一遍。
三、使用windows版的rabbitmq。具体参考https://www.cnblogs.com/ericli-ericli/p/5902270.html
好的,经过愉快的安装之旅,我们假设你已经成功安装了RabbitMQ了。接下来搭建一个springboot项目,不懂的同学可以上spring官网。引入rabbitmq pom依赖,完整pom如下:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build descriptor for the RabbitMQ demo.
  The Java sources import org.springframework.amqp.* (RabbitTemplate, @RabbitListener, ...),
  which live in spring-rabbit, not in the bare com.rabbitmq:amqp-client driver. The project
  therefore depends on spring-boot-starter-amqp; the starter pulls in a compatible
  amqp-client transitively, so no explicit driver version is pinned here.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.wjp</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring AMQP + RabbitMQ auto-configuration (version managed by the Boot parent) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
application.properties的配置:
# Broker connection: local node on 127.0.0.1:5672, virtual host "/".
# NOTE(review): user name and password are stored in plain text here - fine for a local demo only.
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=wjp
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
# Unit-less value; presumably read as milliseconds (15 s) - confirm for the Spring Boot version in use.
spring.rabbitmq.connection-timeout=15000
# Producer settings: confirms, returns and the mandatory flag are switched on;
# RabbitSender registers the ConfirmCallback / ReturnCallback that go with them.
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
# Consumer settings: acknowledge-mode is manual, so the listener acknowledges itself
# (RabbitReceiver calls channel.basicAck); prefetch, concurrency and max-concurrency are all 1.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=1
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=1
SpringBoot提供了一个现成的RabbitTemplate bean供我们使用,发送方代码:
package com.example.demo.amqp;
import java.util.Map;
import java.util.UUID;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.IdGenerator;
/**
 * Publishes messages to the {@code test_queue} routing key on the default ("") exchange
 * and reports broker confirms and returned messages on {@code System.err}.
 */
@Component
public class RabbitSender {

    // RabbitTemplate injected by Spring
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Callback: publisher confirm (ack / nack from the broker)
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (!ack) {
                System.err.println("生产端---异常处理....");
                // Say which message was refused and why, so the nack can actually be handled.
                System.err.println("生产端---nack correlationData: " + correlationData + ", cause: " + cause);
            }
        }
    };

    // Callback: message returned by the broker
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("生产端---return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    /**
     * Sends {@code message} to {@code test_queue}.
     *
     * <p>The payload is handed to the template as-is and converted by the template's message
     * converter; it is not wrapped in a spring-messaging {@code Message} first, because that
     * wrapper would itself be treated as the payload. Entries of {@code properties} are copied
     * onto the outgoing AMQP message as headers.
     *
     * @param message    payload to publish
     * @param properties header names and values to attach; may be {@code null} for none
     * @throws Exception declared for backward compatibility with existing callers
     */
    public void send(Object message, Map<String, Object> properties) throws Exception {
        // The same two callback instances are registered on every call; kept here so the
        // class needs no additional lifecycle hook.
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        // Unique id per publish so a confirm can be matched to the message it belongs to.
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("", "test_queue", message, amqpMessage -> {
            if (properties != null) {
                for (Map.Entry<String, Object> header : properties.entrySet()) {
                    amqpMessage.getMessageProperties().setHeader(header.getKey(), header.getValue());
                }
            }
            return amqpMessage;
        }, correlationData);
    }
}
接收方代码:
package com.example.demo.amqp;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
/**
 * Consumes messages from the durable {@code test_queue} queue (bound to {@code amq.direct}
 * with routing key {@code test_queue}) and acknowledges each one manually.
 */
@Component
public class RabbitReceiver {

    /**
     * Prints the received payload and acknowledges the delivery.
     *
     * @param msg     message payload
     * @param channel channel the message was delivered on; used for the manual ack
     * @param headers message headers; must contain {@link AmqpHeaders#DELIVERY_TAG}
     * @throws IllegalStateException if the delivery tag header is absent or not a {@code Long}
     * @throws Exception             if acknowledging on the channel fails
     */
    // @Queue attributes are String-valued, hence durable = "true" rather than a boolean literal.
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "test_queue", durable = "true"),
            exchange = @Exchange(value = "amq.direct",
                    ignoreDeclarationExceptions = "true"),
            key = "test_queue"
    ))
    public void onOrderMessage(@Payload String msg, Channel channel,
            @Headers Map<String, Object> headers) throws Exception {
        System.err.println("消费端----------------------------: " + msg);

        // Fail with a descriptive error instead of a bare NPE / ClassCastException on unboxing.
        Object deliveryTag = headers.get(AmqpHeaders.DELIVERY_TAG);
        if (!(deliveryTag instanceof Long)) {
            throw new IllegalStateException(
                    "Missing or invalid " + AmqpHeaders.DELIVERY_TAG + " header: " + deliveryTag);
        }

        // Manual ACK of this single delivery (multiple = false)
        channel.basicAck((Long) deliveryTag, false);
    }
}
直接在启动类上进行测试:
package com.example.demo;
import com.example.demo.amqp.RabbitSender;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
/**
 * Boot entry point that doubles as a smoke test: once the context is up it publishes a
 * fixed number of messages through {@link RabbitSender}, pausing between sends.
 */
@SpringBootApplication
public class DemoApplication {

    /** Number of demo messages published after startup. */
    private static final int MESSAGE_COUNT = 100;

    /** Pause between two consecutive publishes, in milliseconds. */
    private static final long PAUSE_MILLIS = 500L;

    /**
     * Starts the application and sends the demo messages.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws Exception if sending fails or the pause between sends is interrupted
     */
    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
        RabbitSender sender = context.getBean(RabbitSender.class);

        int sequence = 0;
        while (sequence < MESSAGE_COUNT) {
            sender.send("生产端---发送的消息--" + sequence, null);
            Thread.sleep(PAUSE_MILLIS);
            sequence++;
        }
    }
}