8:RocketMq实战 分布式事务消息架构讲解(文末有项目连

2021-04-19  本文已影响0人  _River_
官方文档:
http://rocketmq.apache.org/docs/transaction-example/
1:什么是分布式事务:
 单体事务:强一致性 干性事务
 分布式事务:最终一致性性  柔性事务

什么是分布式事务
来源:单体应用一>拆分为分布式应用
一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保障,

分布式事务常见解决方案:
     2PC :两阶段提交,基于XA协议
     TCC : Try、Confirms Cancel
2:RocketMQ4.X分布式事务消息架构讲解
RocketMQ事务消息:
    RocketMQ提供分布事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致 
    
半消息 Half Message
    Producer已经将消息成功发送到了Broker端,但是服务端未收到生产者对该消息的二次确认,
    此时该消息被标记成“暂不能投递”(暂时不能被Consumer消费)状态,处于该种状态下的消息即半消息。

消息回查:
    由于网络闪断、生产者应用重启等原因,导致某条事务消息的 二次确认步骤(第4步) 丢失,消息队列 
    RocketMQ服务端通过扫描发现某条消息长期处于“半消息”时,
    需要主动向消息生产者询问 该消息的最终状态(Commit或是Rollback),该过程即消息回查。

如果步骤4丢失 则需要执行步骤 5  6  7   
RocketMQ事务消息的状态
    COMMIT_MESSAGE:提交事务消息,消费者可以消费此消息
    ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费
    UNKNOW: Broker需要回查确认消息的状态

关于事务消息能否成功投递到Broker节点
    事务消息producer端的生产方式和普通消息是一样的,确保消息能发送到Broker节点(具有重试机制)

关于事务消息的消费
    事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到
   (消息重试等机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低)
3:RocketMQ4.X分布式事务消息的存在的问题
RocketMQ4.X分布式事务的解决了什么
仅是解决 消息的发送方的事务 以及  确保在消息发送方在事务成功之后 把消息投递到订阅端

发送方=生产者 订阅端=消费者

但并不包括 订阅端的事务失败后回滚 发送方的事务
但并不包括 订阅端的事务失败后回滚 发送方的事务
但并不包括 订阅端的事务失败后回滚 发送方的事务

而这个对于消息队列来说  其实并不是的问题:
因为我们的消息队列的目标 就是进行异步削峰

什么是消息队列:
我发送方的消息 只管发  你订阅端的逻辑成不成功和我有什么关系

如果要两个服务之间的数据库需要实现最终一致性,即是保证两个服务的事务的一致成功或者失败,
那就是要等两个服务的逻辑都完全走完,事务完全提交,这压根就不是异步削峰了。    
完全可以采用调接口的方法进行,而不是采用发消息的方式。
4:TransactionProducer
生产者:
    主要需要把DefaultMQProducer 换成 TransactionMQProducer 
    并设置使用TransactionListener 进行本地事务的监听

注意:消费者完全不用修改(只需要修改订阅的 topic就行了)
@Component
public class TransactionProducer {
    
//        DefaultMQProducer就是我们最普通的生产者
//        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
//        TransactionMQProducer 继承了 DefaultMQProducer
    
    public void sendMessageInTransaction() throws MQClientException, InterruptedException {

        TransactionMQProducer transactionMQProducer = new TransactionMQProducer();
        //该生产者所在group
        transactionMQProducer.setProducerGroup("transaction_producer_group");
        ///如果是集群模式 以 ; 分开   "IP1:9876;IP2:9876;"
        transactionMQProducer.setNamesrvAddr("47.113.101.241:9876");
        //是否走Vip通道
        transactionMQProducer.setVipChannelEnabled(false);
        //消息同步发送失败重试次数
        transactionMQProducer.setRetryTimesWhenSendFailed(3);
//        //消息异步发送失败重试次数
        transactionMQProducer.setRetryTimesWhenSendAsyncFailed(3);


        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        //使用实现类
        transactionMQProducer.setExecutorService(executorService);

        //创建事务实现类
        TransactionListener transactionListener = new TransactionListenerImpl();
        transactionMQProducer.setTransactionListener(transactionListener);

        transactionMQProducer.start();

        //设置Topic
        String topic = "transaction_test_topic";

        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                        new Message(topic, tags[i % tags.length], "KEY" + i,
                                ("Hello RocketMQ OrderNo: " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));

                //消息在事务里面发送出去了
                SendResult sendResult = transactionMQProducer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        //生产者延时关闭 因为在需要 1分钟内才能进入 UNKNOW状态消息的的回查逻辑
        Thread.sleep(1000 * 60);
        transactionMQProducer.shutdown();
    }
}
5:TransactionListenerImpl
TransactionListenerImpl 实现了 TransactionListener 重写了相关逻辑
executeLocalTransaction 方法进行 本地事务完成后提交 设置Broker里面半成功消息的状态
checkLocalTransaction  方法用于 当Broker服务认为本地事务的并没有进行提交 或者 提交了UNKNOW后 进行回查
@Slf4j
public class TransactionListenerImpl implements TransactionListener {

    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * when send transactional prepare(half) message succeed,
     * this method will be invoked to execute local transaction.
     * 当发送半消息成功后
     * 可以设置该消息在broker的状态为
     *      COMMIT_MESSAGE(Broker端直接消费)
     *      ROLLBACK_MESSAGE(Broker端直接回滚)
     *      UNKNOW(1分钟内才能 进入checkLocalTransaction的回查逻辑)
     *      null  (立刻进入反查逻辑)
     *
     * @param msg
     * @param otherParam
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object otherParam) {

        log.info("executeLocalTransaction:{} ", msg.getTransactionId());

        //模拟生成订单号  0 到 9
        //使用orderNo进行各种业务处理  然后返回一个 status
        //这里暂时设置根据orderNo执行各种业务逻辑后  返回的status为如下
        Integer orderNo = transactionIndex.getAndIncrement();
        Integer status = orderNo % 3;

        //在这里模拟本地事务的状态
        //有四种状态  UNKNOW  COMMIT_MESSAGE   ROLLBACK_MESSAGE  以及null
        //如果是 UNKNOW 会在00秒后进入checkLocalTransaction 逻辑
        //如果是 null   会立即进入checkLocalTransaction 逻辑

        if (null != status) { switch (status) {
                case 0:
//                  订单号  0  3  6 9  成功
                    localTrans.put(msg.getTransactionId(), orderNo);
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 1:
//                   订单号  1  4  7  失败
                    localTrans.put(msg.getTransactionId(), orderNo);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                case 2:
                    //该msg.getTransactionId() 需要在回调时使用
//                    订单号  2   5   8  进入回调
                    localTrans.put(msg.getTransactionId(), orderNo);
                    return LocalTransactionState.UNKNOW;
                default:
                    //该msg.getTransactionId() 需要在回调时使用
                    localTrans.put(msg.getTransactionId(), orderNo);
                    return null;
            }
        }
        return null;
    }

    /**
     * When no response to prepare(half) message. broker will send check message to check the transaction status,
     * and this method will be invoked to get local transaction status.
     *
     *  在进行发送消息后 到到Broker变成半消息状态
     *  执行完executeLocalTransaction  其状态是  UNKNOW 或者 null  则需要进入回查处理
     *
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {

        String transactionId = msg.getTransactionId();
        Integer orderNo =  localTrans.get(transactionId);
        log.info("执行的逻辑 checkLocalTransaction:{}  orderNo:{} ",transactionId,orderNo);

        //根据orderNo进行逻辑回查处理
        //进行回查的结果 只能是 COMMIT_MESSAGE(成功) 或者  ROLLBACK_MESSAGE(失败)
        //这里暂时设置根据orderNo执行各种业务逻辑后  返回为如下
        if (orderNo % 2 == 0) {
            log.info("执行的逻辑 checkLocalTransaction: {}  orderNo:{},COMMIT_MESSAGE", msg.getTransactionId(),orderNo);
            return LocalTransactionState.COMMIT_MESSAGE;

        }else
        {
            log.info("执行的逻辑 checkLocalTransaction: {}  orderNo:{},ROLLBACK_MESSAGE", msg.getTransactionId(),orderNo);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}
6:测试结果
模拟生成订单号  0 到 9
在 executeLocalTransaction(本地事务)    订单号  0  3  6 9  成功  修改消息状态为可以被消费
在 executeLocalTransaction(本地事务)    订单号  1  4  7    失败    这些消息直接被丢弃

订单号  2   5   8  进入回调
在 checkLocalTransaction(进行反查)    订单号  2  8   经过反查后确认成功    修改消息状态为可以被消费
在 checkLocalTransaction(进行反查)    订单号  5      经过反查后确认失败    这些消息直接被丢弃
7:transactionMQProducer.sendMessageInTransaction(核心方法)
1:设置为半消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");

2:和普通发送消息一样发送
sendResult = this.send(msg);

3:判断transactionListener 是否为空 不为空则执行重写的 executeLocalTransaction
if (transactionListener != null) {    
    log.debug("Used new transaction API");    
    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}

4:endTransaction 方法
    事务设置消息相关的状态(4种)
    通过Oneway的方式告诉Borker端 修改里面半消息的状态
5:在一个异步的方法里面 checkLocalTransaction(回查)
     LocalTransactionState 为UNKNOW的状态的消息进行回查

项目连接

请配合项目代码食用效果更佳:
项目地址:
https://github.com/hesuijin/hesuijin-study-project
Git下载地址:
https://github.com.cnpmjs.org/hesuijin/hesuijin-study-project.git

rocketmq-module项目模块下 transactionRocketMQ包下
上一篇 下一篇

猜你喜欢

热点阅读