4.发送和接受消息

2019-08-09  本文已影响0人  JiangCheng97

文章参考:Rabbit实战指南

发送消息

​ 如果要发送一个消息,可以使用Channel类的basicPublish方法,比如发送一条内容为“Hello World”的消息,参考如下:

byte[] messageBodyBytes = "Hello,world!".getBytes();
channel.basicPublish(exchangeName,routingKey,null,messageBodyBytes);

为了更好地可控制发送,可以使用mandatory这个参数,或者可以发送一些特定属性的信息:

channel.basicPublish(exchangeName,routingKey,mandatory,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

下面这行代码发送了一条消息,这条消息的投递模式(delivery mode)设置为2,即消息会被持久化(存入磁盘中)在服务器中。同时这条消息的优先级(priority)设置为1,content-type为“text/plain”。可以自己设定消息的属性:

channel.basicPublish(exchangeName,routingKey,
                     new AMQP.BasicProperties.Builder()
                     .contentType("text/plain")
                     .deliveryMode(2)
                     .priority(1)
                     .userId("hidden")
                     .build(),
                     messageBodyBytes
                    );

也可以发送一条带有headers的消息:

Map<String,Object> headers = new HashMap<>();
headers.put("localtion","here");
headers.put("time","today");
channel.basicPublish(exchangeName,routingKey,
                    new AMQP.BasicProperties.Builder()
                    .headers(headers)
                     .build(),
                     messageBodyBytes
                    );

也可以发送一条带有过期时间(expiration)的消息

channel.basicPublish(exchangeName,routingKey,
                     new AMQP.BasicProperties.Builder()
                     .expiration("6000")
                     .build(),
                     messageBodyBytes
                    );

basicPublish的重载方法:

void basicPublish(String exchange,String routingKey,
                  BasicProperties props,
                  byte[] body) throws IOException;

void basicPublish(String exchange,String routingKey,
                  boolean mandatory,
                  BasicProperties props,
                  byte[] body) throws IOException;

void basicPublish(String exchange,String routingKey,
                  boolean mandatory,boolean imediate,
                  BasicProperties props,
                  byte[] body) throws IOException;

具体参数解释如下:

消费消息

RabbitMQ的消费模式分为两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则调用Basic.Get进行消费。

推模式

在推模式中,可以通过持续订阅的方式来消费信息,使用到的相关类有:

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DeafultConsumer;

接收消息一般通过实现Consumer接口或者继承DefaultConsumer类来实现。当调用与Consumer类来实现。当调用与Consumer相关的API方法,不同的订阅采用不同的消费者标签来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签以作区别。代码如下:

boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName,autoAck,"myConsumerTag",
        new DefaultConsumer(channel){
            @Override
            public void handleDevlivery(String consumerTag,
                                       Envelop envelope,
                                  AMQP.BasicProperties properties,
                                        byte[] bode
                                       )throws IOException
            {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                long deliveryTag = envelope.getDeliveryTag();
                //(process the message components here ...)
                channel.basicAck(deliveryTag,false);
            }
                    }
                    )

上面代码显式地设置autoAck为false,然后在接受到消息之后进行显式ack操作(channel.basicAck),对应消费者来说这个设置是非常必要的,可以防止消息不必要的丢失。

​ Channel类中basicConsume方法有如下几种形式:

  1. String basicConsume(String queue,Consumer callback) throws IOException

  2. String basicConsume(String queue,boolean autoAck,Consumer callback) throws IOException

  3. String basicConsume(String queue,boolean autoAck,Map<String,Object> arguments,Consumer callback) throws IOException

  4. String basicConsume(String queue,boolean autoAck,String consumerTag,Consumer callback) throws IOException

  5. String basicConsume(String queue,boolean autoAck,String consumerTag,boolean noLocal,boolean exclusive,Map<String,Object> arguments,Consumer callback) throws IOException

    对应参数说明:

    • queue:队列的名称
    • autoAck:设置是否自动确认。建议设置为false
    • consumerTag:消费者标签,用来区分多个消费者;
    • noLocal:设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
    • exclusive:设置是否排他;
    • arguments:设置消费者的其他参数
    • callback:设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要客户端重写其中的方法。

    对于消费者客户端来说,重写handleDelivery方法是十分方便的。更复杂的消费者客户端会重写更多的方法,具体如下:

    void handleConsumeOk(String consumerTag);
    void handleCancelOk(String consumerTag);
    void handleCancel(String consumerTag) throws IOException;
    void handleShutdownSignal(String consumerTag,ShutdowSignalException sig);
    void handleRecoverOk(String consumerTag);
    

    比如handleShutdownSignal方法,当Channel或者Connection关闭的时候回调用。再者,handleConsumeOk方法会在其他方法之前调用,返回消费者标签。

    重写handleCancelOk和handleCancel方法,这样消费端可以再显示地或者隐式地取消订阅的时候调用。也可以通过channel.basicCancel方法来显式地取消一个消费者的订阅:

    channel.basicCancel(consumerTag)

    注意上面这行代码会首先触发handleConsumerOk方法,之后触发handleDelivery方法,最后才触发handleCanceOk方法。

 和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些callback会被分配到与Channel不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,如channe.queueDeclare、channel.basicCancel等。

 每个Channel都拥有自己独立的线程。最常用的做法是一个Channel对应一个消费者。意味着消费者彼此之间没有任何关联。也可以在一个Channel中维持多个消费者,但是要注意一个问题,如果Channel中的一个消费者一直在运行,那么其他消费者的callback会被“耽搁”。

拉模式

通过channel.basicGet方法可以单条的获取消息,其返回值是GetResponse。Channel类的basicGet方法没有其他重载方法,只有:

 GetResponse basicGet(String queue,boolean autoAck) throws IOException;

其中queue代表队列名称,如果设置autoAck为false,那么同样需要调用channel.basicAck来确认消息被成功接收。

拉模式的关键代码如下所示:

GetResponse response = channel.basicGet(QUEUE_NAME,false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);
注意:

Basic.Consume将信道(Channel)置为接收模式,知道取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos的限制。

如果只想从队列获取单挑消息而不是持续订阅,建议使用Basic.Get进行消费。但不能将Basic.Get放在一个循环里来代替Basic.Consume,这样做会严重影响RabbitMQ的性能。消费者理应使用Basic.Consume方法。

上一篇 下一篇

猜你喜欢

热点阅读