docker环境下RabbitMQ实现延迟队列(使用delay插
2019-07-25 本文已影响3人
炒面Z
需求场景: 处理一个超时订单
以下解决方案是使用的rabbitMq是docker环境下部署
需要使用到rabbitMq的rabbitmq_delayed_message_exchange插件
另外( RocketMq自带18个级别的超时配置)
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 有兴趣的朋友们可以去试试
-
在docker环境下的rabbitMq中安装延迟插件
1. 进入docker容器内 docker exec -t rabbit bash
2. rabbitmq-plugins list 命令查看已安装插件
3. 在插件网址找到延迟插件的下载地址 http://www.rabbitmq.com/community-plugins.html
4. exit 退出容器到宿主机中,下载插件: wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
5. 解压 unzip XXX.zip -d .
6. 拷贝至docker容器内: docker cp xxx.xz rabbit:/plugins
7. 再次进入docker容器内: 进入docker容器内 docker exec -t rabbit bash
8. 执行命令让插件生效: 启动延时插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-
java项目中的应用
-
DelayedConfig.java
@Configuration
public class DelayedConfig {
final static String QUEUE_NAME = "delayed.live.queue";
final static String EXCHANGE_NAME = "delayed.live.exchange";
@Bean
public Queue queue() {
return new Queue(DelayedConfig.QUEUE_NAME);
}
// 配置默认的交换机
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
//参数二为类型:必须是x-delayed-message
return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
// 绑定队列到交换器
@Bean
Binding binding(Queue queue, CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
}
}
-
DelayedReceiver
@Slf4j
@Component
@RabbitListener(queues = DelayedConfig.QUEUE_NAME)
public class DelayedReceiver {
@RabbitHandler
public void process(String msg) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("接收时间:" + sdf.format(new Date()));
log.info("消息内容:" + msg);
}
}
-
DelayedSender
@Slf4j
@Component
public class DelayedSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg,Integer delaySeconds) {
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("发送时间:" + sf.format(new Date()));
rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setHeader("x-delay", delaySeconds * 1000);
return message;
}
});
}
}
-
测试控制器
@Slf4j
@RequestMapping("swapi")
@RestController
@Api(value = "SwTimetableLogApi", description = "mq测试", tags = {"mq测试"})
public class MqController {
@Autowired
private DelayedSender sender;
@NoLogin
@ApiOperation(value = "延迟队列测试")
@PostMapping("/mq/{message}/{delay}")
public Result messageWithMQ(@PathVariable(value = "message") String message,
@PathVariable(value = "delay") Integer delay) {
log.info("Send: " + message);
sender.send(message, delay);
return Result.ok();
}
}