Docker容器Amazing ArchDocker容器技术

docker环境下RabbitMQ实现延迟队列(使用delay插

2019-07-25  本文已影响3人  炒面Z

需求场景: 处理一个超时订单

以下解决方案是使用的rabbitMq是docker环境下部署
需要使用到rabbitMq的rabbitmq_delayed_message_exchange插件
另外( RocketMq自带18个级别的超时配置)
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 有兴趣的朋友们可以去试试

1. 进入docker容器内 docker exec  -t rabbit  bash
2. rabbitmq-plugins list 命令查看已安装插件
3. 在插件网址找到延迟插件的下载地址 http://www.rabbitmq.com/community-plugins.html 
4. exit 退出容器到宿主机中,下载插件: wget  https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
5. 解压 unzip XXX.zip -d . 
6. 拷贝至docker容器内: docker cp xxx.xz rabbit:/plugins
7. 再次进入docker容器内: 进入docker容器内 docker exec  -t rabbit  bash
8. 执行命令让插件生效: 启动延时插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
image.png
@Configuration
public class DelayedConfig {
    final static String QUEUE_NAME = "delayed.live.queue";
    final static String EXCHANGE_NAME = "delayed.live.exchange";

    @Bean
    public Queue queue() {
        return new Queue(DelayedConfig.QUEUE_NAME);
    }

    // 配置默认的交换机
    @Bean
    CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        //参数二为类型:必须是x-delayed-message
        return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    // 绑定队列到交换器
    @Bean
    Binding binding(Queue queue, CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
    }
}
@Slf4j
@Component
@RabbitListener(queues = DelayedConfig.QUEUE_NAME)
public class DelayedReceiver {
    @RabbitHandler
    public void process(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("接收时间:" + sdf.format(new Date()));
        log.info("消息内容:" + msg);
    }
}
@Slf4j
@Component
public class DelayedSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg,Integer delaySeconds) {
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.info("发送时间:" + sf.format(new Date()));

        rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay", delaySeconds * 1000);
                return message;
            }
        });
    }
}
@Slf4j
@RequestMapping("swapi")
@RestController
@Api(value = "SwTimetableLogApi", description = "mq测试", tags = {"mq测试"})
public class MqController {

    @Autowired
    private DelayedSender sender;

    @NoLogin
    @ApiOperation(value = "延迟队列测试")
    @PostMapping("/mq/{message}/{delay}")
    public Result messageWithMQ(@PathVariable(value = "message") String message,
                                @PathVariable(value = "delay") Integer delay) {
        log.info("Send: " + message);
        sender.send(message, delay);
        return Result.ok();
    }

}
上一篇下一篇

猜你喜欢

热点阅读