换一种方式查看死信队列的数据

2021-01-06  本文已影响0人  滴流乱转的小胖子

消费端重试超过16次的消息,会进入死信队列,此类消息需要人工接入或是程序定时处理下未成功的数据。
之前需要到RocketMq-Console-Ng的死信队列主题下查看,不是很方便。
一般套路会思考是否要修改rockmq的相关处理的源码,修改记录数据源,但是这种对代码侵入性太大,不便于灵活维护。
经过思考决定采用如下aop机制,在数据库中再记录一遍数据。



@Aspect
@Component
public class DlLogAspect {

    /** 日志打印 */
    private Logger logger = LoggerFactory.getLogger(DlLogAspect.class);
    /** rocketmq默认重试16次 */
    private static final Integer MAXTIMES = Integer.valueOf("16");

    /** 主键服务 */
    @Autowired
    private IIdService idService;

    /** 日志记录服务 */
    @Autowired
    private IMessageErrorLogService messageErrorLogService;

    /**
     * 定义切点,捕获所有消息处理方法
     * @param messageExt messageExt
     */
    @Pointcut("execution(* org.apache.rocketmq.spring.core.RocketMQListener.onMessage(..)) && args(messageExt)")
    public void dlMessageLog(MessageExt messageExt) { }

    /**
     * 后置通知
     * @param messageExt messageExt
     */
    @Before("dlMessageLog(messageExt)")
    public void dlListener(MessageExt messageExt) {

        try {
            dlMessageLogCreate(messageExt);
        } catch (Exception e) {

            e.printStackTrace();
            logger.error("死信队列记录出错" + e);
        }
    }

    /**
     * 死信消息创建
     * @param message message
     */
    private void dlMessageLogCreate(MessageExt message) {

        int retryTimes = message.getReconsumeTimes();
      
        // 打印重试次数,便于在ES中查看,及时发现错误
        if (retryTimes > NumberConsts.INT_1) {
            logger.error("topic={}, tags={}, msgId={}, body={} 的重试次数为{}", message.getTopic(), message.getTags(),
                    message.getMsgId(), new String(message.getBody()), retryTimes);
        }

        if (retryTimes >= MAXTIMES) {

            //到达最大重试次数,将会进入死信队列,记录入库,以便再次消费
            logger.error("消息重试次数达到最大值.消息内容为" + new String(message.getBody()));

            MessageErrorLog messageErrorLog = new MessageErrorLog(idService.getUniqueId(), message.getTopic(),
                    message.getTags(), new String(message.getBody()));
            // 插入数据库中,便于人工接入回溯问题
            messageErrorLogService.insert(messageErrorLog);
        }
    }
}

上一篇 下一篇

猜你喜欢

热点阅读