javaWeb学习

RabbitMQ高级特性

2019-12-03  本文已影响0人  小波同学

消息如何保障100%的投递成功?

什么是生产端的可靠性投递?

一线互联网大厂的解决方案:
1、消息落库,对消息状态进行打标

image.png
图解:

2、消息的延迟投递,做二次确认,回调检查
这种方式并不一定能保证100%成功,但是也能保证99.99%的消息成功。如果遇到特别极端的情况,那么就只能需要人工去补偿,或者定时任务去做。
第二种方式主要是为了减少对数据库的操作。

image.png
图解:

具体采用哪种方案,还需要根据业务与消息的并发量而定。

幂等性概念详解

幂等性是什么?

幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中,即f(f(x)) = f(x)。简单的来说就是一个操作多次执行产生的结果与一次执行产生的结果一致

消费端-幂等性保障

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
在高并发的情况下,会有大量的消息到达MQ,消费端需要监听大量的消息。这样的情况下,难免会出现消息的重复投递,网络闪断等等。如果不去做幂等,则会出现消息的重复消费。
消费端实现幂等性,就意味着,我们的消息永远不会被消费多次,即使我们收到了多条一样的消息,也只会执行一次。
看下互联网主流的幂等性操作:

唯一ID+指纹码机制

Redis 原子特性实现
最简单使用Redis的自增。
使用Redis进行幂等,需要考虑的问题。

Confirm确认消息、Return返回消息

理解Confirm 消息确认机制:

如何实现Confirm确认消息?

Confirm确认模式的生产端:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //4、指定消息的投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "confirm.save";

        //5、发送消息
        String msg = "hello rabbitmq send confirm message!";
        channel.basicPublish(exchangeName,routingKey,null,msg.getBytes("UTF-8"));

        //6、添加一个确认监听
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 返回成功
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("----------ack----------");
            }

            /**
             * 返回失败
             * @param deliveryTag
             * @param multiple
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("----------no ack----------");
            }
        });
    }
}

Confirm确认模式的消费端:

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
//1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String queueName = "test_confirm_queue";
        String routingKey = "confirm.#";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);

        //5、创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(envelope.getRoutingKey() + ":" + message);
            }
        };

        //6、设置channel
        channel.basicConsume(queueName,true,consumer);
    }
}

Return消息机制

Return消息机制基础API中有一个关键的配置项:

Return消息机制流程

image.png

Return消息机制生产端:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //4、指定消息的投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_return_exchange";
        String routingKey = "return.save";
        String routingKeyError = "error.save";

        //5、发送消息
        String msg = "hello rabbitmq send confirm message!";
        channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes("UTF-8"));

        //第三个参数mandatory=true,意味着路由不到的话mq也不会删除消息,false则会自动删除
        channel.basicPublish(exchangeName,routingKeyError,true,null,msg.getBytes("UTF-8"));
        channel.basicPublish(exchangeName,routingKeyError,false,null,msg.getBytes("UTF-8"));

        //6、添加一个return监听
        channel.addReturnListener(new ReturnListener() {

            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("----------handle return----------");
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body,"UTF-8"));
            }
        });
    }
}

Return消息机制消费端:

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String queueName = "test_return_queue";
        String routingKey = "return.#";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);

        //5、创建消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(envelope.getRoutingKey() + ":" + message);
            }
        };

        //6、设置channel
        channel.basicConsume(queueName,true,consumer);
    }
}

关于Confirm确认消息、Return返回消息:rabbitmq生产者的消息确认

消息的限流(防止占用内存过多,节点宕机)

什么是消费端的限流?

为什么不在生产端进行限流呢?

因为在高并发的情况下,流量就是非常大,所以很难在生产端做限制。因此我们可以用MQ在消费端做限流。

限流机制生产者端:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //4、指定消息的投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.save";

        //5、发送消息
        String msg = "hello rabbitmq send confirm message!";
        channel.basicPublish(exchangeName,routingKey,true,null,msg.getBytes("UTF-8"));
    }
}

限流机制消费者端:

public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(envelope.getRoutingKey() + ":" + message);

        //手动签收
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}


public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);


        //5、创建消费者
        DefaultConsumer consumer = new MyConsumer(channel);

        //6、限流方式 AutoAck设置为false
        channel.basicConsume(queueName,false,consumer);
        //
        channel.basicQos(0,1,false);
    }
}

消息的ACK与重回队列

消费端的手工ACK和NACK

消费端的重回队列

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //4、指定消息的投递模式:消息的确认模式
        channel.confirmSelect();

        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.save";

        Map<String,Object> headers = new HashMap<>();
        headers.put("num",0);
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .headers(headers)
                .build();
        for (int i = 0; i < 5; i++) {

            //5、发送消息
            String msg = "hello rabbitmq send ack message!"+i;
            channel.basicPublish(exchangeName,routingKey,true,properties,msg.getBytes("UTF-8"));
        }
    }
}

消费者端:

public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(envelope.getRoutingKey() + ":" + message);

        Map<String, Object> headers = properties.getHeaders();
        Integer num = (Integer) headers.get("num");

        if(num == 0){
            //第三个参数,true:重回队列,false:不重回队列
            //重回队列,会将消息重新添加到消息的尾部
            channel.basicNack(envelope.getDeliveryTag(),false,true);
            return;
        }

        //手动签收
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.save";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,exchangeName,routingKey);


        //5、创建消费者
        DefaultConsumer consumer = new MyConsumer(channel);

        //6、手动签收 AutoAck设置为false
        channel.basicConsume(queueName,false,consumer);

    }
}

TTL消息

TTL

死信队列

死信队列:DLX,Dead-Letter-Exchange

RabbitMQ的死信队里与Exchange息息相关

消息变成死信有以下几种情况:

DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

可以监听这个队列中消息做相应的处理,这个特征可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

死信队列设置:

Producer端:

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();


        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.save";

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("20000")
                .build();

        //5、发送消息
        String msg = "hello rabbitmq send dlx message!";
        channel.basicPublish(exchangeName,routingKey,true,properties,msg.getBytes("UTF-8"));

    }
}

消费者端:

public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(envelope.getRoutingKey() + ":" + message);

        //手动签收
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");

        //2、通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        //3、通过Connection创建一个Channel
        Channel channel = connection.createChannel();

        //这是一个普通的交换机、队列、路由
        String exchangeName = "test_dlx_exchange";
        String queueName = "test_dlx_queue";
        String routingKey = "dlx.#";

        //4、声明交换机和队列,然后进行绑定设置,指定路由key
        channel.exchangeDeclare(exchangeName,"topic",true);
        Map<String,Object> agruments = new HashMap<>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        //这个agruments属性要声明到队列上
        channel.queueDeclare(queueName,true,false,false,agruments);
        channel.queueBind(queueName,exchangeName,routingKey);

        //死信队列的声明
        channel.exchangeDeclare("dlx.exchange","topic",true,false,null);
        channel.queueDeclare("dlx.queue",true,false,false,null);
        channel.queueBind("dlx.queue","dlx.exchange","#");

        //5、创建消费者
        DefaultConsumer consumer = new MyConsumer(channel);

        //6、手动签收 AutoAck设置为false
        channel.basicConsume("dlx.queue",false,consumer);
    }
}

在实际工作中,死信队列非常重要,用于消息没有消费者,处于死信状态。我们可以才用补偿机制。

参考:
https://www.cnblogs.com/coder-programming/p/11412048.html

https://www.cnblogs.com/coder-programming/p/11424152.html

https://my.oschina.net/genghz/blog/1840262

上一篇 下一篇

猜你喜欢

热点阅读