分布式分布式事务Spring Cloud 相关文章

tcc-transaction源码分析与思考

2018-04-27  本文已影响2365人  土豆肉丝盖浇饭

tcc介绍

tcc是分布式事务的一种解决方案,即Try,Commit,Cancel

Try: 尝试执行业务

完成所有业务检查(一致性)

预留必须业务资源(准隔离性)

Confirm: 确认执行业务

真正执行业务

不作任何业务检查

只使用Try阶段预留的业务资源

Confirm操作满足幂等性

Cancel: 取消执行业务

释放Try阶段预留的业务资源

Cancel操作满足幂等性

本文我会讲解一个实现tcc思想的框架,tcc-transaction


image.png

在github还是比较火的,并且应该也有公司生产使用,熟悉它的源码,一方面可以提升自己眼界,扩宽自己编码能力,另一方面,以后需要使用它的时候,难免有坑需要修改,也能为开源贡献一份力量

tcc-transaction使用

在tcc-transaction的tcc-transaction-tutorial-sample模块,做相关配置运行即可

tcc-transaction原理

这边我们主要讲解tcc-transaction和dubbo的整合
tcc-transaction的主要原理是Aop,那么以后面试的时候,问用Aop做什么,就可以回答这个了,再把tcc讲一下,完美的连招
作为一个处理分布式事务的框架,先来讲下tcc-transaction的事务抽象

Transaction

tcc的事务,并不是数据库的事务,而是应用层的事务,所以tcc这三个阶段的操作,全部都需要我们手动实现
先看下Transaction对象的参数

    private TransactionXid xid;

    private TransactionStatus status;

    private TransactionType transactionType;

    private volatile int retriedCount = 0;

    private Date createTime = new Date();

    private Date lastUpdateTime = new Date();

    private long version = 1;

    private List<Participant> participants = new ArrayList<Participant>();

    private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
参数 解释
xid 全局事务id,内容相当于uuid,用来保证事务唯一性
status 事务的状态,可以为TRYING,CONFIRMING,CANCELLING,分别对应tcc三个阶段
transactionType 事务类型,可以为ROOT,BRANCH,ROOT是主事务,BRANCH是分支事务,事务的发起方法对应主事务,其他被调用的dubbo接口在分支事务上
retriedCount 事务重试次数,当confirm或者cancel失败重试时增加
createTime 事务的创建时间
lastUpdateTime 事务最后一次更新时间
version 事务的版本号,乐观锁?
participants 事务的参与者
attachments 附加参数,暂时没发现被用到

再看下Transaction中两个比较重要的方法

public void commit() {

    for (Participant participant : participants) {
        participant.commit();
    }
}

public void rollback() {
    for (Participant participant : participants) {
        participant.rollback();
    }
}

执行Transaction的commit或rollback方法,会对应执行所有participant的commit或rollback方法

下面讲解Participant抽象

Participant

Participant是对tcc事务参与者的抽象

public class Participant implements Serializable {

    private static final long serialVersionUID = 4127729421281425247L;

    private TransactionXid xid;

    private InvocationContext confirmInvocationContext;

    private InvocationContext cancelInvocationContext;

    private Terminator terminator = new Terminator();

    Class<? extends TransactionContextEditor> transactionContextEditorClass;

    ......
}

在Participant中使用InvocationContext把事务参与者的confirm和cancel方法的元数据保存下来

public class InvocationContext implements Serializable {

    private static final long serialVersionUID = -7969140711432461165L;
    private Class targetClass;

    private String methodName;

    private Class[] parameterTypes;

    private Object[] args;
}

元数据包括目标Class,目标方法名,目标参数列表,实际参数列表

Participant也保存了transactionContextEditorClass,用于提取事务上下文

接下来看Participant的commit和rollback方法

 public void rollback() {
        terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
    }

    public void commit() {
        terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
    }

会通过terminator执行具体的操作,传入构造好的TransactionContext,confirmInvocationContext和transactionContextEditorClass

public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {


        if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {

            try {
                //从spring容器中获取对应事务参与者实现
                Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();

                Method method = null;
                //获取对应方法反射对象
                method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
                //设置上下文
                FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
                //反射调用
                return method.invoke(target, invocationContext.getArgs());

            } catch (Exception e) {
                throw new SystemException(e);
            }
        }
        return null;
    }

在terminator的调用过程中,我们需要理解的一点的是,Participant其实分两种,第一种是本地的Participant,直接反射调用即可,第二种是远程的Participant,也就是调用的是Dubbo接口,反射调用的同时会执行远程对等端的接口逻辑,所以这里需要用到transactionContextEditorClass来设置上下文信息,传递到远程dubbo接口中去

TransactionContext

image.png

TransactionContext的保存了全局事务id和事务状态,在调用事务参与者Participant的confirm或cancel方法时会传递过去

TransactionContextEditor

public interface TransactionContextEditor {

    public TransactionContext get(Object target, Method method, Object[] args);

    public void set(TransactionContext transactionContext, Object target, Method method, Object[] args);

}

TransactionContextEditor用于调用事务参与者方法时,设置和获取需要传递的TransactionContext,目前有2种实现,DefaultTransactionContextEditor和DubboTransactionContextEditor
DefaultTransactionContextEditor从方法参数里面提取TransactionContext对象

@Override
        public TransactionContext get(Object target, Method method, Object[] args) {
            //获取TransactionContext在args中的位置
            int position = getTransactionContextParamPosition(method.getParameterTypes());

            if (position >= 0) {
                return (TransactionContext) args[position];
            }

            return null;
        }

DubboTransactionContextEditor从Dubbo Rpc上下文提取TransactionContext对象

@Override
    public TransactionContext get(Object target, Method method, Object[] args) {
        //从Dubbo Rpc上下文获取
        String context = RpcContext.getContext().getAttachment(TransactionContextConstants.TRANSACTION_CONTEXT);

        if (StringUtils.isNotEmpty(context)) {
            return JSON.parseObject(context, TransactionContext.class);
        }

        return null;
    }

TransactionManager

TransactionManager用来控制Transaction的生命周期,Transaction的改变使用TransactionRepository同步到第三方存储,一般使用mysql数据库存储
TransactionManager包含的方法以及属性如下


image.png

变量介绍

  1. transactionRepository
 private TransactionRepository transactionRepository;

TransactionRepository用于对Transaction的持久化操作,如果是JDBC实现,其实就是对一张Transaction表的CRUD,这张表主要用于补偿任务

  1. CURRENT
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();

这是一个双向队列,在这个类主要用作栈,用来处理事务的嵌套,因为是ThreadLocal,所以针对每个线程都是独立的

  1. executorService
 private ExecutorService executorService;

线程池,用于异步执行commit或者cancel

方法介绍

  1. registerTransaction
private void registerTransaction(Transaction transaction) {

    if (CURRENT.get() == null) {
        CURRENT.set(new LinkedList<Transaction>());
    }

    CURRENT.get().push(transaction);
}

把transaction设置到ThreadLocal对象中去,push方法对应入栈

  1. begin
    public Transaction begin() {

    Transaction transaction = new Transaction(TransactionType.ROOT);
    transactionRepository.create(transaction);
    registerTransaction(transaction);
    return transaction;
    }
    开启事务,同步到repository,注册到ThreadLocal,这个方法与用于主事务的创建

  2. propagationNewBegin

public Transaction propagationNewBegin(TransactionContext transactionContext) {

    Transaction transaction = new Transaction(transactionContext);
    transactionRepository.create(transaction);

    registerTransaction(transaction);
    return transaction;
}

这个方法用于从主事务的上下文创建分支事务,xid保持不变,事务类型变化

  1. propagationExistBegin
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
    Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());

    if (transaction != null) {
        transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
        registerTransaction(transaction);
        return transaction;
    } else {
        throw new NoExistedTransactionException();
    }
}

这个方法用于从事务上下文同步事务状态到ThreadLocal

  1. commit
public void commit(boolean asyncCommit) {
    //从threadlocal得到当前事务
    final Transaction transaction = getCurrentTransaction();

    transaction.changeStatus(TransactionStatus.CONFIRMING);
    //数据库更新transaction
    transactionRepository.update(transaction);

    if (asyncCommit) {
        try {
            Long statTime = System.currentTimeMillis();
            //通过线程池异步执行事务提交
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    commitTransaction(transaction);
                }
            });
            logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
        } catch (Throwable commitException) {
            logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
            throw new ConfirmingException(commitException);
        }
    } else {
        //同步执行事务提交
        commitTransaction(transaction);
    }
}

这个方法执行事务的commit,实际事务提交在commitTransaction中执行,会执行每个事务参与者的commit方法

private void commitTransaction(Transaction transaction) {
    try {
        //调用事务参与者的提交方法
        transaction.commit();
        //事务结束,在数据库删除当前事务,如果commit异常,不会把数据库内事务记录删除,为了重试补偿
        transactionRepository.delete(transaction);
    } catch (Throwable commitException) {
        logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
        throw new ConfirmingException(commitException);
    }
}

看了这段逻辑后,我们可以发现,在commit失败的时候,并不会触发rollback,而是不删除数据库事务记录,之后会有定时任务进行扫描重试,后面会讲到这个定时任务

  1. rollback
public void rollback(boolean asyncRollback) {

    final Transaction transaction = getCurrentTransaction();
    transaction.changeStatus(TransactionStatus.CANCELLING);
    
    transactionRepository.update(transaction);

    if (asyncRollback) {

        try {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    rollbackTransaction(transaction);
                }
            });
        } catch (Throwable rollbackException) {
            logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
            throw new CancellingException(rollbackException);
        }
    } else {

        rollbackTransaction(transaction);
    }
}

这个方法用于执行事务的回滚逻辑,和commit方法类似,在rollbackTransaction方法中,会执行每个事务参与者的rollback方法

private void rollbackTransaction(Transaction transaction) {
    try {
        //调用事务参与者的提交方法
        transaction.rollback();
         //事务结束,在数据库删除当前事务,如果rollback异常,不会把数据库内事务记录删除,为了重试补偿
        transactionRepository.delete(transaction);
    } catch (Throwable rollbackException) {
        logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);
        throw new CancellingException(rollbackException);
    }
}
  1. cleanAfterCompletion
public void cleanAfterCompletion(Transaction transaction) {
    if (isTransactionActive() && transaction != null) {
        Transaction currentTransaction = getCurrentTransaction();
        if (currentTransaction == transaction) {
            //栈操作,后进先出
            CURRENT.get().pop();
        } else {
            throw new SystemException("Illegal transaction when clean after completion");
        }
    }
}

事务结束,从栈中弹出结束的事务。

  1. enlistParticipant
public void enlistParticipant(Participant participant) {
        Transaction transaction = this.getCurrentTransaction();
        transaction.enlistParticipant(participant);
        transactionRepository.update(transaction);
    }

给事务绑定事务参与者并同步到repository

接下来讲下核心的两个切面,这两个切面把上面的所有组件串联在一起使用

核心Aspect

在使用tcc-transaction的时候,我们需要对开启tcc事务的方法加上@Compensable注解,这个注解可以设置以下参数

参数 解释
propagation 事务传播性,包含REQUIRED(必须存在事务,不存在,创建),SUPPORTS(有事务的话在事务内运行),MANDATORY(必须存在事务),REQUIRES_NEW(不管是否存在,创建新的事务)
confirmMethod confirm阶段对应的方法
cancelMethod cancel阶段对应的方法
transactionContextEditor 设置对应transactionContextEditor
asyncConfirm 是否异步confirm
asyncCancel 是否异步cancel

@Compensable注解的参数会在下面两个切面中使用到

ConfigurableTransactionAspect

ConfigurableTransactionAspect主要用来控制Transaction的生命周期,内部通过CompensableTransactionInterceptor实现

@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {

}

@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

    return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}

直接看interceptCompensableMethod方法

public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

    //解析@Compensable注解
    Method method = CompensableMethodUtils.getCompensableMethod(pjp);
    Compensable compensable = method.getAnnotation(Compensable.class);
    Propagation propagation = compensable.propagation();
    //获取上下文,如果是Root,不会存在上下文,Transaction都还没创建
    TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
    boolean asyncConfirm = compensable.asyncConfirm();
    boolean asyncCancel = compensable.asyncCancel();
    boolean isTransactionActive = transactionManager.isTransactionActive();

    if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
        throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
    }

    //计算方法类型,Root对应主事务入口方法,Provider对应远程提供者方的方法,Normal是主事务内消费者方的方法(是代理方法)
    MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);

    switch (methodType) {
        case ROOT:
            //处理主事务切面
            return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
        case PROVIDER:
            //处理提供者事务切面
            return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
        default:
            //消费者事务直接执行,会对应执行远端提供者事务切面
            return pjp.proceed();
    }
}

在tcc事务内被@Compensable注解的方法分三种

  1. Root方法,就是这次事务的入口方法
  2. Normal方法,在Root方法调用的dubbo接口方法
  3. Provider方法,对应dubbo接口方法的远程实现
    被注解的方法都是try的逻辑,confirm和cancel逻辑配置在@Compensable注解参数中

对被@Compensable注解的方法执行切面逻辑的时候,会根据这三种方法类型做不同处理
对于Root方法,执行rootMethodProceed的逻辑

private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {

    Object returnValue = null;

    Transaction transaction = null;

    try {

        //创建事务
        transaction = transactionManager.begin();

        try {
            returnValue = pjp.proceed();
        } catch (Throwable tryingException) {

            if (isDelayCancelException(tryingException)) {
                transactionManager.syncTransaction();
            } else {
                logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);

                //回滚事务
                transactionManager.rollback(asyncCancel);
            }

            throw tryingException;
        }

        //提交事务
        transactionManager.commit(asyncConfirm);

    } finally {
        //清理操作
        transactionManager.cleanAfterCompletion(transaction);
    }

    return returnValue;
}

会在被切方法(对应try逻辑)执行前,开启事务,try逻辑执行成功,通过transactionManager的commit方法执行每个事务参与者的commit逻辑,如果try失败,通过transactionManager执行每个参与者的rollback逻辑。

对于Provider方法

private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {

    Transaction transaction = null;
    try {

        switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
            case TRYING:
                //使用transactionContext创建分支事务
                transaction = transactionManager.propagationNewBegin(transactionContext);
                //执行被切方法逻辑
                return pjp.proceed();
            case CONFIRMING:
                try {
                    //更新事务状态
                    transaction = transactionManager.propagationExistBegin(transactionContext);
                    //提交事务,不走切面的方法
                    transactionManager.commit(asyncConfirm);
                } catch (NoExistedTransactionException excepton) {
                    //the transaction has been commit,ignore it.
                }
                break;
            case CANCELLING:

                try {
                    //更新事务状态
                    transaction = transactionManager.propagationExistBegin(transactionContext);
                    //回滚事务,不走切面的方法
                    transactionManager.rollback(asyncCancel);
                } catch (NoExistedTransactionException exception) {
                    //the transaction has been rollback,ignore it.
                }
                break;
        }

    } finally {
        //清理资源
        transactionManager.cleanAfterCompletion(transaction);
    }

    Method method = ((MethodSignature) (pjp.getSignature())).getMethod();

    //对于 confirm和 cancel 返回空值
    //主要针对原始类型做处理,因为不能为null
    return ReflectionUtils.getNullValue(method.getReturnType());
}

可以看到在provider类型方法的切面,也就是远程的Participant,如果transaction的status为trying,会通过transactionManager.propagationNewBegin创建分支事务并执行被切方法逻辑,如果是status为confirming或者canceling,会调用对应confirm或cancel配置的方法,跳过被切方法

对于normal类型
直接调用,normal类型的方法是封装了对远程dubbo接口方法调用逻辑的本地代理方法,所以直接执行即可

ConfigurableCoordinatorAspect

ConfigurableCoordinatorAspect主要是为了设置事务的参与者,在一个事务内,每个被@Compensable注解的方法都是事务参与者

@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void transactionContextCall() {

}

@Around("transactionContextCall()")
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
    return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
}

相关逻辑封装在ResourceCoordinatorInterceptor的interceptTransactionContextMethod方法中

public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {

    //得到当前事务
    Transaction transaction = transactionManager.getCurrentTransaction();

    if (transaction != null) {

        switch (transaction.getStatus()) {
            case TRYING:
                //只需要在trying的时候把参与者信息提取出来,设置到transaction中去
                enlistParticipant(pjp);
                break;
            case CONFIRMING:
                break;
            case CANCELLING:
                break;
        }
    }

    //执行目标方法
    return pjp.proceed(pjp.getArgs());
}

在trying阶段会把所有参与者加入到事务中去,对于Root方法,创建主事务,加入的参与者会包括Root方法对应本地参与者以及Normal方法对应的远程参与者,对于Provider方法,通过主事务上下文创建分支事务,加入的参与者包括Provider方法对应的本地参与者以及它包含的Normal方法对应的远程参与者。这里的远程参与者又可以开启新的分支事务。层级多了,势必会产生性能问题。

接下来看enlistParticipant如何生成参与者对象

private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {

    //获取@Compensable信息
    Method method = CompensableMethodUtils.getCompensableMethod(pjp);
    if (method == null) {
        throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
    }
    Compensable compensable = method.getAnnotation(Compensable.class);

    String confirmMethodName = compensable.confirmMethod();
    String cancelMethodName = compensable.cancelMethod();

    Transaction transaction = transactionManager.getCurrentTransaction();
    TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());

    if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
        //设置事务上下文到Editor,Editor用来统一提取上下文,这边对应设置dubbo的rpc上下文中去
        //这边的上下文设置后就会调用try逻辑
        FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
    }

    Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());

    //目前的用法,其实只要保存调用参数就行,因为具体执行confirm和cancel都是根据transaction的status来判断的
    //confirm的调用上下文
    InvocationContext confirmInvocation = new InvocationContext(targetClass,
            confirmMethodName,
            method.getParameterTypes(), pjp.getArgs());

    //cancel的调用上下文
    InvocationContext cancelInvocation = new InvocationContext(targetClass,
            cancelMethodName,
            method.getParameterTypes(), pjp.getArgs());

    Participant participant =
            new Participant(
                    xid,
                    confirmInvocation,
                    cancelInvocation,
                    compensable.transactionContextEditor());

    //把participant设置到transaction,并且同步到持久化存储
    transactionManager.enlistParticipant(participant);

}

通过从@Compensable注解配置的信息以及当前Transaction来配置Participant。
在Participant设置到Transaction后,会执行pjp.proceed(pjp.getArgs()),也就执行了对应try逻辑的被切方法

ConfigurableCoordinatorAspect的逻辑会在ConfigurableTransactionAspect后执行,这和它们设置的order有关,小的order先执行,后切入

失败补偿机制

对于失败的Confirm和Cancel操作,会有补偿任务进行重试,具体实现类为RecoverScheduledJob,在这个类的init方法会启动quartz任务

public void init() {

    try {
        MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
        //设置定时任务执行的对象和方法
        jobDetail.setTargetObject(transactionRecovery);
        jobDetail.setTargetMethod("startRecover");
        jobDetail.setName("transactionRecoveryJob");
        jobDetail.setConcurrent(false);
        jobDetail.afterPropertiesSet();

        CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
        cronTrigger.setBeanName("transactionRecoveryCronTrigger");
        //设置cron表达式
        cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
        cronTrigger.setJobDetail(jobDetail.getObject());
        cronTrigger.afterPropertiesSet();

        scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
        scheduler.start();

    } catch (Exception e) {
        throw new SystemException(e);
    }
}

在这个方法里会使用RecoverConfig的配置初始化定时任务,定时任务具体的执行逻辑使用MethodInvokingJobDetailFactoryBean的targetObject和targetMethod配置,对应为transactionRecovery的startRecover方法,我们来看下这个方法

public void startRecover() {
        //获取所有没被处理的transaction
        List<Transaction> transactions = loadErrorTransactions();
        //根据规则处理这些transaction
        recoverErrorTransactions(transactions);
    }

分别看下上述两个方法的逻辑

private List<Transaction> loadErrorTransactions() {


        long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();

        TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
        RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();

        //获取在RecoverDuration间隔之前未完成的transaction
        return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
    }
 private void recoverErrorTransactions(List<Transaction> transactions) {


    for (Transaction transaction : transactions) {

        //重试次数超过上限的Transaction不再执行补偿
        if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {

            logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));
            continue;
        }

        //如果是分支事务,并且超过最长超时时间忽略
        if (transaction.getTransactionType().equals(TransactionType.BRANCH)
                && (transaction.getCreateTime().getTime() +
                transactionConfigurator.getRecoverConfig().getMaxRetryCount() *
                        transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000
                > System.currentTimeMillis())) {
            continue;
        }
        
        try {
            transaction.addRetriedCount();

            //对超时的confiring操作重试
            if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {

                transaction.changeStatus(TransactionStatus.CONFIRMING);
                transactionConfigurator.getTransactionRepository().update(transaction);
                transaction.commit();
                transactionConfigurator.getTransactionRepository().delete(transaction);

            } else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)//对超时的Canceling操作重试,或者Root超时的trying进行cancel操作
                    || transaction.getTransactionType().equals(TransactionType.ROOT)) {

                transaction.changeStatus(TransactionStatus.CANCELLING);
                transactionConfigurator.getTransactionRepository().update(transaction);
                transaction.rollback();
                transactionConfigurator.getTransactionRepository().delete(transaction);
            }

        } catch (Throwable throwable) {

            if (throwable instanceof OptimisticLockException
                    || ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {
                logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
            } else {
                logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
            }
        }
    }
}

注意一点,trying阶段不会重试,失败未处理,会触发canceling操作

思考

分布式事务解决方案

下面列举一些分布式事务解决方案的特性

  1. 传统的二/三阶段提交
    这种解决方案会占用数据库事务资源,在互联网公司很少使用
  2. 异步确保型事务



    基于可靠消息的最终一致性,可以异步,但数据绝对不能丢,而且一定要记账成功
    这个难道是依赖mq支持的事务特性?

  3. 最大努力通知型事务



    按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对
    目前项目比较常用的方式

  4. tcc



    适用于实时性要求比较高,数据必须可靠的场景

我的理解

依照我目前的工作经验,现在公司对分布式事务的解决方案一般是上述的第三种方法,但是一些对实时性要求比较高,数据必须可靠的场景我们还是可以考虑使用tcc的,但是也没必要全盘tcc,可以和最大努力通知型事务一起使用

对于tcc还有一个疑问,在高并发情况下,在mq的模式下,由于是异步,能够保证消息最终被消费掉,并且消费速率稳定,而tcc这种模式,会不会导致接口资源不够用,接口资源都占用满,导致不断的try失败。

由此可见tcc的使用难度不止在业务使用方式上,对于一些极限的场景,需要有经验的人来分析tcc该使用在多大范围内。但是如果是并发量不大的项目,大家可以试着使用。

朋友们,使用或没使用过tcc的,请留下你的想法。

最后

希望大家关注下我的公众号


image

资源链接

tcc-transaction git地址

上一篇下一篇

猜你喜欢

热点阅读