Spring Boot集成RabbitMQ-简单示例

2020-07-23  本文已影响0人  alex很累

示例代码地址:
https://github.com/sushizhendeqiang/springboot-rabbitmq-demo

一、添加依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二、配置文件添加配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 10000
    listener:
      simple:
        # 手动应答
        acknowledge-mode: manual 
        auto-startup: true
    # 不重回队列
        default-requeue-rejected: false 
        concurrency: 5
        max-concurrency: 20
        # 每次只处理一个信息
    prefetch: 1 
        retry:
          enabled: false

三、生产者、消费者

生产者

package com.rabbitmq.demo.demo;

import com.rabbitmq.demo.entity.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * @Author: sush4
 * @Description:
 * @Date: 2020/7/18
 */

@Component
public class RabbitProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * 发送消息
     */
    @SendTo
    public void sendMessage() {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //发送简单消息
            IntStream.rangeClosed(1, 5).forEach(num -> {
                String body = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " : " + num;
                MessageProperties properties = new MessageProperties();
                //消息内容的编码格式
                properties.setContentEncoding("UTF-8");
                //Delivery mode: 是否持久化,1 - Non-persistent,2 - Persistent
                properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                Message message = new Message(body.getBytes(Charset.forName(properties.getContentEncoding())), properties);
                amqpTemplate.convertAndSend("rabbit-springboot-exchange", "rabbitmq-demo-routingkey", message);
            });
            // 发送java bean 消息
            // 实体要序列化 否则 会发送失败
            IntStream.rangeClosed(1, 5).forEach(num -> {
                //这里的builder是lombok快速构建实体的一个方法
                User user = User.builder().userId(num).username("zhangsan:" + num).password("666666").build();
                //convertAndSend方法参数说明:
                //参数1:exchange     交换机名称
                //参数2:routingKey   绑定关系,通过绑定关系,将exchage交换机绑定到queue队列
                amqpTemplate.convertAndSend("rabbit-springboot-exchange", "rabbitmq-demo-routingkey-bean", user);
            });
        }).start();
    }
}

生产者其实没什么好说的,一个convertAndSend完事~
要注意的是:如果想直接发送的实体的话,实体要序列化(Serializable)

消费者

package com.rabbitmq.demo.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.demo.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/**
 * @Author: sush4
 * @Description:
 * @Date: 2020/7/18
 */

@Component
@Slf4j
public class RabbitReceiver {
    @RabbitHandler
    @RabbitListener(queues = "rabbitmq-demo")
    public void receiveMessage(Message message, Channel channel) throws UnsupportedEncodingException {
        String encoding = message.getMessageProperties().getContentEncoding();
        log.info("接收到string消息:[{}]", new String(message.getBody(), "UTF-8"));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    @RabbitHandler
    @RabbitListener(queues = "rabbitmq-demo-bean")
    public void receiveMessage(User user, Message message, Channel channel) {
        log.info("接收到bean消息:[{}]", user);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }
}

消费者关键是两个注释:
@RabbitListener:监听队列中的消息
@RabbitHandler:和@RabbitListener配合使用,@RabbitListener监听到消息,交由@RabbitHandler标注的方法处理

这里我贴出来的示例可能不能明确感受到这两个注解的作用,可以看下图:
具体运行哪个方法,要看message的参数类型

image.png
图片来源:https://www.jianshu.com/p/911d987b5f11

另外,关于@RabbitListener的使用,我这里只指明了queue队列,如果在rabbitmq中不对该队列进行创建配置的话,是不会监听到信息的(队列都没有,监听了毛线);
这里还有另外一种写法:

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "rabbitmq-demo", durable = "true"),
            exchange = @Exchange(name = "rabbit-springboot-exchange", durable = "true", type = "topic"),
            key = "rabbitmq-demo-routingkey"
    ))

这么写的话,不存在的情况下,会自动创建~
Queue、Exchange、routingKey就不用赘述了,分别是队列、交换机、绑定关系;
durable这个字段表明是否持久化。


这篇文章只介绍了rabbitmq的简单使用~
有理解不正确的地方,望指正!

上一篇下一篇

猜你喜欢

热点阅读