RabbitMQ方法实例

2020-11-11  本文已影响0人  程序员小杰

交换机

1、exchangeDeclare

声明交换机,exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

 public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
   boolean durable, boolean autoDelete, boolean internal, 
Map<String, Object> arguments);

这个方法的返回值是 Exchange.DeclareOk 用来标识成功声明了一个交换器。

各个参数详细说明如下所述。

2、exchangeDeclarePassive

用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常 404 channel exception ,同时 Channel 也会被关闭。

Exchange.DeclareOk exchangeDeclarePassive(String name); 

3、exchangeDelete

 Exchange.DeleteOk exchangeDelete(String exchange); 

 void exchangeDeleteNoWait(String exchange , boolean ifUnused); 

 Exchange.DeleteOk exchangeDelete(String exchange , 
boolean ifUnused);

其中 exchange 表示交换器的名称,ifUnused用来设置是否在交换器没有被使用的情况下删除。如果 ifUnused设置为 true ,则只有在此交换器没有被使用的情况下才会被删除

队列

1、queueDeclare

声明队列

   public Queue.DeclareOk queueDeclare();

   public Queue.DeclareOk queueDeclare(String queue, 
      boolean durable, boolean exclusive, boolean autoDelete,
       Map<String, Object> arguments);

不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的(类似这种amq.gen-S9h... 的名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。
参数说明:

2、queueDeclarePassive

用来检测相应的队列是否存在。 如果存在正常返回 ,如果不存在则抛出异常: 404 channel exception ,同时Channel 也会被关闭。方法定义如下

 public Queue.DeclareOk queueDeclarePassive(String queue);

3、queueDelete

(1) Queue.DeleteOk queueDelete(String queue);
(2) Queue.DeleteOk queueDelete(String queue , boolean ifUnused, 
boolean ifEmpty) ;
(3) void queueDeleteNoWait(String queue, boolean ifUnused,
 boolean ifEmpty) ;

其中 queue 表示队列的名称 ,如果 ifUnused设置为 true ,则只有在此队列在没有被使用的情况下才会被删除, ifEmpty设置为true,表示在队列为空(队列里面没有任何消息堆积)的情况下才能够删除。

4、queuePurge

清除给定队列的暂存内容

Queue.PurgeOk queuePurge(String queue);

绑定

1、queueBind

将队列和交换器绑定

    public Queue.BindOk queueBind(String queue, String exchange, String routingKey);

    public Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments);

    public void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments);

参数详解。

2、queueUnbind

将队列和交换器解除绑定

 Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

 Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

具体的参数解释可以参考前面的内容,这里不再赘述

3、exchangeBind

将交换器与交换器绑定

    Exchange.BindOk exchangeBind(String destination, String source, String routingKey);

    Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments);

    void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments);
//声明source交换机
channel.exchangeDeclare ("source","direct", false , true , null) ; 
//声明destination交换机
channel.exchangeDeclare ("destination","fanout", false , true , null); 
//交换器与交换器绑定
channel.exchangeBind ("destination" , "source","exKey"); 
//声明对象
channel.queueDeclare ("queue", false, false , true , null); 
//将队列和交换器绑定
channel.queueBind ("queue","destination","");
//发布消息
channel.basicPublish ("source","exKey", null,"exToExDemo". getBytes () ) ;

生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换机destination,并把消息转发到destination中,进而存储在destination绑定的队列 queue中。

4、exchangeUnbind

将交换器与交换器解除绑定

    Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey);

    Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments);

    void exchangeUnbindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments);

具体的参数解释可以参考前面的内容,这里不再赘述

发送消息

basicPublish

    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);

    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body);

    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body);

参数解释:

public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
        private String contentType; //消息类型如(text/plain)
        private String contentEncoding; //字符编码
        private Map<String,Object> headers; //请求头信息
        private Integer deliveryMode;  //消息的投递模式
        private Integer priority;  //优先级
        private String correlationId; //用来关联RPC的请求和响应。
        private String replyTo;  //一般用来命名一个回调queue。
        private String expiration;  //过期时间
        private String messageId;//消息表示符 用于标示消息
        private Date timestamp; //消息发送的时间戳
        private String type;  //消息类型
        private String userId;  //连接到mq的用户名
        private String appId;  //消息的应用程序的表示符  比如你的计算机名称
        private String clusterId;

参考大佬博文:https://www.pianshen.com/article/19451397954/

实例

投递模式(delivery mode)设置为2 ,即消息会被持久化(存入磁盘)在服务器中。同时这条消息的优先级(priority)设置为 1, contentType
为 text/plain,userId为当前登录用户名。

     channel.basicPublish("exchangeName" , "routingKey",
                new AMQP.BasicProperties.Builder()
                        .contentType ("text/plain")
                        .deliveryMode(2)
                        .priority(1)
                        .userId("test")
                        .build(),
                "ddf".getBytes());

发送一条带有 headers 的消息

Map<String, Object> headers = new HashMap<String, Object>() ;
        headers.put("token","782");
                channel.basicPublish("exchangeName", "routingKey",
        new AMQP.BasicProperties.Builder()
                .headers(headers)
                .build(),
        "ddf".getBytes()) ;

发送一条带有过期时间(expiration)的消息

 channel.basicPublish("exchangeName", "routingKey",
                new AMQP.BasicProperties.Builder()
                        . expiration ("6000")
                        .build(),
                "9999".getBytes());

消费消息

RabbitMQ 的消费模式分两种:推( Push )模式和拉( Pull )模式。推模式采用 Basic.Consume进行消费,而拉模式则是调用 Basic.Get 进行消费。

推模式

Channel 类中 basicConsume 方法太多,这里介绍几个常用的

String basicConsume(String queue , Consumer callback); 
String basicConsume(String queue , boolean autoAck, Consumer callback);

String basicConsume(String queue , boolean autoAck , Map<String , Object> 
arguments, Consumer callback); 

String basicConsume(String queue , boolean autoAck , String consumerTag, 
Consumer callback); 

String basicConsume(String queue , boolean autoAck , String consumerTag, 
boolean noLocal , boolean exclusive , Map<Str ng Object> arguments, 
Consumer callback);

对应的参数说明:

拉模式

通过 channel.basicGet方法可以单条地获取消息,其返回值是 GetResponse。Channel 类的 basicGet 方法没有其他重载方法,只有

 GetResponse basicGet(String queue, boolean autoAck);

其中 queue 代表队列的名称,如果设置 autoAck为false ,那么需要调用
channel.basicAck 来确认消息己被成功接收。

消费端的确认与拒绝

1、basicReject

    void basicReject(long deliveryTag, boolean requeue);

2、basicNack

void basicNack(long deliveryTag, boolean multiple, boolean requeue);

3、basicRecover

是否恢复消息到队列

     Basic.RecoverOk basicRecover() throws IOException;

    Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

这个 channel.basicRecover方法用来请求 RabbitMQ 重新发送还未被确认的消息,requeue参数设置为 true ,则未被确认的消息会被重新加入到队列中,并且尽可能的将消息投递给其他消费者消费,而不是自己再次消费如果,requeue参数设置为 false ,消息会被重新投递给自己。默认情况下,如果不设置requeue这个参数默认为 true。

4、basicAck

    void basicAck(long deliveryTag, boolean multiple) throws IOException;

以上知识均来自于朱忠华大佬《RabbitMQ实战指南》一书

image.png
上一篇 下一篇

猜你喜欢

热点阅读