当RabbitMQ遇到springboot
2019-11-10 本文已影响0人
lemon0927
把MQ集成到springboot里面玩玩
先来jar
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
先来个简单的玩玩
做个连接工具类
/**
* 获取MQ的连接
* @return
*/
public static Connection getConnection() throws IOException, TimeoutException {
// 定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost("localhost");
// 设置AMOP的端口号5672
factory.setPort(5672);
// 设置vhost
factory.setVirtualHost("/vhost_lemon");
// 用户名、密码
factory.setUsername("lemon");
factory.setPassword("lemon0927");
// 获取连接
Connection connection = factory.newConnection();
return connection;
}
来个生产者接口
@GetMapping("/send")
public void send(String str) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//获取通道
Channel channel = connection.createChannel();
//创建队列申明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发布消息
channel.basicPublish("", QUEUE_NAME, null, (str).getBytes());
//关闭连接
channel.close();
connection.close();
}
再来个消费者接口
@GetMapping("/get")
public void get() throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//获取通道
Channel channel = connection.createChannel();
//队列申明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//处理消息
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
try {
System.out.println(new String(body,"utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
是不是很简单啊
完美结束~
NO!NO!NO!不用注解实现算什么程序员
先来一组配置信息
# ==================== rabbitmq ===========
# rabbitmq 主机地址
spring.rabbitmq.host=127.0.0.1
# rabbitmq 主机端口号
spring.rabbitmq.port=5672
# rabbitmq 用户名
spring.rabbitmq.username=lemon
# rabbitmq 密码
spring.rabbitmq.password=lemon0927
# rabbitmq 虚拟主机
spring.rabbitmq.virtual-host=/vhost_lemon
指定一个队列
package com.lemon.boot.rabbit;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SenderConfig {
@Bean
public Queue queue(){
return new Queue("lemon-queue");
}
}
生产者
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(){
while (true){
String msg = "hello"+new Date();
this.rabbitTemplate.convertAndSend("lemon-queue", msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者
@RabbitListener(queues = "lemon-queue")
public void process(String msg){
System.out.println("reveiver:"+msg);
}
这样是不是很简单啊,我们只是在启动的时候向容器里注册了一个队列,然后使用AmqpTemplate的convertAndSend方法就可以往指定队列里生产消息,然后再使用@RabbitListener就可以监听到指定队列,那么它背后是怎么实现与mq服务器的连接以及监听的呢?以后再补充吧