2022-04-04_rocketmq事务回查自定义essage

2022-04-04  本文已影响0人  kikop

20220404_rocketmq事务回查自定义essageCheckListener学习笔记

1概述

1.1事务消息回查

rocketmq版本:V4.8.0

为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。

本文涉及的内容如下:

  1. 修改代理端配置broker.conf,单个消息的检查次数(默认15次)
  2. 自定义日志回查超时后处理逻辑AbstractTransactionalMessageCheckListener
  3. 手动修改代理端对应的Jar包rocketmq-broker(增加CheckListener和SPI配置)
  4. 模拟生产端发送事务型消息

2代码示例

2.1单个消息的检查次数

// D:\mqexperiment\rocketmqdebug\myrocketmqhome\conf\broker-adebug.properties
#事务回查次数,默认15次
transactionCheckMax = 3

2.2自定义日志回查超时后处理逻辑

package org.apache.rocketmq.broker.transaction.queue;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;

/**
 * @author kikop
 * @version 1.0
 * @project myproducerserver
 * @file MyAbstractTransactionalMessageCheckListener
 * @desc 自定义日志回查超时后处理逻辑
 * @date 2022/4/4
 * @time 9:30
 * @by IDE IntelliJ IDEA
 */
public class MyAbstractTransactionalMessageCheckListener extends DefaultTransactionalMessageCheckListener {

    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqTransaction");

    @Override
    public void resolveDiscardMsg(MessageExt messageExt) {

        log.error("begin resolveDiscardMsg...");
        // 1.topic移走
        super.resolveDiscardMsg(messageExt);
        // 2.记录自定义日志
        log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, commitLogOffset={}, real topic={}",
                new Object[]{messageExt.getQueueOffset(),
                        messageExt.getCommitLogOffset(),
                        messageExt.getUserProperty("REAL_TOPIC")});
        log.error("end resolveDiscardMsg...");

    }


}

2.3编写spi配置文件

// E:\workdirectory\OpenSourceStudy\rocketmq-all-4.8.0-source-release\broker\src\main\resources\META-INF\service\org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener
org.apache.rocketmq.broker.transaction.queue.MyAbstractTransactionalMessageCheckListener

2.4修改rocketmq-broker-4.8.0.jar

1.在META-INF\service增加文件:AbstractTransactionalMessageCheckListener
2.将MyAbstractTransactionalMessageCheckListener.class文件放在位置:org.apache.rocketmq.broker.transaction.queue

2.5编写生产端事务回查方法

package com.kikop.mycomponent.listener;

import com.kikop.utils.DateUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;


public class MyTransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    // key:transactionId
    // value:state
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     * 当发送半消息成功时,mqServer根据返回值决定是否提交事务,只执行一次
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println(String.format("开始执行本地事务,transactionId:%s, executeLocalTransaction:%s",
                msg.getTransactionId(), DateUtils.getTime()));
        //  直接模拟事务执行结果未知,触发 rocketmq 定时事务回查
        int value = transactionIndex.getAndIncrement();
//        int status = value % 3;
        int status = 0;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 如果已经成功,则不需再次检查
     * 获取本地事务状态
     * 默认检查15次,周期:1分钟
     * 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次
     *
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println(String.format("开始执行事务回查,transactionId:%s, checkLocalTransaction:%s", msg.getTransactionId(), DateUtils.getTime()));
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

2.6编写生产端测试代码

@Override
    public void sendTransactionMsg() {

        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};

        for (int i = 0; i < 1; i++) {
            try {
                Message msg =
                        new Message("myTransactionTopic2", tags[i % tags.length], "KEY" + i,
                                ("Hello kikop " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

                // 生成一个事务ID
                // 发送的时候会自动调用一次 executeLocalTransaction
                SendResult sendResult = transactionMQProducer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }

2.7测试

2.7.1生产端日志

// 1.消息发送
SendResult [sendStatus=SEND_OK, msgId=7F000001192C14DAD5DC13F38BCC0000, offsetMsgId=null, messageQueue=MessageQueue [topic=myTransactionTopic2, brokerName=broker-adebug, queueId=2], queueOffset=0]
// 2.执行本地事务
开始执行本地事务,transactionId:7F000001192C14DAD5DC13F38BCC0000, executeLocalTransaction:2022-04-04 20:58:48

// 3次事务回查
开始执行事务回查,transactionId:7F000001192C14DAD5DC13F38BCC0000, checkLocalTransaction:2022-04-04 20:59:15
开始执行事务回查,transactionId:7F000001192C14DAD5DC13F38BCC0000, checkLocalTransaction:2022-04-04 21:00:15
开始执行事务回查,transactionId:7F000001192C14DAD5DC13F38BCC0000, checkLocalTransaction:2022-04-04 21:01:15

2.7.2代理端日志

2022-04-04 21:02:15 ERROR TransactionalMessageCheckService - begin resolveDiscardMsg...
    
2022-04-04 21:02:15 ERROR TransactionalMessageCheckService - MsgExt:MessageExt [brokerName=null, queueId=0, storeSize=351, queueOffset=3, sysFlag=0, bornTimestamp=1649077128141, bornHost=/192.168.174.110:53893, storeTimestamp=1649077275581, storeHost=/192.168.174.110:10911, msgId=C0A8AE6E00002A9F0000000000001441, commitLogOffset=5185, bodyCRC=2146650276, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='RMQ_SYS_TRANS_HALF_TOPIC', flag=0, properties={REAL_TOPIC=myTransactionTopic2, TRANSACTION_CHECK_TIMES=3, KEYS=KEY0, TRAN_MSG=true, UNIQ_KEY=7F000001192C14DAD5DC13F38BCC0000, CLUSTER=rocketmq-clusterdebug, WAIT=true, PGROUP=please_rename_unique_group_name, TAGS=TagA, REAL_QID=2}, body=[72, 101, 108, 108, 111, 32, 107, 105, 107, 111, 112, 32, 48], transactionId='null'}] has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC
2022-04-04 21:02:15 INFO TransactionalMessageCheckService - create new topic TopicConfig [topicName=TRANS_CHECK_MAX_TIME_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]
2022-04-04 21:02:15 INFO brokerOutApi_thread_1 - register broker[0]to name server 127.0.0.1:9876 OK
2022-04-04 21:02:15 INFO TransactionalMessageCheckService - Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset=3, commitLogOffset=5185, real topic=myTransactionTopic2
2022-04-04 21:02:15 INFO TransactionalMessageCheckService - Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset=3, commitLogOffset=5185, real topic=myTransactionTopic2
    
2022-04-04 21:02:15 ERROR TransactionalMessageCheckService - end resolveDiscardMsg...

参考

1RocketMQ源码解析(搭建环境)

https://inetyoung.blog.csdn.net/article/details/109036959

2消息中间件 RocketMQ 源码解析 —— 调试环境搭建

https://blog.csdn.net/qiujiavip/article/details/99842544

3RocketMQ源码分析之消息存储

https://zhuanlan.zhihu.com/p/58728454

上一篇下一篇

猜你喜欢

热点阅读