rabbitMQ可靠性投递

2019-02-19  本文已影响12人  靈08_1024

可靠性投递

关于消息的可靠性投递,从以下几个方面来着手:

本文关于消息投递的两种方式:消息落库打标二次消息机制

消息落库打标

1)发送方将消息存到RDB中(如MySQL);该步骤可以通过事务来确保 业务数据消息数据 落库成功;
2)发送方发送消息给MQ broker;
3)MQ broker返回给发送方确认ACK(rabbitmq自带的confirm机制);
4)修改消息数据库;
5)分布式定时任务会检查消息数据库,取出其中的异常数据(如超时状态为修改),进入重试阶段;
6)重试阶段,针对于每个任务都有一个业务数据来关联重试次数。超过次数,报告失败;

其中,在第1)步,如果持久化失败,会快速失败,便于进行下一次重试。
在第3)步,如果出现网络抖动或者异常,即MQ broker返回给发送方失败。会导致该消息数据长时间在数据库中没有被更新,即状态还是没有被MQ broker确认的状态。所以需要有另外的定时任务去检查消息入库时长,超过一定时间会踢出任务,进入步骤5)。

缺点:因为在第一步需要对数据库进行2次持久化操作,还需要额外的事务来保证。所以对于并发高的环境,其实还是需要深思熟虑的。


二次消息机制

其中涉及服务:上游发送方,MQ broker,下游接收方,回调方。


image.png

1)业务数据落库,落库完毕再发送消息,此处发送2条消息;
2)MQ broker的消息到下游服务;
3)confirm消息:下游服务消费后,重新生成一条消息,投递给MQ broker;
4)此时callback service监听下游服务生成的消息,对收到的消息进行持久化,保存到DB中;
5)callback service监听发送方的第二条消息,并去数据库中进行查询,有即处理好了,没有即处理异常;
6)第5)步异常后,进入该步骤。带着相关业务信息通知发送方重新发送该消息。

在第1)步中,第一条消息是发送给下游业务方的,为立即发送;第二条消息延迟5分钟后进行发送,是发送给callback service来作为检查条件,查看下游服务是否处理完毕该消息。

相比于第一种方案,在主业务流程中减少了DB操作,将对消息的保存移动到了callback service中。

幂等性

幂等性,就是一条数据无论经过多少次该方法的操作,对于我们拿到的结果都是一样的。

常见的幂等性解决方案:


return消息机制

在某些情况下,我们发送的消息,当前的exchange或routing不存在或者不可达,这个时候就需要return listener。
通过设置mandatory属性来完成。该属性为true,当消息不可达时,会触发return机制;为false时,该消息删除,不报告错误。

//第三个参数就是mandatory:true开启,false关闭
channel.basicPublish(exchangeName, routingKey, true,null, msg.getBytes());

消息限流机制

在消费者里面代码如下设置:

            //prefetchSize  消息大小限制,一般设置为0,表示不限制
            //prefetchCount  一次最多处理多少条消息,一般设置为1。在有这么多消息返回ack后,该consumer阻塞
            //global  true在channel上限制,false在consumer上限制。一般设置为false
            channel.basicQos(0,1,false);
            //autoAck是否自动签收
            channel.basicConsume(queueName, false, new MyConsumer(channel));

在自定义的类MyConsumer中,添加如下配置:

public class MyConsumer extends DefaultConsumer {

    private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //如果上面的basicQos中第二个数值设置为1,则此处第二个值设置为false,大于1则设置为true。表示是否多个ack返回。
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}
上一篇 下一篇

猜你喜欢

热点阅读