RabbitMQ-消息确认机制(事物+confirm)

2020-07-20  本文已影响0人  jiahzhon

在rabbitmq中,我们可以通过持久化数据,解决rabbitmq的服务器异常 的数据丢失问题。问题:生产者将消息发送出去后,是否到达rabbitmq服务器?默认的情况下是不知道的。

  1. AMQP 实现了事物机制
  2. confirm 模式
image.png

这种模式比较耗时,降低了rabbitmq的吞吐量

public class SendMany {
    private static final String QUEUE_NAME = "confirm_test_1";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connections = ConnectionUtils.getConnections();
        Channel channel = connections.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 生产者调用confirmSelect 将channel设置为confirm模式
        channel.confirmSelect();
        String msg = "hello  confirm  msg";
        System.out.println("send--confirm---" + msg);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        if (!channel.waitForConfirms()) {
            System.out.println("massage send failed");
        } else {
            System.out.println("massage send ok");
        }
        connections.close();
    }
}
public class Send3 {
    //下面代码并没有处理handleAck和handleNack
    private static final String QUEUE_NAME = "confirm_test_1";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connections = ConnectionUtils.getConnections();
        Channel channel = connections.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 生产者调用confirmSelect 将channel设置为confirm模式
        channel.confirmSelect();
        // 存放未确认的消息标识
        final SortedSet<Long> confirmSet = Collections.synchronizedNavigableSet(new TreeSet<Long>());
        // 添加通道监听
        channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long deliverTag, boolean mutiple) throws IOException {
                if (mutiple) {
                    System.out.println("---handleAck-------mutiple----");
                    confirmSet.headSet(deliverTag + 1).clear();
                } else {
                    System.out.println("---handleAck-------mutiple---false");
                    confirmSet.remove(deliverTag);
                }
            }

            public void handleNack(long deliverTag, boolean mutiple) throws IOException {
                if (mutiple) {
                    System.out.println("---handleNack-------mutiple----");
                    confirmSet.headSet(deliverTag + 1).clear();
                } else {
                    System.out.println("---handleNack-------mutiple---false");
                    confirmSet.remove(deliverTag);
                }
            }

        });

        String msg = "hello confirm msg";
        while (true) {
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            confirmSet.add(seqNo);
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读