【RabbitMQ的那点事】如何保证消息的正确发送

2022-05-07  本文已影响0人  伊丽莎白2015

文章内容:

文章内容

关于如何确保发送方数据安全的问题,官网也作了详细的解释:

Using standard AMQP 0-9-1, the only way to guarantee that a message isn't lost is by using transactions -- make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced.
AMQP协议提供的一个事务机制,虽然还能确保消息正确送达,但比较笨重(性能没有很好),在此基础上引入了发送方确认机制。

1. 那么如何实现发送方确认机制?

2. 发送方确认机制(publisher confirm)有三种方式:

a. 串行confirm模式(Publishing Messages Individually)
b. 批量confirm模式(Publishing Messages in Batches)
c. 异步confirm模式(Handling Publisher Confirms Asynchronously)

3. 如何用代码实现

官网文章参考:https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
官网基于原始的amqp-client.jar写的代码:https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/PublisherConfirms.java

3.1 首先是串行模式(Publishing Messages Individually)

- 配置:
publisher-confirm-type默认是NONE,也就是确认机制是disabled。这里我们要把它set为SIMPLE模式。
Publisher确认机制的方式是simple,意味着Producer发布一条消息后,需要同步等待Broker的basic.act,官网例子用的是amqp-client.jar,我这里用的是Spring Boot集成RabbitMQ后的方式。

spring:
  rabbitmq:
    port: 5672
    host: localhost
    virtual-host: spring-boot-test
    publisher-confirm-type: simple

- Producer端代码:
Producer发送一条消息,然后使用方法waitForConfirms(ms)等待,这个方法会阻塞等待到Broker的消息确认。如果在规定时间内没有确认,就会报错。

值得一提的是,如果Producer向一个不存在的exchange中发送消息,那么在执行rabbitOperations. waitForConfirms的时候不会抛AmqpTimeoutException错误,而是会抛出异常:com.rabbitmq.client.ShutdownSignalException: channel error;

@Slf4j
@SpringBootTest
public class ProducerConfirmServiceTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void singleConfirm() {
        try {
            rabbitTemplate.invoke(rabbitOperations -> {
                rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "hello, i am direct message!");
                // 等待Broker确认时间:1ms,超过1ms报错 
                return rabbitOperations.waitForConfirms(1);
            });
        } catch (AmqpTimeoutException e) {
            log.error("met timeout exception: ", e);
        }
    }
}

也可以用waitForConfirmsOrDie(ms)来确认:

            rabbitTemplate.invoke(rabbitOperations -> {
                rabbitOperations.convertAndSend("direct.exchange", "direct-routing-key", "hello, i am direct message!");
                rabbitOperations.waitForConfirmsOrDie(100000);
                return true;
            });

由于是发布者确认机制(发生在Publisher和Broker之间),消费端的代码没有改动,这里就不贴了,详细看 【RabbitMQ的那点事】与Spring boot集成:https://www.jianshu.com/p/4a21a7fce14c

上述方法测试结果会报错(1ms太短了,Broker来不及确认):
也可以通过Thread name=main看出是同步(阻塞等待)的,这里始终是主线程在执行。另外虽然Broker确认失败了,因为Broker其实是好的,只是我们设的等待时间太短了,所以消息依然是发送出去了。

2022-05-07 17:43:30.104 ERROR 63048 --- [ main] ProducerConfirmServiceTest : met timeout exception:
org.springframework.amqp.AmqpTimeoutException: java.util.concurrent.TimeoutException at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:73) ~[spring-rabbit-2.3.12.jar:2.3.12] at org.springframework.amqp.rabbit.core.RabbitTemplate.waitForConfirms(RabbitTemplate.java:2320) ~[spring-rabbit-2.3.12.jar:2.3.12]
...


3.2 其次是批量confirm确认(Publishing Messages in Batches)

在#3.1示例是单条publish后Producer就开始等待Broker的确认,当然我们也可以在发布一定数量的消息后再开始确认,比如100条。
这样做的好处是可以提高吞吐量。缺点是如果收不到Broker的确认,我们不知道这一批中哪一个消息开始出了问题,所以可能需要将这100条都重新发送,可能会造成重复发的情况。

@Slf4j
@SpringBootTest
public class ProducerConfirmServiceTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void batchConfirm() {
        try {
            rabbitTemplate.invoke(rabbitOperations -> {
                for (int i = 0; i < 10; i ++) {
                    rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "message - " + i);
                }

                return rabbitOperations.waitForConfirms(10000);
            });
        } catch (AmqpTimeoutException e) {
            log.error("met timeout exception: ", e);
        }
    }
}

3.3 最后是异步confirm确认(Handling Publisher Confirms Asynchronously)

同步确认的配置是publisher-confirm-type: simple
publisher-confirm-type: 还有另外一个配置项即:correlated,如果使用该配置项,说明发送方也需要消息确认,并且可以通过CorrelationData来回传额外的信息。这个分类方法与串行或批量无关,只是confirm能否回传数据的分类方式。

以下是示例:

spring:
  rabbitmq:
    port: 5672
    host: localhost
    virtual-host: spring-boot-test
    publisher-confirm-type: correlated

- 新建一个ConfirmCallBack类,需要实现RabbitTemplate.ConfirmCallback接口 ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause:

@Slf4j
public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    public void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {
        log.info("MsgSendConfirmCallBack , 回调id: {}", correlationData);

        if(ack) {
            log.info("消息发送成功");
        } else {
            log.info("消息发送失败: {}", cause);
        }
    }
}

- Producer类:
在发送消息前需要先set一个ConfirmCallback,发送消息的时候可以带上CorrelationData,在callback中可以接收该data:

@Slf4j
@SpringBootTest
public class ProducerConfirmServiceTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void confirmAsync() {
        try {
            rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());

            CorrelationData correlationData = new CorrelationData();
            log.info("开始发送消息");
            correlationData.setId("100");
            rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "hello, i am direct message!", correlationData);

        } catch (AmqpTimeoutException e) {
            log.error("met timeout exception: ", e);
        }
    }
}

测试结果:可以看出回调方法用的是自己的线程,即异步。并且能收到发送时带的CorrelationData类:

2022-05-07 18:41:32.112 INFO 75420 --- [ main] ProducerConfirmServiceTest : 开始发送消息
2022-05-07 18:41:32.133 INFO 75420 --- [nectionFactory1] MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=100]
2022-05-07 18:41:32.135 INFO 75420 --- [nectionFactory1] MsgSendConfirmCallBack : 消息发送成功

也可以发送到错误的exchange上来测试发送callback:

rabbitTemplate.convertAndSend("wrong.exchange", "direct-routing-key", "hello, i am direct message!", correlationData);

测试结果:callback会检测到错误,也就是说使用ConfirmCallBack无论消息是否正确送到Broker,都会进入该回调函数类中。

2022-05-07 19:03:37.792 INFO 80121 --- [ main] ProducerConfirmServiceTest : 开始发送消息
2022-05-07 19:03:37.802 ERROR 80121 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'wrong.exchange' in vhost 'spring-boot-test', class-id=60, method-id=40)
2022-05-07 19:03:37.804 INFO 80121 --- [nectionFactory2] MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=100]
2022-05-07 19:03:37.806 INFO 80121 --- [nectionFactory2] MsgSendConfirmCallBack : 消息发送失败: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'wrong.exchange' in vhost 'spring-boot-test', class-id=60, method-id=40)


4. 总结

发送方确认机制是保证消息可靠环节的第1步。三种方式总结如下:
a. 发送消息串行(逐条)确认——同步等待,简单,但会限制吞吐量。
b. 批量发送消息后再确认——同步等待,简单,能提高吞吐量,但极端情况下会造成消息的重复发送(无法精确定位到单条错误消息)。
c. 发送消息后异步等待确认,效率高,但需要正确的实现接口方法。

以下是官方的例子测评结果(官网代码在#3一开始有贴),发送消息的总条数都是50000条,Brokder和producer都在同一台机器上:

测试用例 花费时间
串行确认(同步逐条确认) 5,549 ms
批量(按100条一批次,同步确认) 2,331 ms
异步确认 4,054 ms

生产环境往往Broker是单独的机器,所以官网又做了以下的测试,同样是发送50000条消息,但这次是远程发送:

测试用例 花费时间
串行确认(同步逐条确认) 231,541 ms
批量(按100条一批次,同步确认) 7,232 ms
异步确认 6,332 ms

可以看到逐条发送后确认的效率是惊人的低。批量确认和异步确认的效率差不太多。批量确认的代码容易实现,而异步确认的实现会比较复杂一些。



结束了吗?还没有!!!

上述串行、批量确认以及异步确认,都是为了解决:让Producer知道信息有没有成功的发送到Broker的Exchange交换机上,但如果消息从Exchange 到 Queue投递失败(或者Exchange没有匹配的Queue的话),那么消息也会丢失,这时候要怎么办?

  1. 当发布者发布消息到Exchange上,但Exchange没有绑定的Queue时,默认情况下发布的消息会丢掉。当然这时候我们也可以启用Alternate Exchange,将没有目的地的消息统一转到这个Alternative Exchange上来。
  2. 或者在发送消息的时候,将参数mandatory置为true,那么message就会退回到Producer方,Producer方需要实现ReturnCallback接口(https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ReturnCallback.html),也能将退回的消息取到。

针对上述两种方式,具体来演示:

5. Alternate Exchange

关于Alternate Exchange, 参见官网:https://www.rabbitmq.com/ae.html

以下是具体思路:

  1. 首先创建一个Exchange(Fanout类型),叫backup.exchange。
  2. 再创建一个Queue,叫noBinding.queue,并绑定到backup.exchange上(因为该exchange是fanout type,所以routingKey为空)。
  3. 在创建正常要使用的Exchange时(比如叫direct.exchange),可以将backup.exchange作为参数名为alternate-exchange的值,传入direct.exchange中。

以下是代码示例:

先是Alternate Exchange的创建:
@Configuration
public class AlternateExchangeConfig {
    @Bean
    public Queue noRoutedQueue() {
        return new Queue("noBinding.queue", true);
    }

    @Bean
    public FanoutExchange backupExchange() {
        return new FanoutExchange("backup.exchange");
    }

    @Bean
    public Binding noBinding(Queue noRoutedQueue, FanoutExchange backupExchange) {
        return BindingBuilder.bind(noRoutedQueue).to(backupExchange);
    }

    @RabbitListener(queues = "noBinding.queue")
    public void listen(String in) {
        System.out.println("[noBinding.queue]: " + in);
    }
}
再是正常业务处理的Exchange

可以看到这里有新加arguments,key是alternate-exchange, value是上述创建的backup.exchange:

@Configuration
public class DirectExchangeConfig {
    @Bean
    public Queue directqueue() {
        return new Queue("direct.queue", true);
    }

    @Bean
    public DirectExchange directExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange", "backup.exchange");

        DirectExchange directExchange = new DirectExchange("direct.exchange", true, false, arguments);
        return directExchange;
    }

    @Bean
    public Binding directBinding(Queue directqueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directqueue).to(directExchange).with("direct-routing-key");
    }

    @RabbitListener(queues = "direct.queue")
    public void listen(String in) {
        System.out.println("Direct Message Listener: " + in);
    }
}
测试

将消息发送到direct.exchange,但是routingKey是错误的,也就是这个消息没有目的地:

@Slf4j
@SpringBootTest
public class ProducerServiceTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessageToDirectExchangeWrongly() {
        rabbitTemplate.convertAndSend("direct.exchange", "wrong.routing-key", "hello, i am direct message!");
    }
}

测试印机结果:[noBinding.queue]: hello, i am direct message!
也就是由于routingKey是错的,消息并没有从direct.exchange正确的发送到direct.queue上,而是转发到了backup.exchange上,通过广播模式被noBinding.queue监听到。

当然我们也可以在Console UI上操作: add arg: alternate-exchange

6. 实现ReturnCallback接口来接收退回的消息

在rabbitmq原生的API中,需要在发送的时候将参数mandatory置为true,然后通过实现ReturnCallback接口来接收退回的消息。

如果是和Spring Boot结合,以下是示例:

配置:

首先需要先设置publisher-returns = true

spring:
  rabbitmq:
    port: 5672
    host: localhost
    virtual-host: spring-boot-test
    publisher-returns: true
Producer类

尝试往错误的routingKey中发消息,即topic.exchange通过a.wrong,找不到正确的Queue,由于publisher-returns为true,所以消息就被ReturnCallback捕捉到了。
在高版本的RabbitTemplate中的ReturnCallback是@Deprecated,理由是提倡我们使用lamda表达式去实现,取而代之的是FunctionalInterface ReturnsCallback,这个接口其实就是ReturnCallback的子接口。
所以我们不需要单独创建类,而是在rabbitTemplate setReturnsCallback的时候直接使用lamda表达式,一般里面的实现可以是发送邮件等。

为什么在Spring Boot的Producer发送消息的时候不需要再set mandatory=true是因为Spring Boot在send的时候,帮我们判断了,如果我们设置了ReturnCallback的实现,mandatory自动就为true了。 RabbitTemplate send message
@Slf4j
@SpringBootTest
public class ReturnCallbackServiceTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void returnCallback() {
        rabbitTemplate.setReturnsCallback((message) -> {
            log.info("getMessage: {}", message.getMessage());
            log.info("getRoutingKey: {}", message.getRoutingKey());
            log.info("getExchange: {}", message.getExchange());
            log.info("getReplyCode: {}", message.getReplyCode());
            log.info("getReplyText: {}", message.getReplyText());
        });

        rabbitTemplate.convertAndSend("topic.exchange", "a.wrong", "important message!");
        log.info("Finished for sending message...");
    }
}

测试结果:

2022-05-10 12:46:50.280 INFO 58740 --- [ main] ReturnCallbackServiceTest : Finished for sending message...
2022-05-10 12:46:50.282 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getMessage: (Body:'important message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2022-05-10 12:46:50.284 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getRoutingKey: a.wrong
2022-05-10 12:46:50.285 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getExchange: topic.exchange
2022-05-10 12:46:50.285 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getReplyCode: 312
2022-05-10 12:46:50.285 INFO 58740 --- [nectionFactory1] ReturnCallbackServiceTest : getReplyText: NO_ROUTE

  1. 关于Alternate Exchange和ReturnCallback的优先级:如果同时设置了,那么Alternate Exchange的优先级更高,也就是退回的消息会首先转到设置的Alternate Exchange中,从而不会调用ReturnCallback了。
  2. 如果是延迟队列(delayed exchange),那么ReturnCallback会一直报ReplyCode=312的错,也就是延迟队列不适合使用ReturnCallback功能。
上一篇 下一篇

猜你喜欢

热点阅读