当RabbitMQ遇到springboot

2019-11-10  本文已影响0人  lemon0927
把MQ集成到springboot里面玩玩

先来jar

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

先来个简单的玩玩
做个连接工具类

/**
     * 获取MQ的连接
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {

        // 定义一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置服务地址
        factory.setHost("localhost");
        // 设置AMOP的端口号5672
        factory.setPort(5672);
        // 设置vhost
        factory.setVirtualHost("/vhost_lemon");
        // 用户名、密码
        factory.setUsername("lemon");
        factory.setPassword("lemon0927");
        // 获取连接
        Connection connection = factory.newConnection();

        return connection;
    }

来个生产者接口

@GetMapping("/send")
    public void send(String str) throws IOException, TimeoutException {

        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //创建队列申明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发布消息
        channel.basicPublish("", QUEUE_NAME, null, (str).getBytes());
        //关闭连接
        channel.close();
        connection.close();

    }

再来个消费者接口

@GetMapping("/get")
    public void get() throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //获取通道
        Channel channel = connection.createChannel();
        //队列申明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //处理消息
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body){
                try {
                    System.out.println(new String(body,"utf-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
是不是很简单啊

完美结束~
NO!NO!NO!不用注解实现算什么程序员
先来一组配置信息

# ==================== rabbitmq ===========
# rabbitmq 主机地址
spring.rabbitmq.host=127.0.0.1
# rabbitmq 主机端口号
spring.rabbitmq.port=5672
# rabbitmq 用户名
spring.rabbitmq.username=lemon
# rabbitmq 密码
spring.rabbitmq.password=lemon0927
# rabbitmq 虚拟主机
spring.rabbitmq.virtual-host=/vhost_lemon

指定一个队列

package com.lemon.boot.rabbit;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SenderConfig {

    @Bean
    public Queue queue(){
        return new Queue("lemon-queue");
    }
}

生产者

@Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(){
        while (true){
            String msg = "hello"+new Date();
            this.rabbitTemplate.convertAndSend("lemon-queue", msg);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

消费者

@RabbitListener(queues = "lemon-queue")
    public void process(String msg){
        System.out.println("reveiver:"+msg);
    }

这样是不是很简单啊,我们只是在启动的时候向容器里注册了一个队列,然后使用AmqpTemplate的convertAndSend方法就可以往指定队列里生产消息,然后再使用@RabbitListener就可以监听到指定队列,那么它背后是怎么实现与mq服务器的连接以及监听的呢?以后再补充吧

上一篇下一篇

猜你喜欢

热点阅读