RabbitMQ AMQP协议

2020-03-12  本文已影响0人  小蜗牛Aaron

VHost

创建方式:

Connection

Spring CachingConnectionFactory

默认情况下 单连接的连接工厂。 默认,首先会缓存一个Channel,然后再慢慢增 ,高并发使用场景、channelCacheSize 设置缓存的数 量 100
默认的缓存模式是缓存通道。
如果把缓存模式设为CacheMode.CONNECTION,则缓存连接以及连接上创建Channel. connectionCacheSize 属性设置缓存多少个连接

小注:CacheMode.CONNECTION 与 Rabbit Admin ( AmqpAdmin) 不兼容,不会自动创建exchange、 queues 等

exchange

Exchange.DeclareOk exchangeDeclare(String exchange, String type,   
boolean durable,    // 交换器是否持久化  避免重启后,要再次创建。 和消息的持久化没关系。 
boolean autoDelete,  // 当没有队列绑定到它时 是否自动删除 boolean internal,   // 是否是 MQ 内部使用的, 我们就不能在客户端中使用。 
Map<String, Object> arguments)

queue

-【注意】以amq.开头的队列名称 是保留给Broker内部使用的,如果用户创建这样的队列会异常。
-【注意】队列的持久性,跟消息的持久化也没关系。

Queue 的 TTL TIME TO LIVE

autoDelete 队列空闲一段时间之后再删除。

rabbitmqctl set_policy expiry ".*" '{"expires":1800000}' --apply-to queues

expiry 策略名称 自定义 ".*" 作用目标名称的正则表达式 '{"expires":1800000}' 策略定义 过期时间设置 单位毫秒 --apply-to queues 应用于哪一类实体
代码中声明队列是指定

Map<String, Object> args = new HashMap<String, Object>();            
args.put("x-expires", 1800000);            
channel.queueDeclare("myqueue", false, false, false, args);
channel.queueDeclare("queue1", false, false, false, args);

Publisher

路由不可达

可能情况:

默认情况: 消息丢弃

可以的处理办法:

退回
void basicPublish(String exchange, String routingKey, 
boolean mandatory, BasicProperties props, byte[] body) 

mandatory:true 强制退回, false 不需退回,直接丢弃。

备用交换
rabbitmqctl set_policy mike "^my-direct$" '{"alternate-exchange":"my-ae"}'
//声明参数 
Map<String, Object> args = new HashMap<String, Object>(); 
args.put("alternate-exchange", "my-ae"); //备用交换参数指定 
channel.exchangeDeclare("my-direct", "direct", false, false, args); 
channel.exchangeDeclare("my-ae", "fanout"); channel.queueDeclare("routed"); 
channel.queueBind("routed", "my-direct", "key1"); 
channel.queueDeclare("unrouted"); channel.queueBind("unrouted", "my-ae", "");

事务机制

@Configuration 
public class TxConfiguration {    
@Bean       // 配置事务管理器    
  public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {        
    return new RabbitTransactionManager(connectionFactory);    
  } 
}

在spring 该怎么玩事务就怎么玩. RabbitTransactionManager 只能做Rabbitmq的消息事务管理 只能是单连接的连接工厂
没有分布式事务管理器实现。
rabbitmq中事务机制来保证消息的可靠发布,性能是比较差

发布确认机制

性能是事务机制的250倍。
发布者发布消息,一般是走异步。
channel
有三种确认模式

broker给出确认会有三种结果

三种消息确认模式案例 参考

consumer

两种消费模式
Push模式

broker client 消费者
client 向 broker 注册对某个队列的消费者

String consumerTag = channel.basicConsume(queueName, true, callback, consumerTag -> {});

取消消费者注册

channel.basicCancel(consumerTag);
独占消费者

独占队列:被创建它的连接独占 这个连接上的channel 可以共享。连接关闭,独占队列没有了。
独占消费者:消费者独占一个对队列进行消息消费,适用场景: 消息一定要严格按序消费处理。

消费者优先级

x-priority

Map<String, Object> args = new HashMap<String, Object>(); args.put("x-priority", 10); // 整数,数值越大优先级越高。 默认 0 
channel.basicConsume("my-queue", false, args, consumer);

prefetch 20 Broker 轮询发送, 同优先级时是轮询 消息优先发给优先级高的消费者,直到prefetch满了 或 block.

消息确认

消息确认的传递模式

手动确认的3个操作

负面确认,可以指示Broker 移除消息以及是否重发。

pull 拉模式消费
// 批量Nack,并重发 
GetResponse gr1 = channel.basicGet("some.queue", false); 
GetResponse gr2 = channel.basicGet("some.queue", false); 
channel.basicNack(gr2.getEnvelope().getDeliveryTag(), true, true); //第二个参数 true表示批量

安全

guest用户管理

guest guest
改密码或删除guest用户。

rabbitmqctl delete_user guest 
rabbitmqctl change_password guest newpass
访问控制

Vhost 资源 (exchagne queue) 用户

  1. configure操作:指创建或销毁资源,或更改它们的行为。
  2. write 写操作:将消息注入到资源中。
  3. read操作:从资源中读取消息。
rabbitmqctl set_permissions [-p vhost] user conf write read
rabbitmqctl set_permissions -p my-vhost mike “^mike-.*” “.*” “.*”

channel.exchangeDeclare("aaaaa") 不可以

各种操作需要的权限
topic权限

设置用户对topic类型的交换器的消息权限

rabbitmqctl set_topic_permissions [-p vhost] user exchange write read
rabbitmqctl set_topic_permissions -p my-vhost mike amq.topic “^mike-.*” “^mike-.*”
上一篇 下一篇

猜你喜欢

热点阅读