dockerRabbitMQSpring

RabbitMQ实现延迟消息(rabbitmq_delayed_

2020-09-23  本文已影响0人  AbstractCulture

下载延迟消息插件

Warning:请检查你的RabbitMQ版本与插件兼容是否一致

官方下载地址

下载完后,将插件上传到服务器
我上传的地址是: /usr/etc/rabbitmq_plugins

在Docker环境下,安装延迟消息插件

侵入容器找到plugins目录

> docker exec -it rabbitmq bash
## 可以看到,plugins就是存放 mq 插件的地方了
> ls 

将插件复制到plugins目录下

> cd /usr/etc/rabbitmq_plugins
> docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins

回到plugins目录,查看plugins中是否有rabbitmq_delayed_message_exchange插件

image.png

激活插件

> rabbitmq-plugins enable rabbitmq_delayed_message_exchange
image.png

重启RabbitMQ

> docker restart rabbitmq

进入RabbitMQ管理界面查看插件是否成功生效

image.png

OK,完成以上工作,就可以编写Java代码发送延迟消息了。

SpringBoot中发送延迟消息

Config

package com.xjm.mid.compent.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author jaymin
 * 2020/09/22
 */
@Configuration
public class RabbitMQDelayedMessageConfig {
    /**
     * 延迟消息交换机
     */
    public final static String DELAY_EXCHANGE = "jaymin.delay.exchange";
    /**
     * 队列
     */
    public final static String DELAY_QUEUE = "jaymin.delay.queue";
    /**
     * 路由Key
     */
    public final static String DELAY_ROUTING_KEY = "jaymin.delay.routingKey";

    @Bean
    public CustomExchange delayMessageExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //自定义交换机
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", false, false, args);
    }

    @Bean
    public Queue delayMessageQueue() {
        return new Queue(DELAY_QUEUE, false, false, false);
    }

    @Bean
    public Binding bindingDelayExchangeAndQueue() {
        return BindingBuilder.bind(delayMessageQueue()).to(delayMessageExchange()).with(DELAY_ROUTING_KEY).noargs();
    }
}

Client

package com.xjm.mid.compent.rabbitmq.web;


import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayedMessageConfig;
import com.xjm.mid.compent.rabbitmq.model.Letter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

/**
 * @author jaymin
 * 2020/09/22
 */
@RestController
@RequestMapping("/message/delayed")
@Slf4j
public class DelayedMessageClient {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/ten")
    public void sendDelayedMessage1(){
        Letter letter = new Letter();
        letter.setRecipient("福尔摩斯");
        letter.setContext("您的10S外卖到了!");
        Integer ttl = 10000;
        rabbitTemplate.convertAndSend(RabbitMQDelayedMessageConfig.DELAY_EXCHANGE, RabbitMQDelayedMessageConfig.DELAY_ROUTING_KEY, letter, message -> {
            // 设置过期时间
            message.getMessageProperties().setDelay(ttl);
            return message;
        });
        log.info("[发送时间] - [{}]-[过期时间]-[{}]", LocalDateTime.now(),ttl/1000);
    }


    @PostMapping("/five")
    public void sendDelayedMessage2(){
        Letter letter = new Letter();
        letter.setRecipient("福尔摩斯");
        letter.setContext("您的5S外卖到了!");
        Integer ttl = 5000;
        rabbitTemplate.convertAndSend(RabbitMQDelayedMessageConfig.DELAY_EXCHANGE, RabbitMQDelayedMessageConfig.DELAY_ROUTING_KEY, letter, message -> {
            // 设置过期时间
            message.getMessageProperties().setDelay(ttl);
            return message;
        });
        log.info("[发送时间] - [{}]-[过期时间]-[{}]", LocalDateTime.now(),ttl/1000);
    }
}

Listener

package com.xjm.mid.compent.rabbitmq.web;

import com.rabbitmq.client.Channel;
import com.xjm.mid.compent.rabbitmq.config.RabbitMQDelayedMessageConfig;
import com.xjm.mid.compent.rabbitmq.model.Letter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


import java.io.IOException;
import java.time.LocalDateTime;

/**
 * @author jaymin
 * 2020/09/22
 */
@Component
@Slf4j
public class DelayMessageListener {

    @RabbitListener(queues = {RabbitMQDelayedMessageConfig.DELAY_QUEUE})
    @RabbitHandler
    public void receiveMessage(Channel channel, Message message, Letter letter) {
        log.info("[listenerDelayQueue 监听的消息] - [消费时间] - [{}] - [{}]", LocalDateTime.now(), letter.toString());
    }
}

Result

image.png

rabbitmq 的延时插件极限时间是 8byte 长度 ms,大概 49天。如果你的延时时间很长,建议配合定时任务进行处理。

上一篇 下一篇

猜你喜欢

热点阅读