JAVA后台开发_从入门到精通

6 保证消息不丢失

2019-12-26  本文已影响0人  笑Skr人啊

1 消息持久化

1.1 exchange持久化

/*
 * 声明消息队列,且为可持久化的
 *
 * EXCHANGE_NAME: 交换机名称(name)
 * direct: 交换机类型(type)
 * true: 是否持久化(durable)
 * false: 是否自动删除(autoDelete)
 * null: 其他参数(arguments)
 */
 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);

1.2 queue持久化

/*
 * 声明消息队列,且为可持久化的
 *
 * QUEUE_NAME: 队列名称(name)
 * true: 是否持久化(durable)
 * false: 是否为排他性队列(exclusive)
    1: 只对首次声明它的连接(Connection)可见
        1.1 首次声明: 因为另外一个连接无法声明一个同样的排他性队列
        1.2 只区别连接(Connection)而不是通道(Channel),从同一个连接创建的不同的通道可以同时访问某一个排他性的队列

    2: 会在其连接断开的时候自动删除
        2.1 无论队列是否被声明成持久性的(Durable =true),只要调用连接的Close方法或者客户端程序退出,RabbitMQ都会删除这个队列
        2.2 注意这里是连接断开的时候,而不是通道断开。这个其实前一点保持一致,只区别连接而非通道。

 * false: 是否自动删除(autoDelete)
 * null: 其他参数(arguments)
 */
channel.queueDeclare(QUEUE_NAME, true, false,  false,null);

1.3 消息持久化

// 消息持久化
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 1:不持久化 2:持久化
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
/*
 * EXCHANGE_NAME: 交换机名称
 * "" : 路由key
 * properties: 基本属性
 * message: 消息体
 */
channel.basicPublish(EXCHANGE_NAME, "", properties, message.getBytes());

2 消费者ACK确认机制

2.1 自动确认

// 自动确认
channel.basicConsume(QUEUE_NAME, true, consumer);

只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。

2.2 手动确认

// 手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);

// 向服务端确认消息已被消费
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

3 生产者AMQP事务机制(生产环境禁止使用)


// 开启事务
try {
    String message = "I am simple_queue!";
    // 开启事务
    channel.txSelect();
    // 往队列中发出一条消息,使用rabbitmq默认交换机
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    // 提交事务
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    // 事务回滚
    channel.txRollback();
}

4 生产者确认

try {
    // 生产者通过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式
    channel.confirmSelect();
    // 消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID)
    channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes());

    // 
    if (!channel.waitForConfirms()){
        System.out.println("message failed!");
        // do something
    }
}catch (Exception e){
    e.printStackTrace();
}
上一篇 下一篇

猜你喜欢

热点阅读