
How to Make Sure RabbitMQ Messages Are Delivered 100% of the Time

2019-06-01  ChinaXieShuai

How to make sure RabbitMQ messages are never lost

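Many back-end teams choose RabbitMQ for message notifications, and Spring Boot supports it out of the box. Users expect messages not to be lost, and RabbitMQ provides mature mechanisms for that. Even so, many developers still see messages go missing from time to time.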

This article looks at the message-loss problem in RabbitMQ, sets up an experiment, and presents a working solution.

RabbitMQ fundamentals: how message loss is prevented

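RabbitMQ provides a confirm mode (publisher confirms) so that the sender can learn whether a message was published successfully or not. Confirms can be used in three ways: synchronous confirm, batch confirm, and asynchronous confirm. For how each of them works, see the linked article "RabbitMQ 问答式总结" (a Q&A-style summary of RabbitMQ).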
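To make the asynchronous style more concrete, here is a minimal, broker-free sketch of the bookkeeping a publisher typically does. Each published message gets a sequence number, and an ack or nack from the broker covers either that single number or, when the `multiple` flag is set, every outstanding number up to and including it. The names `ConfirmTracker`, `onPublish`, and `onConfirm` are hypothetical and are not part of RabbitMQ or Spring AMQP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: tracks messages that are published but not yet confirmed. */
class ConfirmTracker {
    // sequence number -> message body, sorted so "everything up to N" is a cheap view
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private final List<String> nacked = new ArrayList<>();

    /** Remember a message at publish time. */
    void onPublish(long seqNo, String body) {
        outstanding.put(seqNo, body);
    }

    /** Called for every ack (ack = true) or nack (ack = false) received from the broker. */
    synchronized void onConfirm(long seqNo, boolean multiple, boolean ack) {
        ConcurrentNavigableMap<Long, String> covered = multiple
                ? outstanding.headMap(seqNo, true)              // all numbers <= seqNo
                : outstanding.subMap(seqNo, true, seqNo, true); // only seqNo
        if (!ack) {
            nacked.addAll(covered.values()); // candidates for a resend
        }
        covered.clear(); // the view is backed by `outstanding`, so this removes the entries
    }

    int outstandingCount() {
        return outstanding.size();
    }

    List<String> nackedBodies() {
        return nacked;
    }
}
```

Whatever is still outstanding after a timeout, plus everything that was nacked, is what a resend mechanism would need to handle.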

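Spring Boot, through Spring AMQP's RabbitTemplate, uses the asynchronous style: results are delivered to callbacks.

It provides the method
public void confirm(CorrelationData correlationData, boolean ack, String s)
of the RabbitTemplate.ConfirmCallback interface, and the method
public void returnedMessage(Message message, int code, String s, String exchange, String routingKey)
of the RabbitTemplate.ReturnCallback interface, to report back to the sender.

In confirm(CorrelationData correlationData, boolean ack, String s), the boolean ack parameter is true when the broker accepted the message and false when the send failed.

returnedMessage(Message message, int code, String s, String exchange, String routingKey) is called when the RabbitMQ broker hands a message back to the sender. This happens when a message published with the mandatory flag cannot be routed to any queue, for example because no queue is bound to the exchange or no binding matches the routingKey. Publishing to an exchange that does not exist is a different case: the broker closes the channel with a 404 error, and the failure shows up in confirm() as ack = false rather than through returnedMessage().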

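Design: using these RabbitMQ mechanisms to build a scheme that does not lose MQ messages

Approach:

1. When an MQ message is sent, insert a row for it into the database.
2. When RabbitMQ confirms the send by calling confirm(): if ack is true, the message was sent successfully, so update the row and set its status to success; if ack is false, the send failed, so update the row and set its status to failed.
3. Run a scheduled task that scans for rows that are more than 4 seconds old and were not sent successfully, and resends them.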

Design approach
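The three steps can be sketched without a broker or a database. The snippet below is only an illustration under stated assumptions: `OutboxSketch` and its methods are hypothetical names, the table is replaced by an in-memory map, and the current time is passed in so the 4-second rule is easy to follow. It also treats rows that never received any confirm (still in their initial status) as due for a resend, since a confirm may never arrive if the connection drops.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for the message table described in the design. */
class OutboxSketch {
    enum Status { INIT, SUCCESS, FAILED }

    static final long RESEND_AFTER_MILLIS = 4000; // the "4 seconds" from step 3

    private static final class Row {
        final long createdAt;
        Status status = Status.INIT;

        Row(long createdAt) {
            this.createdAt = createdAt;
        }
    }

    private final Map<String, Row> rows = new LinkedHashMap<>();

    // Step 1: record the message when it is sent.
    void recordSend(String correlationId, long nowMillis) {
        rows.put(correlationId, new Row(nowMillis));
    }

    // Step 2: apply the broker's confirm to the stored row.
    void recordConfirm(String correlationId, boolean ack) {
        Row row = rows.get(correlationId);
        if (row != null) {
            row.status = ack ? Status.SUCCESS : Status.FAILED;
        }
    }

    // Step 3: what the scheduled scan would pick up for resending.
    List<String> dueForResend(long nowMillis) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Row> entry : rows.entrySet()) {
            Row row = entry.getValue();
            boolean oldEnough = row.createdAt < nowMillis - RESEND_AFTER_MILLIS;
            if (oldEnough && row.status != Status.SUCCESS) {
                due.add(entry.getKey());
            }
        }
        return due;
    }
}
```

The age threshold gives the broker time to confirm a message before it is considered for a resend, which keeps fresh messages from being sent twice.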

Table definition:

CREATE TABLE `mq_publisher` (
  `id` bigint(10) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `rabbit_template_name` varchar(128) DEFAULT NULL COMMENT 'rabbit_template name',
  `correlation_id` varchar(128) DEFAULT NULL COMMENT 'correlationId',
  `exchange_name` varchar(128) DEFAULT NULL COMMENT 'exchange',
  `routing_key` varchar(128) DEFAULT NULL COMMENT 'routing_key',
  `body` text COMMENT 'mq message body',
  `status` int(1) DEFAULT NULL COMMENT 'send status 1: initialized 2: success',
  `message` text COMMENT 'message',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`),
  KEY `idx_rabbit_template` (`rabbit_template_name`),
  KEY `idx_correlation_id` (`correlation_id`),
  KEY `idx_exchange` (`exchange_name`),
  KEY `idx_routing_key` (`routing_key`),
  KEY `idx_status` (`status`),
  KEY `idx_create_time` (`create_time`),
  KEY `idx_update_time` (`update_time`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='messages sent by the mq producer';

Implementing the idea:

Configuration:

v1.spring.rabbitmq.host=XXXX
v1.spring.rabbitmq.port=XXXX
v1.spring.rabbitmq.username=XXXXX
v1.spring.rabbitmq.password=XXXXX
v1.spring.rabbitmq.virtual-host=/
# consumer: manual ack
v1.spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 1. When the mandatory flag is true and the exchange cannot find a suitable queue
#    for the message based on its own type and the message's routingKey,
#    the broker returns the message to the producer via basic.return.
# 2. When mandatory is false, the broker simply drops the message in that situation.
#    In short, mandatory tells the broker to route the message to at least one queue,
#    or else return it to the sender.
v1.spring.rabbitmq.template.mandatory=true
# publisher confirms
v1.spring.rabbitmq.publisher-confirms=true
# returns callback:
#   invoked for messages that reached the exchange but could not be routed to a queue.
#   Note: in that case the publisher-confirms callback still reports ack = true.
v1.spring.rabbitmq.publisher-returns=true
v1.spring.rabbitmq.listener.simple.prefetch=5

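Create the custom RabbitMQ connection. (Note: when several connections are needed, use a separate MyStoredRabbitOperationsByMySql for each one so that the connections do not affect one another.)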

package com.springboot.test;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

/**
 * Created by shuai on 2019/4/23.
 */
@Configuration
public class MultipleRabbitMQConfig {

    @Autowired
    private MyStoredRabbitOperationsByMySql myStoredRabbitOperationsByMySql;

    // main MQ connection
    @Bean(name = "publicConnectionFactory")
    @Primary
    public CachingConnectionFactory publicConnectionFactory(
            @Value("${v1.spring.rabbitmq.host}") String host,
            @Value("${v1.spring.rabbitmq.port}") int port,
            @Value("${v1.spring.rabbitmq.username}") String username,
            @Value("${v1.spring.rabbitmq.password}") String password,
            @Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,
            @Value("${v1.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
            @Value("${v1.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }

    @Bean(name = "publicRabbitTemplate")
    @Primary
    public MyStoredRabbitTemplate publicRabbitTemplate(
            @Qualifier("publicConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
        MyStoredRabbitTemplate publicRabbitTemplate = new MyStoredRabbitTemplate(connectionFactory, myStoredRabbitOperationsByMySql, "publicRabbitTemplate");
        publicRabbitTemplate.setMandatory(mandatory);
        publicRabbitTemplate.setConfirmCallback(publicRabbitTemplate);
        publicRabbitTemplate.setReturnCallback(publicRabbitTemplate);
        return publicRabbitTemplate;
    }

    @Bean(name = "publicContainerFactory")
    public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
            SimpleRabbitListenerContainerFactoryConfigurer configurer,
            @Qualifier("publicConnectionFactory") ConnectionFactory connectionFactory,
            @Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
            @Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // Apply Spring Boot's defaults first ...
        configurer.configure(factory, connectionFactory);
        // ... then apply the v1.* settings, so that spring.rabbitmq.listener.* properties cannot override them.
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
        factory.setPrefetchCount(prefetch);
        return factory;
    }

    @Bean(name = "publicRabbitAdmin")
    public RabbitAdmin publicRabbitAdmin(
            @Qualifier("publicConnectionFactory") ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

Declare the queues, exchanges, and bindings for each connection:

package com.springboot.test;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by shuai on 2019/5/16.
 */
@Configuration
public class RabbitMQCreateConfig {

    @Resource(name = "publicRabbitAdmin")
    private RabbitAdmin publicRabbitAdmin;

//    @Resource(name = "someOtherRabbitAdmin")
//    private RabbitAdmin iqianzhanRabbitAdmin;

    @PostConstruct
    public void RabbitInit() {


/*
        publicRabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
        publicRabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
        publicRabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

        publicRabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
        publicRabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
        publicRabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

        publicRabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE,
                "test.direct", "direct", new HashMap<>()));

        publicRabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.topic.queue", false))        // create the queue inline
                        .to(new TopicExchange("test.topic", false, false))    // create the exchange inline and bind to it
                        .with("user.#"));    // routing key pattern

        publicRabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.fanout.queue", false))
                        .to(new FanoutExchange("test.fanout", false, false)));

*/
    }
}

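MyStoredRabbitTemplate extends RabbitTemplate and exposes two methods for callers:
public boolean MQSend(String exchange, String routingKey, Object object)
public boolean messageSendMQ(String exchange, String routingKey, Message message)
(Note: when the exchange argument of MQSend() is empty, routingKey is the name of the queue the message is sent to directly. messageSendMQ() exists to support headers exchanges: callers can set the headers on the Message argument to use headers-based routing.)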

package com.springboot.test;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Created by shuai on 2019/5/12.
 */
public class MyStoredRabbitTemplate extends RabbitTemplate implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    private final static Logger LOGGER = LoggerFactory.getLogger(MyStoredRabbitTemplate.class);
    /**
     * Header key that holds the retry count
     */
    private final static String COUNT_TIME = "count_time";

    private final static Integer ONE = 1;

    private final static Integer FOUR = 4;

    /**
     * Default name
     */
    private final static String RABBIT_TEMPLATE_NAME = "MyStoredRabbitTemplate";

    /**
     * Name of this MyStoredRabbitTemplate
     */
    private String name = RABBIT_TEMPLATE_NAME;

    /**
     * Storage operations used by MyStoredRabbitTemplate (insert, update, query)
     */
    private MyStoredRabbitOperations myStoredRabbitOperations;

    /**
     * @param connectionFactory        RabbitMQ connection factory
     * @param myStoredRabbitOperations storage and retry strategy
     * @param RabbitTemplateName       custom name
     */
    public MyStoredRabbitTemplate(ConnectionFactory connectionFactory, MyStoredRabbitOperations myStoredRabbitOperations, String RabbitTemplateName) {
        super(connectionFactory);
        this.name = RabbitTemplateName;
        this.myStoredRabbitOperations = myStoredRabbitOperations;
    }

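    /**
     * RabbitMQ confirm callback
     *
     * @param correlationData correlationData
     * @param ack             true if the broker accepted the message
     * @param s               cause reported on failure (may be null)
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (correlationData == null) {
            // A message sent without CorrelationData cannot be matched to a stored record.
            LOGGER.warn("{} RabbitMQ confirm received without correlationData, ack: [{}], cause: [{}]", name, ack, s);
            return;
        }
        Object object = myStoredRabbitOperations.getMessageByCorrelationId(name, correlationData.getId());
        if (!ack) {
            // The record keeps its initial status, so the compensation thread will resend it.
            LOGGER.info("{} RabbitMQ publish confirm failed (nack), cause: [{}], message: [{}]", name, s, JSON.toJSONString(object));
        } else {
            LOGGER.info("{} RabbitMQ publish confirm succeeded (ack): [{}]", name, JSON.toJSONString(object));
            myStoredRabbitOperations.updateSendMessageSuccess(name, correlationData.getId());
        }
    }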

    /**
     * RabbitMQ return callback
     *
     * @param message    message
     * @param code       reply code
     * @param s          reply text
     * @param exchange   exchange
     * @param routingKey routingKey
     */
    @Override
    public void returnedMessage(Message message, int code, String s, String exchange, String routingKey) {
        LOGGER.error("{} RabbitMQ returnedMessage: the message reached the exchange but could not be routed to a queue, message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]",
                name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey));
    }

    /**
     * When the consumer fails to process a message (an exception, for example),
     * republish it to the exchange retryExchangeName with routing key retryRoutingKey,
     * so that it goes back to the original queue and is consumed again.
     * After 4 such retries, if processing still fails, publish it to the fail queue instead.
     *
     * @param message           message
     * @param retryExchangeName retryExchangeName
     * @param retryRoutingKey   retryRoutingKey
     * @param failExchangeName  failExchangeName
     * @param failRoutingKey    failRoutingKey
     * @return true if the message was handed over for retry or to the fail queue
     */
    public boolean retryRabbitMQ(Message message, String retryExchangeName, String retryRoutingKey, String failExchangeName, String failRoutingKey) {
        try {
            Map<String, Object> headersMap = message.getMessageProperties().getHeaders();
            // A missing or null counter means this is the first failure.
            Object raw = (headersMap == null) ? null : headersMap.get(COUNT_TIME);
            int countTime = (raw instanceof Number) ? ((Number) raw).intValue() : 0;
            if (countTime < FOUR) {
                // retry: bump the counter and republish
                message.getMessageProperties().setHeader(COUNT_TIME, countTime + 1);
                boolean sent = messageSendMQ(retryExchangeName, retryRoutingKey, message);
                LOGGER.info("{} rabbitmq consume failed, message requeued, exchange:[{}],routingKey:[{}],message:[{}]", name, retryExchangeName, retryRoutingKey, JSON.toJSONString(message));
                return sent;
            }
            // fail: retries exhausted
            boolean sent = messageSendMQ(failExchangeName, failRoutingKey, message);
            LOGGER.info("{} rabbitmq consume failed 4 times, message moved to the fail queue, exchange:[{}],routingKey:[{}],message:[{}]", name, failExchangeName, failRoutingKey, JSON.toJSONString(message));
            return sent;
        } catch (Throwable e) {
            LOGGER.error("{} rabbitmq consume failed and requeueing threw an exception, exchange:[{}],routingKey:[{}],message:[{}], exception:", name, retryExchangeName, retryRoutingKey, JSON.toJSONString(message), e);
            return false;
        }
    }


    /**
     * Send an MQ message
     *
     * @param exchange   exchange
     * @param routingKey routingKey
     * @param object     object
     * @return boolean
     */
    public boolean MQSend(String exchange, String routingKey, Object object) {
        try {
            if (object == null) {
                return false;
            }
            String data = JSON.toJSONString(object);
            String generateId = UUID.randomUUID().toString();
            MessageProperties messageProperties = new MessageProperties();
            // Delivery mode: PERSISTENT messages are written to disk by the broker (in a durable queue);
            // NON_PERSISTENT messages can be lost when the broker restarts, even after a successful confirm.
            messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            messageProperties.setCorrelationId(generateId);
            Message message = new Message(data.getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);
            // store the message before sending it
            myStoredRabbitOperations.saveMessage(name, generateId, exchange, routingKey, message);

            this.send(exchange, routingKey, message, new CorrelationData(generateId));
        } catch (Throwable e) {
            LOGGER.error("{} sendRabbitMQ failed to send, exchange:[{}],routingKey:[{}],object:[{}], exception:", name, exchange, routingKey, JSON.toJSONString(object), e);
            return false;
        }
        // At this point the message has only been handed to the client; the confirm arrives asynchronously.
        LOGGER.info("{} sendRabbitMQ sent, awaiting confirm, exchange:[{}],routingKey:[{}],object:[{}]", name, exchange, routingKey, JSON.toJSONString(object));
        return true;
    }


    /**
     * Send a prepared Message
     *
     * @param exchange   exchange
     * @param routingKey routingKey
     * @param message    message
     * @return boolean
     */
    public boolean messageSendMQ(String exchange, String routingKey, Message message) {
        try {
            String id = UUID.randomUUID().toString();
            message.getMessageProperties().setCorrelationId(id);
            // store the message before sending it
            myStoredRabbitOperations.saveMessage(name, id, exchange, routingKey, message);

            this.send(exchange, routingKey, message, new CorrelationData(id));
        } catch (Throwable e) {
            LOGGER.error("{} messageSendRabbitMQ failed to send, exchange:[{}],routingKey:[{}],object:[{}], exception:", name, exchange, routingKey, JSON.toJSONString(message), e);
            return false;
        }
        // At this point the message has only been handed to the client; the confirm arrives asynchronously.
        LOGGER.info("{} messageSendRabbitMQ sent, awaiting confirm, exchange:[{}],routingKey:[{}],object:[{}]", name, exchange, routingKey, JSON.toJSONString(message));
        return true;
    }


    /**
     * Backlog threshold above which an alert is logged
     */
    private Integer CACHE_THRESHOLD = 100;

    /**
     * Whether to sleep once at startup
     */
    private boolean isSleep = true;

    /**
     * How long to sleep at startup (ms)
     */
    private Integer DEFAULT_RECONNECTION_DELAY = 1000;

    /**
     * Minimum message age before a resend: 4 seconds (ms)
     */
    private int VALID_TIME = 4000;
    /**
     * Interval between scans (ms); note that 10 ms is aggressive when the store is a database
     */
    private int RETRY_TIME_INTERVAL = 10;
    /**
     * Stop switch; volatile so that a change is visible to the retry thread
     */
    private volatile boolean stop = false;

    /**
     * Compensating resend for RabbitMQ publishing
     */
    @PostConstruct
    private void startRetry() {
        new Thread(() -> {
            while (!stop) {
                try {
                    // pause once at startup (DEFAULT_RECONNECTION_DELAY, 1 second)
                    if (isSleep) {
                        Thread.sleep(DEFAULT_RECONNECTION_DELAY);
                        isSleep = false;
                    }
                    // pause between scans
                    Thread.sleep(RETRY_TIME_INTERVAL);
                    List<MessageWithTime> list = myStoredRabbitOperations.getSendFailMessages(name);
                    if (CollectionUtils.isEmpty(list)) {
                        continue;
                    }
                    long now = System.currentTimeMillis();
                    int count = 0;
                    for (MessageWithTime messageWithTime : list) {
                        Long messageWithTimeTime = messageWithTime.getTime();
                        if (messageWithTimeTime < (now - VALID_TIME)) {
                            ++count;
                            String correlationId = messageWithTime.getMessage().getMessageProperties().getCorrelationId();
                            if (StringUtils.isBlank(correlationId)) {
                                LOGGER.error("{} RabbitMQ compensating resend: malformed message, correlationId is blank, data: [{}]", name, JSON.toJSONString(messageWithTime));
                                continue;
                            }
                            // resend messages that are more than 4 seconds old
                            this.send(messageWithTime.getExchange(), messageWithTime.getRoutingKey(), messageWithTime.getMessage(), new CorrelationData(correlationId));
                            LOGGER.info("{} RabbitMQ compensating resend, messageWithTime:[{}]", name, JSON.toJSONString(messageWithTime));
                        }
                    }
                    if (count > CACHE_THRESHOLD) {
                        LOGGER.error("{} RabbitMQ compensating resend: large backlog, [{}] messages were resent in this pass", name, count);
                    }
                } catch (Throwable e) {
                    LOGGER.error("{} RabbitMQ compensating resend thread hit an exception:", name, e);
                }
            }
        }).start();
    }
}
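The retry-counter rule used by retryRabbitMQ() can be isolated into a small, broker-free function, which makes the boundary easy to check: a message is requeued four times and goes to the fail queue on the next failure. This is a sketch only; `RetryDecision` and `shouldRetry` are hypothetical names, and a plain `Map` stands in for the message headers.

```java
import java.util.Map;

/** Hypothetical, broker-free version of the retry-counter decision. */
class RetryDecision {
    static final String COUNT_TIME = "count_time"; // same header key as in the class above
    static final int MAX_RETRIES = 4;

    /**
     * Updates the counter in place.
     * Returns true if the message should be requeued, false if it belongs in the fail queue.
     */
    static boolean shouldRetry(Map<String, Object> headers) {
        Object raw = headers.get(COUNT_TIME);
        // A missing counter means the message has not been retried yet.
        int count = (raw instanceof Number) ? ((Number) raw).intValue() : 0;
        if (count < MAX_RETRIES) {
            headers.put(COUNT_TIME, count + 1);
            return true;
        }
        return false;
    }
}
```

Reading the header as a `Number` rather than casting to `Integer` is a defensive choice, in case the counter comes back from the broker as a different numeric type.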

The data format used inside MyStoredRabbitTemplate:

package com.springboot.test;

import org.springframework.amqp.core.Message;

/**
 * Created by shuai on 2019/5/10.
 */
public class MessageWithTime {

    // time the message was sent (epoch milliseconds)
    private Long time;
    // exchange name
    private String exchange;
    // routing key
    private String routingKey;
    // the RabbitMQ message that was sent
    private Message message;

    public MessageWithTime(Long time, String exchange, String routingKey, Message message) {
        this.time = time;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.message = message;
    }

    public Long getTime() {
        return time;
    }

    public void setTime(Long time) {
        this.time = time;
    }

    public String getExchange() {
        return exchange;
    }

    public void setExchange(String exchange) {
        this.exchange = exchange;
    }

    public String getRoutingKey() {
        return routingKey;
    }

    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }

    public Message getMessage() {
        return message;
    }

    public void setMessage(Message message) {
        this.message = message;
    }
}

The constructor of MyStoredRabbitTemplate is public MyStoredRabbitTemplate(ConnectionFactory connectionFactory, MyStoredRabbitOperations myStoredRabbitOperations, String RabbitTemplateName). Its MyStoredRabbitOperations parameter is defined as follows.

This interface defines how MyStoredRabbitTemplate stores and updates messages; developers implement it to suit their business scenario:

package com.springboot.test;

import org.springframework.amqp.core.Message;

import java.util.List;
import java.util.Map;

/**
 * Created by shuai on 2019/5/26.
 */
public interface MyStoredRabbitOperations {

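    /**
     * Store a RabbitMQ message
     *
     * @param name          name of the sending template
     * @param correlationId
     * @param exchange
     * @param routingKey
     * @param message
     */
    void saveMessage(String name, String correlationId, String exchange, String routingKey, Message message);

    /**
     * Mark an MQ message as sent successfully
     *
     * @param name          name of the sending template
     * @param correlationId
     */
    void updateSendMessageSuccess(String name, String correlationId);

    /**
     * Look up an MQ message by correlationId
     *
     * @param name          name of the sending template
     * @param correlationId
     * @return the stored message, or null if none is found
     */
    MessageWithTime getMessageByCorrelationId(String name, String correlationId);

    /**
     * Get the MQ messages that have not been sent successfully
     *
     * @param name name of the sending template
     * @return messages that are candidates for a resend
     */
    List<MessageWithTime> getSendFailMessages(String name);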

}

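First implementation of MyStoredRabbitOperations, which tracks messages in an in-memory cache (pending messages held this way do not survive an application restart):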

package com.springboot.test;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by shuai on 2019/5/26.
 */
public class MyStoredRabbitOperationsByCache implements MyStoredRabbitOperations {

    private final static Logger LOGGER = LoggerFactory.getLogger(MyStoredRabbitOperationsByCache.class);

    /**
     * In-memory cache
     */
    private Map<String, MessageWithTime> map = new ConcurrentHashMap<>();

    /**
     * 4-second threshold (ms)
     */
    private final static Long VALID_TIME = 4000L;

    /**
     * Store a message that has been sent
     *
     * @param correlationId
     * @param exchange
     * @param routingKey
     * @param message
     */
    @Override
    public void saveMessage(String name, String correlationId, String exchange, String routingKey, Message message) {
        MessageWithTime messageWithTime = new MessageWithTime(System.currentTimeMillis(), exchange, routingKey, message);
        map.put(correlationId, messageWithTime);
    }

    /**
     * The message was sent successfully, so drop it from the cache
     *
     * @param correlationId correlationId
     */
    @Override
    public void updateSendMessageSuccess(String name, String correlationId) {
        map.remove(correlationId);
    }

    /**
     * Get the MQ messages that need to be resent
     *
     * @return messages that are more than 4 seconds old and still unconfirmed
     */
    @Override
    public List<MessageWithTime> getSendFailMessages(String name) {
        Set<String> set = map.keySet();
        Long now = System.currentTimeMillis();
        List<MessageWithTime> list = new ArrayList<>();
        for (String key : set) {
            MessageWithTime messageWithTime = map.get(key);
            if (null != messageWithTime && messageWithTime.getTime() != null) {
                Long messageWithTimeTime = messageWithTime.getTime();
                if (messageWithTimeTime < (now - VALID_TIME)) {
                    // resend messages that are more than 4 seconds old
                    list.add(messageWithTime);
                }
            }
        }
        if (!CollectionUtils.isEmpty(list)) {
            LOGGER.info("{} RabbitMQ unconfirmed messages that will be resent: [{}]", name, JSON.toJSONString(list));
        }
        return list;
    }

    /**
     * Look up an MQ message by correlationId
     *
     * @param correlationId
     * @return the cached message, or null if it is not in the cache
     */
    @Override
    public MessageWithTime getMessageByCorrelationId(String name, String correlationId) {
        return map.get(correlationId);
    }

}

Second implementation of MyStoredRabbitOperations, which stores messages in MySQL:

package com.springboot.test;

import com.alibaba.fastjson.JSON;
import com.springboot.dao.second.MqPublisherDao;
import com.springboot.vo.MqPublisher;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * Created by shuai on 2019/5/26.
 */
@Component
public class MyStoredRabbitOperationsByMySql implements MyStoredRabbitOperations {

    @Autowired
    private MqPublisherDao mqPublisherDao;

    @Override
    public void saveMessage(String name, String correlationId, String exchange, String routingKey, Message message) {
        // sql .... insert mq
        MqPublisher mqPublisher = new MqPublisher();
        mqPublisher.setExchangeName(exchange);
        mqPublisher.setMessage(JSON.toJSONString(message));
        mqPublisher.setRabbitTemplateName(name);
        mqPublisher.setRoutingKey(routingKey);
        mqPublisher.setBody(new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        mqPublisher.setStatus(MqPublisherStatusEnum.INIT.getStatus());
        mqPublisher.setCorrelationId(correlationId);
        mqPublisher.setCreateTime(new Date());
        mqPublisher.setUpdateTime(new Date());
        mqPublisherDao.insert(mqPublisher);

    }

    @Override
    public void updateSendMessageSuccess(String name, String correlationId) {
        // sql .... update mq success
        MqPublisher mqPublisherQuery = new MqPublisher();
        mqPublisherQuery.setCorrelationId(correlationId);
        mqPublisherQuery.setRabbitTemplateName(name);
        List<MqPublisher> list = mqPublisherDao.list(mqPublisherQuery);
        if (CollectionUtils.isEmpty(list) || list.size() > 1) {
            return;
        }
        MqPublisher mqPublisher = list.get(0);
        mqPublisher.setStatus(MqPublisherStatusEnum.SUCCESS.getStatus());
        mqPublisherDao.updateByPrimaryKeySelective(mqPublisher);
    }

    @Override
    public MessageWithTime getMessageByCorrelationId(String name, String correlationId) {
        // sql .... select mq by correlationId
        MqPublisher mqPublisherQuery = new MqPublisher();
        mqPublisherQuery.setCorrelationId(correlationId);
        mqPublisherQuery.setRabbitTemplateName(name);
        List<MqPublisher> list = mqPublisherDao.list(mqPublisherQuery);
        if (CollectionUtils.isEmpty(list) || list.size() > 1) {
            return null;
        }
        MqPublisher mqPublisher = list.get(0);
        Message message = JSON.parseObject(mqPublisher.getMessage(), Message.class);
        return new MessageWithTime(mqPublisher.getCreateTime().getTime(), mqPublisher.getExchangeName(), mqPublisher.getRoutingKey(), message);
    }

    @Override
    public List<MessageWithTime> getSendFailMessages(String name) {
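        // Return this template's messages that have not been confirmed yet (status INIT).
        // The "older than 4 seconds" filter is applied by the caller, MyStoredRabbitTemplate.

        // sql .... select all unconfirmed mq
        List<MessageWithTime> list = new ArrayList<>();
        MqPublisher mqPublisherQuery = new MqPublisher();
        mqPublisherQuery.setStatus(MqPublisherStatusEnum.INIT.getStatus());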
        mqPublisherQuery.setRabbitTemplateName(name);
        List<MqPublisher> mqPublisherList = mqPublisherDao.list(mqPublisherQuery);
        for (MqPublisher mqPublisher : mqPublisherList) {
            Message message = JSON.parseObject(mqPublisher.getMessage(), Message.class);
            MessageWithTime messageWithTime = new MessageWithTime(mqPublisher.getCreateTime().getTime(), mqPublisher.getExchangeName(), mqPublisher.getRoutingKey(), message);
            list.add(messageWithTime);
        }
        return list;
    }

}
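The MySQL implementation above refers to MqPublisherStatusEnum, which the article does not show. A hypothetical definition consistent with the `status` column comment in the table definition (1: initialized, 2: success) could look like the following; the author's actual enum may differ.

```java
/** Hypothetical definition; the article references this enum but does not include it. */
enum MqPublisherStatusEnum {
    INIT(1),     // matches "1: initialized" in the status column comment
    SUCCESS(2);  // matches "2: success"

    private final int status;

    MqPublisherStatusEnum(int status) {
        this.status = status;
    }

    public int getStatus() {
        return status;
    }
}
```

Using the enum constants instead of literal numbers keeps the Java code and the column comment in sync.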


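Testing the idea and collecting experimental data:

Conclusion:

Messages sent between the moment the connection is closed and the moment it is re-established do not reach the broker. Once the connection is back, the compensation mechanism resends them.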

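A pitfall to watch out for

When the producer sends a message that reaches the exchange but cannot be routed to any queue (for example, no queue is bound for that routingKey), returnedMessage() is called, and confirm() is called as well, with boolean ack set to true. If the exchange itself does not exist, the broker closes the channel instead, and confirm() reports ack = false.

Developers need to be alert to this: ack = true alone does not prove that the message ended up in a queue.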
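One possible way to guard against this, sketched here under assumptions, is to remember which correlation ids were returned and to treat an ack as a success only if no return was seen for the same id. `ReturnAwareConfirms` and its methods are hypothetical names. The sketch also assumes that the return callback runs before the confirm callback for the same message, which is the order in which the broker emits them; whether your client library preserves that order across callback threads should be verified for your version.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical guard: an ack only counts if the message was not returned first. */
class ReturnAwareConfirms {
    private final Set<String> returnedIds = ConcurrentHashMap.newKeySet();

    /** Would be called from returnedMessage(...), with the message's correlation id. */
    void onReturned(String correlationId) {
        returnedIds.add(correlationId);
    }

    /** Would be called from confirm(...); true only for a message that was acked and not returned. */
    boolean isDelivered(String correlationId, boolean ack) {
        boolean wasReturned = returnedIds.remove(correlationId); // also cleans up the entry
        return ack && !wasReturned;
    }
}
```

With a check like this, a returned message would keep its initial status in the store instead of being marked as successful.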

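This article describes a solution the author arrived at for problems encountered in day-to-day business scenarios. If you spot any mistakes, please point them out so that we can learn from each other and improve together.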
