rabbitmq延时任务(插件实现,过期时间不会堵塞)

2019-05-31  本文已影响0人  寂静的春天1988

使用插件模式实现延时任务
RabbitConfig类:创建交换机和队列,并将二者绑定

package com.plugins.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration for the delayed group-buy timeout task.
 *
 * <p>Declares one durable queue, one custom exchange of type
 * {@code x-delayed-message}, and the binding that connects them.
 *
 * @author yl
 */
@Configuration
public class RabbitConfig {

    /** Name of the delayed exchange used for group-buy timeouts. */
    public static final String DELAY_GROUP_ACTIVITTY_EXCHANGE = "DELAY_GROUP_ACTIVITTY_EXCHANGE";

    /** Name of the queue that receives group-buy timeout messages. */
    public static final String DELAY_GROUP_ACTIVITTY_QUEUE = "DELAY_GROUP_ACTIVITTY_QUEUE";

    /** Routing key that binds the timeout queue to the delayed exchange. */
    public static final String DELAY_GROUP_ACTIVITTY_ROUTING_KEY = "DELAY_GROUP_ACTIVITTY_ROUTING_KEY";

    /**
     * Declares the queue consumed by the timeout listener.
     *
     * @return a durable queue named {@link #DELAY_GROUP_ACTIVITTY_QUEUE}
     */
    @Bean
    public Queue delayGroupActivityQueue() {
        final boolean durable = true;
        return new Queue(DELAY_GROUP_ACTIVITTY_QUEUE, durable);
    }

    /**
     * Declares the delayed exchange.
     *
     * <p>The exchange type is {@code x-delayed-message}; the
     * {@code x-delayed-type} argument is set to {@code direct}.
     *
     * @return a durable, non-auto-delete custom exchange named
     *         {@link #DELAY_GROUP_ACTIVITTY_EXCHANGE}
     */
    @Bean
    public CustomExchange delayGroupActivityExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");

        return new CustomExchange(DELAY_GROUP_ACTIVITTY_EXCHANGE, "x-delayed-message", durable, autoDelete,
                arguments);
    }

    /**
     * Binds the timeout queue to the delayed exchange.
     *
     * @return the binding using {@link #DELAY_GROUP_ACTIVITTY_ROUTING_KEY} and no extra arguments
     */
    @Bean
    public Binding bindingNotify() {
        Queue timeoutQueue = delayGroupActivityQueue();
        CustomExchange delayedExchange = delayGroupActivityExchange();
        return BindingBuilder.bind(timeoutQueue)
                .to(delayedExchange)
                .with(DELAY_GROUP_ACTIVITTY_ROUTING_KEY)
                .noargs();
    }
}

Receiver:消费者类

package com.plugins.config;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

/**
 * Consumer for the delayed group-buy timeout queue.
 */
@Component
public class Receiver {

    /**
     * Handles one message from {@link RabbitConfig#DELAY_GROUP_ACTIVITTY_QUEUE}:
     * decodes the body as UTF-8 text, prints it, and acknowledges the delivery.
     *
     * <p>NOTE(review): the explicit {@code basicAck} presumably relies on the
     * listener container running in manual acknowledge mode; confirm against the
     * application configuration.
     *
     * @param message the delivered AMQP message
     * @param channel the channel the message was delivered on, used to ack it
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = RabbitConfig.DELAY_GROUP_ACTIVITTY_QUEUE)
    public void get(Message message, Channel channel) throws IOException {
        // Decode with an explicit Charset constant; printing the byte[] itself
        // would only show its identity string (e.g. "[B@1a2b3c"), not the payload.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(msg);

        // Acknowledge only this delivery (multiple = false).
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

Sender:生产者类

package com.plugins.config;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class Sender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    
    /**
     * 通过延迟消息插件发动延迟消息
     * @param msg
     * @param expiration
     */
    public void sendDelayMessageByPlugins(Long groupActivityId,Long expiration){
         //消息发送失败返回到队列中, yml需要配置 publisher-returns: true
//      rabbitTemplate.setMandatory(true);
//        // 消息返回, yml需要配置 publisher-returns: true
//      rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
//            String correlationId = message.getMessageProperties().getCorrelationId();
//            System.out.println("消息发送失败"+replyText+exchange+routingKey);
//        });

        //绑定异步监听回调函数
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
               System.out.println("成功");
            } else {
                System.out.println("失败");
            }
        });
        rabbitTemplate.convertAndSend(RabbitConfig.DELAY_GROUP_ACTIVITTY_EXCHANGE,RabbitConfig.DELAY_GROUP_ACTIVITTY_ROUTING_KEY, groupActivityId,(message)->{
            System.out.println("过期时间"+expiration); 
            message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间
             return message;
        });
        
    }
}

生产者类中有一段代码被注释掉了,原因是:即使消息发送成功,也会触发 setReturnCallback 设置的回调(该回调本应只在消息未能到达队列时才触发)。推测是因为延时交换机会先暂存消息、到期后才路由到队列,所以发布时被判定为未路由。目前还不知道怎么解决这个问题;而使用死信队列的方式实现延时则不会有该问题。

上一篇 下一篇

猜你喜欢

热点阅读