分布式事务&seata

2023-01-15  本文已影响0人  shoyu666

微服务中的分布式事务问题

在单体应用中,3个子模块(库存stock,订单order,账户account),数据的一致性是由数据库保证。

image.png
但是当场景是微服务的场景时
image.png
库存,订单,账户只能保证各自的数据一致性,无法保证全局的数据一致性。

分布式事务seata

分布式事务的发展过程中,有一些著名的理论基础如二阶段提交协议(2pc),协议把分布式事务的过程分为2个阶段进行。
基于2pc的分布式事务模型有XA、AT、TCC、SAGA,他们的共同点都是基于2pc协议,同时各自有各自的特点和适用场景。
那么seata是什么,他跟2pc以及XA、AT、TCC、SAGA的关系?
Seata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。本质上,seata是打包实现了 XA、AT、TCC、SAGA模型,同时支持Dubbo、Spring Cloud、Sofa-RPC、Motan 和 gRPC 等RPC框架的便捷接入,高可用等。
也就是2pc以及XA、AT、TCC、SAGA偏协议和理论。
seata基于协议和理论给出了具体实用的解决方案。
其中AT模式的方便易用,代码无侵入的特点被广泛使用。

seata解决方案图示:


image.png

从图中看出,seata整个分布式事务分为3个角色(RM,TM,TC)。
RM,TM是内嵌于微服务中(或者说内嵌于客户端中),TC是独立部署的一个服务(相当于服务端)。

seata的-全局事务是由很多分支事务组成,一般来说,分支事务就是本地事务


image.png

seata的快速使用

用例

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

仓储服务:对给定的商品扣除仓储数量。
订单服务:根据采购需求创建订单。
帐户服务:从用户帐户中扣除余额。

架构图
image.png
仓储服务
public interface StorageService {

    /**
     * 扣除存储数量
     */
    void deduct(String commodityCode, int count);
}
订单服务
public interface OrderService {

    /**
     * 创建订单
     */
    Order create(String userId, String commodityCode, int orderCount);
}
帐户服务
public interface AccountService {

    /**
     * 从用户账户中借出
     */
    void debit(String userId, int money);
}
主要业务逻辑
public class BusinessServiceImpl implements BusinessService {

    private StorageService storageService;

    private OrderService orderService;

    /**
     * 采购
     */
    public void purchase(String userId, String commodityCode, int orderCount) {

        storageService.deduct(commodityCode, orderCount);

        orderService.create(userId, commodityCode, orderCount);
    }
}
public class OrderServiceImpl implements OrderService {

    private OrderDAO orderDAO;

    private AccountService accountService;

    public Order create(String userId, String commodityCode, int orderCount) {

        int orderMoney = calculate(commodityCode, orderCount);

        accountService.debit(userId, orderMoney);

        Order order = new Order();
        order.userId = userId;
        order.commodityCode = commodityCode;
        order.count = orderCount;
        order.money = orderMoney;

        // INSERT INTO orders ...
        return orderDAO.insert(order);
    }
}
seata 的分布式交易解决方案
image.png

我们只需要使用一个 @GlobalTransactional 注解在业务方法上:


    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        ......
    }
dubbo中的seata

Business发起全局事务,那么Storage,Order,Account是如何感知的,或者说他们是如何串在一起的。
这里涉及一个xid的概念,xid是分布式全局事务的唯一id,所以只要Storage,Order,Account绑定的都是同一个xid就能保证他们是在同一个全局事务中,所以只要保证xid在链路上的传递就可以了。
在dubbo中,xid的传递是通过Fitter实现的。

/**
 * The type Transaction propagation filter.
 */
@Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
public class TransactionPropagationFilter implements Filter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String xid = RootContext.getXID(); // 获取当前事务 XID
        String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); // 获取 RPC 调用传递过来的 XID
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
        }
        boolean bind = false;
        if (xid != null) { // Consumer:把 XID 置入 RPC 的 attachment 中
            RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
        } else {
            if (rpcXid != null) { // Provider:把 RPC 调用传递来的 XID 绑定到当前运行时
                RootContext.bind(rpcXid);
                bind = true;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                }
            }
        }
        try {
            return invoker.invoke(invocation); // 业务方法的调用

        } finally {
            if (bind) { // Provider:调用完成后,对 XID 的清理
                String unbindXid = RootContext.unbind();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                    LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                    if (unbindXid != null) { // 调用过程有新的事务上下文开启,则不能清除
                        RootContext.bind(unbindXid);
                        LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}
参考

seata github
seata官方文档

上一篇下一篇

猜你喜欢

热点阅读