javaWeb学习工作生活

改造Spring事务实现Spring Cloud分布式事务

2019-07-04  本文已影响34人  giafei

网上很多资料或组件都是完全重新实现事务管理,而本文选择改造Spring事务的提交过程,这样在即简化了集成成本,又不会因事务管理代码出现Bug导致系统没有满足事务特性(ACID),即如果在一次微服务的调用过程中所有的Spring的事务都提交或撤销了,整个系统也满足ACID。

说起事务本身就挺麻烦的,在单点应用中事务由数据库来实现,但却是个业务层概念。Spring通过线程存储共享事务对象实现了业务层事务。网上常见的分布式事务协议有2PC、3PC与TCC,其中TCC对业务层侵入性强,对业务设计改动比较大,不符合前置设想。
2PC/3PC协议是资源层协议,比较适用于数据库直接互相沟通事务,我们可以借助于Spring的业务层事务方式实现业务层的分布式事务。3PC第一个阶段要求所有参与者达成一致后再进行事务操作,即在一开始要求所有参与者都已就位,而微服务的链式调用是同步的,参与者是逐步增加的,所以3PC在微服务场景基本不可实现。本文选择实现2PC协议,2PC协议也是异步的,但2PC不要求所有参与者一开始就位,只要在阶段2之前就位即可,同步可以看做是2PC的异步的一种特殊的情况。

一致性与原子性

一致性这个词在各种资料中算是歧义特别大了,原子性反而出奇的统一。

原子性目前有两种意思,一个是多线程中操作是不可分的,另一个是事务中所有的操作要么都成功,要么全都失败。这两种意思语境很少重复,所以提到原子性基本不会有歧义。

一致性有分布式系统的一致性、事务的一致性、一致性Hash等解释,事务的一致性简单来说就是事务前后数据库的完整性约束不能破坏,它是由事务的其他3个特性共同保证的。
分布式系统中的一致性一般指的是CAP理论中的C,它的定义是所有节点的数据是相同的,又有人把这个定义称为一致性状态,而在分布式系统中协商达成一致性状态的协议算法为一致性协议,2PC就是一个一致性协议。

当事务和分布式系统有交集时,一致性这个词就彻底乱套了,甚至和原子性搅在了一起,网上铺天盖地的最终一致性的资料很多其实是事务的最终原子性。

本文中一致性从CAP理论取义为 所有节点的数据是相同的,英文为"all nodes see the same data at the same time"

因为分布式系统天然有事务的隔离性,事务的持久性由数据库保证,因此我们只要在Spring事务基础上实现分布式事务的原子性,就进而实现了分布式事务的一致性。

即,只要在数据库事务的基础上实现分布式事务的原子性,就实现了分布式事务的ACID4个特性。因此在后文中用原子性指代分布式事务的整体4个特性。

因Spring的事务使用导致的事务一致性错误不在本文讨论范围内。

对于2PC协议,一致性状态为所有节点对事务的提交或回滚达成一致,一致性的结果是事务的原子性。因此2PC协议的一致性和分布式事务的一致性可以看做是等价的。

本文实现与2PC协议的差异性

2PC的各个参与者是平等,互相隔离的。但微服务不是,微服务是有调用与被调用关系的,被调用者的异常会天然返回给调用者并自然引起调用者的连锁回滚。
2PC协议要求服务恢复后能回到事务中,但这个能力只有数据库自己有,Spring的事务提交前宕机,数据库会自己回滚。因此本文实现的2PC协议会在任何一个参与者宕机后整体回滚,此外还有在一致提交时,业务宕机也可能导致的原子性失败。
由于无法保证业务宕机恢复后重试事务的上下文与宕机前一致,所以这个问题不能简单的在技术上解决,必须要业务配合。比如订单、仓储业务,当用户支付订单后仓储服务提交事务前宕机,仓储服务恢复后并不确定还有没有货,如果没货,只能在业务中退款。

协调者宕机异常

协调者正常的前提下,参与者宕机,我们可以简单的回滚事务才达成一致性。如果协调者宕机且没有Fail Over,这种情况需要详细分析。
如果协调者在阶段一宕机,由于微服务的同步性,整个调用栈会在某个业务阻塞等待进行阶段一投票,与2PC协议相同。即协调者在阶段一宕机会导致业务阻塞但不会造成不一致。
如果协调者阶段二宕机,此时同步调用已经完成,等待提交的消息。如果等待超时,也会导致参与者回滚。所以本文的实现,在阶段二协调者和参与者任一宕机都可能导致不一致。


协调者

本文采用Redis充当协调者,但Redis主从切换可能导致数据不一致,上一节中讨论阶段一协调者宕机不会造成不一致,我们要在设计时弥补这一可能性。

协调者协议实现

设计的存储结构如下

数据类型 描述
cloud-transaction/事务ID/state Long 事务存在标志,
值为0时表示事务异常,后续的参与者自行回滚
值为1表示阶段1,值为2表示阶段2
cloud-transaction/事务ID/result String 事务的最终结果,是COMMIT还是ROLLBACK
cloud-transaction/事务ID/notice String 协调者向参与者的通知订阅KEY
cloud-transaction/事务ID/vote Hash[参与者,状态] 投票记录,
HKEY为参与者ID,HValue为投票状态
cloud-transaction/事务ID/ack Hash[参与者,状态] 执行记录,
HKEY为参与者ID,HValue为执行状态

正常流程为:

  1. 业务的开始方为watchdog,负责创建事务ID,写入state为1,并在微服务之间传递事务ID
  2. 判断state值是否为1,若是将Spring事务的执行结果写入投票记录,否则自行ROLLBACK
  3. 调用栈返回至watchdog处,此时可以获取到调用栈是否有异常,同时与投票记录互补
  4. watchdog写state为2,同时将结果写入result
  5. watchdog在notice上广播结果,并设置各键值的过期时间
  6. 各参与者收到通知后执行结果,并写入执行记录

为防范1-3步(阶段1)Redis或参与者异常,在步骤4做如下设定:

  1. 调用栈或投票记录只要有一个为ROLLBACK,整体ROLLBACK
  2. state必须为1或键不存在

即如果Redis异常未能恢复或参与者业务异常,调用栈必然异常。反之调用栈无异常,则表明无业务异常并且Redis最终正常(最终一致性的最终),可通过校验state值判断vote值的可信度。当参与者超时后可以从state和result综合判断是否应该提交事务。有的Redis设置Master宕机时,Slave可以读数据,这种设置最终会在各事务超时回滚。

这么多问题这东西还能用么

标准2PC的协调者是没有存储的,宕机再恢复需要从各参与者获取事务数据。本实现用的是Redis,Redis还是挺可靠的,还可以主从顶一顶。如果系统压力没那么大,负载并不高,用起来是没啥问题的。并且本实现还支持事务重用,即当微服务调用兜兜转转又回来的时候,事务是重用的,可以解决隔离性问题。

为什么要造轮子


关键代码

在微服务之间共享事务ID

通过RequestInterceptor为请求微服务之间的调用增加一个http头,这样可以方便的传递事务ID。同时如果事务创建时没有这个http头,那么当前业务就处在微服务栈的栈底

@Component
public class CloudTransactionIdFeignInterceptor implements RequestInterceptor {

    private static final String REQUEST_ATTRIBUTE_TRANSACTION_ID = "X-TRANSACTION-ID";
    private static final String REQUEST_HEADER_TRANSACTION_ID = "X-TRANSACTION-ID";

    public static HttpServletRequest getCurrentRequest() {
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.currentRequestAttributes();
        return attributes.getRequest();
    }

    @Override
    public void apply(RequestTemplate template) {
        try {
            HttpServletRequest currentRequest = getCurrentRequest();
            Object attribute = currentRequest.getAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID);
            if (attribute != null) {
                template.header(REQUEST_HEADER_TRANSACTION_ID, attribute.toString());
            } else {
                String header = currentRequest.getHeader(REQUEST_HEADER_TRANSACTION_ID);
                if (StringUtils.hasText(header)) {
                    template.header(REQUEST_HEADER_TRANSACTION_ID, header);
                }
            }
        } catch (Throwable e) {
            //不能影响正常的流程运行
        }
    }
    //获取事务ID
    public static String getCloudTransactionId() {
        HttpServletRequest currentRequest = getCurrentRequest();
        Object attribute = currentRequest.getAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID);
        if (attribute != null)
            return attribute.toString();

        return currentRequest.getHeader(REQUEST_HEADER_TRANSACTION_ID);
    }
    //广播事务ID
    public static void broadcastCloudTransactionId(String id) {
        getCurrentRequest().setAttribute(REQUEST_ATTRIBUTE_TRANSACTION_ID, id);
    }
}

拦截Spring事务的提交

Spring通过PlatformTransactionManager Bean对象管理事务,我们实现一个Wrapper把Spring的Bean包起来,拦截关键调用。具体代码见类 WrappedDataSourceTransactionManager

对Spring 事务的 ThreadLocal数据的处理

private String doMoveThreadData(String id) {

        TransactionThreadData data = new TransactionThreadData();
        data.resources = new HashMap<>(TransactionSynchronizationManager.getResourceMap());
        data.synchronizations = TransactionSynchronizationManager.getSynchronizations();
        data.currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
        data.currentTransactionReadOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
        data.currentTransactionIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
        data.actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();

        for (Object key : data.resources.keySet()) {
            TransactionSynchronizationManager.unbindResource(key);
        }

        TransactionSynchronizationManager.clear();

        dataMap.put(id, data);

        return id;
    }

    public boolean restoreThreadData(String id) {
        TransactionThreadData data = dataMap.get(id);
        dataMap.remove(id);

        if (data == null)
            return false;

        for (Map.Entry<Object, Object> entry : data.resources.entrySet()) {
            TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
        }

        TransactionSynchronizationManager.initSynchronization();
        for (TransactionSynchronization synchronization : data.synchronizations) {
            TransactionSynchronizationManager.registerSynchronization(synchronization);
        }

        TransactionSynchronizationManager.setCurrentTransactionName(data.currentTransactionName);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(data.currentTransactionReadOnly);
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(data.currentTransactionIsolationLevel);
        TransactionSynchronizationManager.setActualTransactionActive(data.actualTransactionActive);

        return true;
    }

完整代码见 类 TransactionThreadDataContainer

微服务饶了一圈又回来,继续用未提交的事务

Spring事务是基于ThreadLocal的,只要把ThreadLocal搬过来就可以了

    public static class LoadBalancerFeignClientWrapper implements Client {
        private LoadBalancerFeignClient wrapped;
        private WrappedDataSourceTransactionManager transactionManager;

        public LoadBalancerFeignClientWrapper(Client delegate,
                                       CachingSpringLoadBalancerFactory lbClientFactory,
                                       SpringClientFactory clientFactory, PlatformTransactionManager transactionManager) {
            wrapped = new LoadBalancerFeignClient(delegate, lbClientFactory, clientFactory);
            this.transactionManager = (WrappedDataSourceTransactionManager)transactionManager;
        }

        @Override
        public Response execute(Request request, Request.Options options) throws IOException {
            try {
                 //这里是调用其他微服务,调用的时候把TLS剥离出来
                 //调用完毕或异常时再恢复回来
                 //剥离后的数据就可以安放到其他的线程,达到重用事务的目的
                transactionManager.stealTransactionThreadData();
                Response response = wrapped.execute(request, options);
                transactionManager.returnTransactionThreadData();

                return response;
            } catch (Throwable e) {
                transactionManager.returnTransactionThreadData();
                throw e;
            }
        }
    }

超时

超时使用的是reactor-coreMono,详细见Reactor 3 Reference Guide

        //超时设置
        Mono.delay(Duration.ofMillis(maxWaitTime))
                .map(t -> onTransactionTimeout(transactionId))
                .publishOn(Schedulers.parallel())
                .subscribe();

用的不是timeout而是delay,所以onTransactionTimeout一定会执行,检测事务的结果。

阶段2结果通告

用的时Redis的订阅发布功能。

测试

代码见 https://github.com/giafei/cloud-transaction

上一篇 下一篇

猜你喜欢

热点阅读