消息中间件

用rebbitMq来实现你的延迟队列功能

2016-07-12  本文已影响1420人  jsondream

延迟队列

在我们的上一篇文章使用delayedQueue实现你本地的延迟队列
中提到了延迟队列的作用.

但是我们知道,利用delayedQueue实现的是一个单机的,而且是内存中的延迟队列,他并没有一个集群的支持,并且需要在对泵机的时候,消息消费异常的时候做相应的逻辑处理。

那么这样做的话,我们需要的工作量还是很大的,有没有什么东西是让我们不做这一部分的工作也能实现延迟队列的功能?

当然有了。答案是:rabbitMq

利用rabbitMq来实现延迟队列的功能

那么如何利用rabbitMq来实现延迟队列的功能呢?

请先注意一点,RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。那么这是通过哪些特性呢,那就让我们来认识一下这两个特性吧.

结合以上两个特性,就可以模拟出延迟消息的功能.

基于x-dead-letter-routing-key的单条消息延迟队列的java代码实现

生产者(发送)端代码:


import java.util.HashMap;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    //队列名称  
    private final static String QUEUE_NAME = "hello";  
  
    public static void main(String[] argv) throws Exception  
    {  
        
        /** 
         * 创建连接连接到MabbitMQ 
         */  
        ConnectionFactory factory = new ConnectionFactory();  
        //设置MabbitMQ所在主机ip或者主机名  
        factory.setHost("localhost");  
        //创建一个连接  
        Connection connection = factory.newConnection();
        //创建一个频道  
        Channel channel = connection.createChannel();  
        
        
 
        
        //指定一个队列  
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
        //发送的消息  
        String message = "hello world!"+System.currentTimeMillis();;  

        AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();  
        AMQP.BasicProperties properties = builder.expiration("2000").deliveryMode(2).build();  
        //往队列中发出一条消息     这时候要发送的队列不应该是QUEUE_NAME,这样才能进行转发的
        channel.basicPublish("", "DELAY_QUEUE", properties, message.getBytes());  
        System.out.println(" [x] Sent '" + message + "'" );  
        //关闭频道和连接  
        channel.close();  
        connection.close();  
     }  
}

消费者(接受)端代码:


import java.util.HashMap;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {
    // 队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
        // channel.queueBind(QUEUE_NAME, "amq.direct", QUEUE_NAME);

        HashMap<String, Object> arguments = new HashMap<String, Object>();  
        arguments.put("x-dead-letter-exchange", "amq.direct");  
        arguments.put("x-dead-letter-routing-key", QUEUE_NAME);  
        channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments); 
        
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。  
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 
      //创建队列消费者  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
        //指定消费队列  
        channel.basicConsume(QUEUE_NAME, true, consumer);  
        while (true)  
        {  
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)  
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
            String message = new String(delivery.getBody());  
            System.out.println(" [x] Received '" + message + "'"+ "'   [当前系统时间戳]" +System.currentTimeMillis());  
        }  
    }
}

参考资料

http://www.rabbitmq.com/ttl.html
http://www.rabbitmq.com/dlx.html
https://www.cloudamqp.com/docs/delayed-messages.html
http://www.netfoucs.com/article/xtjsxtj/73636.html#

上一篇下一篇

猜你喜欢

热点阅读