rabbitmqspringcloud

RabbitMQ+spring boot 快速启动

2019-01-22  本文已影响3人  HmilyMing

流程: 首先是获取连接工厂 ConnectionFactory --> 获取一个连接 Connection --> 通过连接建立数据通信 信道 Channel,用 Channel 发送或接收消息。

代码地址:

https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 项目下的 quickstart 包下

maven:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.15.RELEASE</version>
        <!--<version>2.1.0.RELEASE</version>-->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.5</version>
        </dependency>
        
         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.6</version>
        </dependency>
    </dependencies>

定义和赋值 RabbitMQ 的配置

public interface RabbitMQCommon {

    final static String RABBITMQ_HOST = "192.168.0.7";
    final static int RABBITMQ_PORT = 5672;
    final static String RABBITMQ_DEFAULT_VIRTUAL_HOST = "/";

    public final static String RABBITMQ_USERNAME = "guest";
    public final static String RABBITMQ_PASSWORD = "guest";
}

消费端代码:

/**
 * 快速开始:消费者
 */
@Slf4j
public class Consumer {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
//          queueName, durable, exclusive, autoDelete, arguments
        channel.queueDeclare("test1001", true, false, false, null);
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//         queueName,  autoAck, Consumer callback
        channel.basicConsume("test1001", true, queueingConsumer);
        log.info("消费端启动啦~");
        while (true) {
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            log.info("消费端接收到:{}", msg);
        }
    }

}

生产端代码:

/**
 * 快速开始:生产者
 */
@Slf4j
public class Procuder {

    private static final Logger log = LoggerFactory.getLogger(Procuder.class);
    
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        for (int i = 0; i < 5; i++) {
            String msg = "hello RabbitMQ + " + i;
            log.info("生产者发送消息:{}", msg);
            channel.basicPublish("", "test1001", null, msg.getBytes());
        }
        log.info("生产者发送消息完毕");
        channel.close();
        connection.close();
    }
}

run运行消费端的代码

image

打开管控台,看到这个队列创建成功了

image image

运行生产端的代码,看到如下日志

image

点击这个queue进去查看到刚才有消息发送过来了

image

在idea查看消费端的日志

image

刚才生产端发送的消息已被消费端消费,至此,快速启动demo已完毕

上一篇 下一篇

猜你喜欢

热点阅读