RabbitMQ/RPC/TTL/死信队列

2016-08-30  本文已影响2195人  米刀灵

当需要调用远端计算机的函数并等待结果,这模式通常被称为远程过程调用或RPC。

BasicProperties:
消息属性 这AMQP协议预先确定了消息中的14个属性。常用的有:

相关性ID(Correlation Id):
为每一个RPC请求创建一个回收队列,这个效率十分低下。每一个客户端创建一个单一的回收队列。 用Correlation Id判断队列中的响应是属于哪个请求的。
当在回收队列中接收消息时,依据这个属性值,刻意将每个响应匹配到对应的请求上。如果是未知的Correlation Id值,就丢弃这个消息,因为它不属于任何一个我们的请求。

请求RPC服务:

当客户端启动,它会创建一个匿名的独占的回收队列。 对于一个RPC请求,客户端会发送一个消息到rpc_queue队列中,其中有两个属性:replyTo(回复队列)和correlationId(每一个请求都是唯一值)。
服务端:
package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) throws Exception {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }

    public static void main(String[] argv) throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//公平分发机制
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                    .Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String message = new String(delivery.getBody());
            int n = Integer.parseInt(message);
            System.out.println(" [.] fib(" + message + ")");
            String response = "" + fib(n);
            channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

客户端:

package testrabbitmq;
import com.rabbitmq.client.*;
/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes());

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody());
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }
}

相关性ID(Correlation Id):
为每一个RPC请求创建一个回收队列,这个效率十分低下。每一个客户端创建一个单一的回收队列。 用Correlation Id判断队列中的响应是属于哪个请求的。
当在回收队列中接收消息时,依据这个属性值,刻意将每个响应匹配到对应的请求上。如果是未知的Correlation Id值,就丢弃这个消息,因为它不属于任何一个我们的请求。

请求RPC服务:

当客户端启动,它会创建一个匿名的独占的回收队列。 对于一个RPC请求,客户端会发送一个消息到rpc_queue队列中,其中有两个属性:replyTo(回复队列)和correlationId(每一个请求都是唯一值)。
服务端:
package testrabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;

/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCServer {

    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private static int fib(int n) throws Exception {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n-1) + fib(n-2);
    }

    public static void main(String[] argv) throws Exception
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);//公平分发机制
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
        System.out.println(" [x] Awaiting RPC requests");
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            BasicProperties props = delivery.getProperties();
            BasicProperties replyProps = new BasicProperties
                    .Builder()
                    .correlationId(props.getCorrelationId())
                    .build();

            String message = new String(delivery.getBody());
            int n = Integer.parseInt(message);
            System.out.println(" [.] fib(" + message + ")");
            String response = "" + fib(n);
            channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

客户端:

package testrabbitmq;
import com.rabbitmq.client.*;
/**
 * Created by zzhblh on 2016/8/29.
 */
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    private QueueingConsumer consumer;

    public RPCClient() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
    }

    public String call(String message) throws Exception {
        String response = null;
        String corrId = java.util.UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes());

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(delivery.getBody());
                break;
            }
        }

        return response;
    }

    public void close() throws Exception {
        connection.close();
    }
}

TTL

Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-message-ttl", 60000);//存活时间最大为 60 秒
channel.queueDeclare("myqueue", false, false, false, args);
AMQP.BasicProperties properties = new AMQP.BasicProperties();  
properties.setExpiration("60000");  
channel.basicPublish("myexchange", "routingkey", properties, message.getBytes());

对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去。而第二种方法里,即使消息过期,也不会马上从队列中抹去,在过期 message 到达 queue 的头部时被真正的丢弃。

Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 1800000);  
channel.queueDeclare("myqueue", false, false, false, args);  

DLX(死信)
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有以下几种情况:

DLX也是一下正常的Exchange同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange中去,进而被路由到另一个队列, publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

channel.exchangeDeclare("some.exchange.name", "direct");  
Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-dead-letter-exchange", "some.exchange.name");  
channel.queueDeclare("myqueue", false, false, false, args); 

你也可以为这个DLX指定routing key,如果没有特殊指定,则使用原队列的routing key

args.put("x-dead-letter-routing-key", "some-routing-key"); 

http://memorynotfound.com/produce-consume-rabbitmq-spring-json-message-queue/
http://blog.csdn.net/jiao_fuyou/article/details/22923935

上一篇 下一篇

猜你喜欢

热点阅读