我的微服务设计方案微服务

RocketMQ实现分布式事务

2020-02-14  本文已影响0人  任未然

一. 概述

常见分布式事务的解决方案有:

二. 基础概念

RocketMQ是一种最终一致性的分布式事务, 就是说它保证的是消息最终一致性

2.1 事务交互流程

说明:

  1. 发送方发送半消息给服务端, 消息中携带通知B服务执行需要的信息
  2. 服务端接受半消息成功后给发送方返回成功的通知
  3. 发送方接收到成功通知后开始执行本地事务
  4. 如果本地事务成功, 那么久通知服务端把半消息推送到订阅方, 否则取消半消息的推送
  5. 如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口, 来进行事务结果的回查。
  6. 检查本地数据库提交结果, 查看是否已提交
  7. 更具查看事务结果通知服务端是否推送半消息到订阅方
  8. 消息推到订阅方, 订阅方接收到消息后执行事务, 只要保证消费方失败重试, 就能保证最终一致性

2.2 方法说明

名词 说明
Half Message 事务消息 也称半消息 标识该消息处于"暂时不能投递"状态,不会被Comsumer所消费,待服务端收到生成者对该消息的commit或者rollback响应后,消息会被正常投递或者回滚(丢弃)消息
TransactionMQProducer 半消息的发送者
TransactionListener 处理本地事务, 根据本地事务处理结果决定半消息是否发送, 需重写实现方法: 1.executeLocalTransaction:执行本地事务操作; 2.checkLocalTransaction:回查本地事务操作
LocalTransactionState 事务消息的状态,有三种状态:CommitTransaction(提交) 、RollbackTransaction(回滚)、Unknown(未知)

三. 使用示例

现在有个转账业务, 用户A转账给用户B, 设计两个操作: 用户A扣钱和用户B加钱, 这两个操作是在两个不同的服务器执行的.

3.1 实现原理

说明:

  1. 用户A在扣款之前,先发送半消息到中间件
  2. 半消息发送成功后,执行扣款本地事务
  3. 扣款事务执行成功后,通过推送消息通知另外一台服务器进行用户B加钱事务

3.2 用户A扣钱操作

3.2.1 Service层(实现:TransactionListener类,进行扣钱事务操作)

@Service
public class TransactionListenerImpl implements TransactionListener {
    // 执行本地事务操作
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 开始执行用户A扣钱操作
            // 事务提交成功后返回提交状态通知
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 事务提交失败后返回回滚状态通知
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    // 回查本地事务操作
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        boolean flag = true; // 查看数据库事务提交结果, 提交成功返回true, 否则返回false
        if(flag){
            return LocalTransactionState.COMMIT_MESSAGE;
        }else {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}

3.2.2 Controller层(消息发送)

@RestController
public class Producer {
    @Autowired
    private ObjectMapper objectMapper;
    @Resource
    private TransactionListenerImpl transactionListener;
    @RequestMapping("/sendMessage")
    public Object sendMessage(Map<String,Object> param) {
        try {
            //消息对象
            Message message = new Message();
            //设置主题内容
            message.setBody(objectMapper.writeValueAsString("通知内容").getBytes());
            message.setTopic("topicName");//设置主题名
            message.setTags("topicTag");// 设置标签
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer();
            // 设置处理对象
            transactionMQProducer.setTransactionListener(transactionListener);
            // 发送半消息
            TransactionSendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
            if(SendStatus.SEND_OK == sendResult.getSendStatus()){
                // 半消息发送成功
                return "success";
            }else {
                // 半消息发送失败
                return "error";
            }
        } catch (Exception e) {
            return "error";
        }
    }
}

3.2 用户B加钱操作

@Configuration
public class ConsumerService {
    @Bean
    public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        //消息接受者
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer();
        //设置ConsumerGroup
        defaultMQPushConsumer.setConsumerGroup("consumerGroupName");
        //设置Nameserve
        defaultMQPushConsumer.setNamesrvAddr("nameServe");
        //设置主题与主题下的标签
        defaultMQPushConsumer.subscribe("topicName", "topicTag");
        //  开始接收消息,当订阅主题内容发生变化,本方法就会执行
        defaultMQPushConsumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
            //遍历消息队列
            msgs.forEach(mt -> {
                // 找到对应的通知后, 判断是否已经处理过, 否则进行用户B的加钱操作
                // ......
            });
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        defaultMQPushConsumer.start();
        return defaultMQPushConsumer;
    }
}
上一篇下一篇

猜你喜欢

热点阅读