tcc-transaction源码分析与思考
tcc介绍
tcc是分布式事务的一种解决方案,即Try,Commit,Cancel
Try: 尝试执行业务
完成所有业务检查(一致性)
预留必须业务资源(准隔离性)
Confirm: 确认执行业务
真正执行业务
不作任何业务检查
只使用Try阶段预留的业务资源
Confirm操作满足幂等性
Cancel: 取消执行业务
释放Try阶段预留的业务资源
Cancel操作满足幂等性
本文我会讲解一个实现tcc思想的框架,tcc-transaction
image.png
在github还是比较火的,并且应该也有公司生产使用,熟悉它的源码,一方面可以提升自己眼界,扩宽自己编码能力,另一方面,以后需要使用它的时候,难免有坑需要修改,也能为开源贡献一份力量
tcc-transaction使用
在tcc-transaction的tcc-transaction-tutorial-sample模块,做相关配置运行即可
tcc-transaction原理
这边我们主要讲解tcc-transaction和dubbo的整合
tcc-transaction的主要原理是Aop,那么以后面试的时候,问用Aop做什么,就可以回答这个了,再把tcc讲一下,完美的连招
作为一个处理分布式事务的框架,先来讲下tcc-transaction的事务抽象
Transaction
tcc的事务,并不是数据库的事务,而是应用层的事务,所以tcc这三个阶段的操作,全部都需要我们手动实现
先看下Transaction对象的参数
private TransactionXid xid;
private TransactionStatus status;
private TransactionType transactionType;
private volatile int retriedCount = 0;
private Date createTime = new Date();
private Date lastUpdateTime = new Date();
private long version = 1;
private List<Participant> participants = new ArrayList<Participant>();
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
参数 | 解释 |
---|---|
xid | 全局事务id,内容相当于uuid,用来保证事务唯一性 |
status | 事务的状态,可以为TRYING,CONFIRMING,CANCELLING,分别对应tcc三个阶段 |
transactionType | 事务类型,可以为ROOT,BRANCH,ROOT是主事务,BRANCH是分支事务,事务的发起方法对应主事务,其他被调用的dubbo接口在分支事务上 |
retriedCount | 事务重试次数,当confirm或者cancel失败重试时增加 |
createTime | 事务的创建时间 |
lastUpdateTime | 事务最后一次更新时间 |
version | 事务的版本号,乐观锁? |
participants | 事务的参与者 |
attachments | 附加参数,暂时没发现被用到 |
再看下Transaction中两个比较重要的方法
public void commit() {
for (Participant participant : participants) {
participant.commit();
}
}
public void rollback() {
for (Participant participant : participants) {
participant.rollback();
}
}
执行Transaction的commit或rollback方法,会对应执行所有participant的commit或rollback方法
下面讲解Participant抽象
Participant
Participant是对tcc事务参与者的抽象
public class Participant implements Serializable {
private static final long serialVersionUID = 4127729421281425247L;
private TransactionXid xid;
private InvocationContext confirmInvocationContext;
private InvocationContext cancelInvocationContext;
private Terminator terminator = new Terminator();
Class<? extends TransactionContextEditor> transactionContextEditorClass;
......
}
在Participant中使用InvocationContext把事务参与者的confirm和cancel方法的元数据保存下来
public class InvocationContext implements Serializable {
private static final long serialVersionUID = -7969140711432461165L;
private Class targetClass;
private String methodName;
private Class[] parameterTypes;
private Object[] args;
}
元数据包括目标Class,目标方法名,目标参数列表,实际参数列表
Participant也保存了transactionContextEditorClass,用于提取事务上下文
接下来看Participant的commit和rollback方法
public void rollback() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
}
public void commit() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}
会通过terminator执行具体的操作,传入构造好的TransactionContext,confirmInvocationContext和transactionContextEditorClass
public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
try {
//从spring容器中获取对应事务参与者实现
Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
Method method = null;
//获取对应方法反射对象
method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
//设置上下文
FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
//反射调用
return method.invoke(target, invocationContext.getArgs());
} catch (Exception e) {
throw new SystemException(e);
}
}
return null;
}
在terminator的调用过程中,我们需要理解的一点的是,Participant其实分两种,第一种是本地的Participant,直接反射调用即可,第二种是远程的Participant,也就是调用的是Dubbo接口,反射调用的同时会执行远程对等端的接口逻辑,所以这里需要用到transactionContextEditorClass来设置上下文信息,传递到远程dubbo接口中去
TransactionContext
image.pngTransactionContext的保存了全局事务id和事务状态,在调用事务参与者Participant的confirm或cancel方法时会传递过去
TransactionContextEditor
public interface TransactionContextEditor {
public TransactionContext get(Object target, Method method, Object[] args);
public void set(TransactionContext transactionContext, Object target, Method method, Object[] args);
}
TransactionContextEditor用于调用事务参与者方法时,设置和获取需要传递的TransactionContext,目前有2种实现,DefaultTransactionContextEditor和DubboTransactionContextEditor
DefaultTransactionContextEditor从方法参数里面提取TransactionContext对象
@Override
public TransactionContext get(Object target, Method method, Object[] args) {
//获取TransactionContext在args中的位置
int position = getTransactionContextParamPosition(method.getParameterTypes());
if (position >= 0) {
return (TransactionContext) args[position];
}
return null;
}
DubboTransactionContextEditor从Dubbo Rpc上下文提取TransactionContext对象
@Override
public TransactionContext get(Object target, Method method, Object[] args) {
//从Dubbo Rpc上下文获取
String context = RpcContext.getContext().getAttachment(TransactionContextConstants.TRANSACTION_CONTEXT);
if (StringUtils.isNotEmpty(context)) {
return JSON.parseObject(context, TransactionContext.class);
}
return null;
}
TransactionManager
TransactionManager用来控制Transaction的生命周期,Transaction的改变使用TransactionRepository同步到第三方存储,一般使用mysql数据库存储
TransactionManager包含的方法以及属性如下
image.png
变量介绍
- transactionRepository
private TransactionRepository transactionRepository;
TransactionRepository用于对Transaction的持久化操作,如果是JDBC实现,其实就是对一张Transaction表的CRUD,这张表主要用于补偿任务
- CURRENT
private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
这是一个双向队列,在这个类主要用作栈,用来处理事务的嵌套,因为是ThreadLocal,所以针对每个线程都是独立的
- executorService
private ExecutorService executorService;
线程池,用于异步执行commit或者cancel
方法介绍
- registerTransaction
private void registerTransaction(Transaction transaction) {
if (CURRENT.get() == null) {
CURRENT.set(new LinkedList<Transaction>());
}
CURRENT.get().push(transaction);
}
把transaction设置到ThreadLocal对象中去,push方法对应入栈
-
begin
public Transaction begin() {Transaction transaction = new Transaction(TransactionType.ROOT);
transactionRepository.create(transaction);
registerTransaction(transaction);
return transaction;
}
开启事务,同步到repository,注册到ThreadLocal,这个方法与用于主事务的创建 -
propagationNewBegin
public Transaction propagationNewBegin(TransactionContext transactionContext) {
Transaction transaction = new Transaction(transactionContext);
transactionRepository.create(transaction);
registerTransaction(transaction);
return transaction;
}
这个方法用于从主事务的上下文创建分支事务,xid保持不变,事务类型变化
- propagationExistBegin
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
if (transaction != null) {
transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
registerTransaction(transaction);
return transaction;
} else {
throw new NoExistedTransactionException();
}
}
这个方法用于从事务上下文同步事务状态到ThreadLocal
- commit
public void commit(boolean asyncCommit) {
//从threadlocal得到当前事务
final Transaction transaction = getCurrentTransaction();
transaction.changeStatus(TransactionStatus.CONFIRMING);
//数据库更新transaction
transactionRepository.update(transaction);
if (asyncCommit) {
try {
Long statTime = System.currentTimeMillis();
//通过线程池异步执行事务提交
executorService.submit(new Runnable() {
@Override
public void run() {
commitTransaction(transaction);
}
});
logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
} catch (Throwable commitException) {
logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
} else {
//同步执行事务提交
commitTransaction(transaction);
}
}
这个方法执行事务的commit,实际事务提交在commitTransaction中执行,会执行每个事务参与者的commit方法
private void commitTransaction(Transaction transaction) {
try {
//调用事务参与者的提交方法
transaction.commit();
//事务结束,在数据库删除当前事务,如果commit异常,不会把数据库内事务记录删除,为了重试补偿
transactionRepository.delete(transaction);
} catch (Throwable commitException) {
logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
throw new ConfirmingException(commitException);
}
}
看了这段逻辑后,我们可以发现,在commit失败的时候,并不会触发rollback,而是不删除数据库事务记录,之后会有定时任务进行扫描重试,后面会讲到这个定时任务
- rollback
public void rollback(boolean asyncRollback) {
final Transaction transaction = getCurrentTransaction();
transaction.changeStatus(TransactionStatus.CANCELLING);
transactionRepository.update(transaction);
if (asyncRollback) {
try {
executorService.submit(new Runnable() {
@Override
public void run() {
rollbackTransaction(transaction);
}
});
} catch (Throwable rollbackException) {
logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
throw new CancellingException(rollbackException);
}
} else {
rollbackTransaction(transaction);
}
}
这个方法用于执行事务的回滚逻辑,和commit方法类似,在rollbackTransaction方法中,会执行每个事务参与者的rollback方法
private void rollbackTransaction(Transaction transaction) {
try {
//调用事务参与者的提交方法
transaction.rollback();
//事务结束,在数据库删除当前事务,如果rollback异常,不会把数据库内事务记录删除,为了重试补偿
transactionRepository.delete(transaction);
} catch (Throwable rollbackException) {
logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);
throw new CancellingException(rollbackException);
}
}
- cleanAfterCompletion
public void cleanAfterCompletion(Transaction transaction) {
if (isTransactionActive() && transaction != null) {
Transaction currentTransaction = getCurrentTransaction();
if (currentTransaction == transaction) {
//栈操作,后进先出
CURRENT.get().pop();
} else {
throw new SystemException("Illegal transaction when clean after completion");
}
}
}
事务结束,从栈中弹出结束的事务。
- enlistParticipant
public void enlistParticipant(Participant participant) {
Transaction transaction = this.getCurrentTransaction();
transaction.enlistParticipant(participant);
transactionRepository.update(transaction);
}
给事务绑定事务参与者并同步到repository
接下来讲下核心的两个切面,这两个切面把上面的所有组件串联在一起使用
核心Aspect
在使用tcc-transaction的时候,我们需要对开启tcc事务的方法加上@Compensable注解,这个注解可以设置以下参数
参数 | 解释 |
---|---|
propagation | 事务传播性,包含REQUIRED(必须存在事务,不存在,创建),SUPPORTS(有事务的话在事务内运行),MANDATORY(必须存在事务),REQUIRES_NEW(不管是否存在,创建新的事务) |
confirmMethod | confirm阶段对应的方法 |
cancelMethod | cancel阶段对应的方法 |
transactionContextEditor | 设置对应transactionContextEditor |
asyncConfirm | 是否异步confirm |
asyncCancel | 是否异步cancel |
@Compensable注解的参数会在下面两个切面中使用到
ConfigurableTransactionAspect
ConfigurableTransactionAspect主要用来控制Transaction的生命周期,内部通过CompensableTransactionInterceptor实现
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {
}
@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
直接看interceptCompensableMethod方法
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
//解析@Compensable注解
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
Compensable compensable = method.getAnnotation(Compensable.class);
Propagation propagation = compensable.propagation();
//获取上下文,如果是Root,不会存在上下文,Transaction都还没创建
TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
boolean asyncConfirm = compensable.asyncConfirm();
boolean asyncCancel = compensable.asyncCancel();
boolean isTransactionActive = transactionManager.isTransactionActive();
if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
}
//计算方法类型,Root对应主事务入口方法,Provider对应远程提供者方的方法,Normal是主事务内消费者方的方法(是代理方法)
MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
switch (methodType) {
case ROOT:
//处理主事务切面
return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
case PROVIDER:
//处理提供者事务切面
return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
default:
//消费者事务直接执行,会对应执行远端提供者事务切面
return pjp.proceed();
}
}
在tcc事务内被@Compensable注解的方法分三种
- Root方法,就是这次事务的入口方法
- Normal方法,在Root方法调用的dubbo接口方法
- Provider方法,对应dubbo接口方法的远程实现
被注解的方法都是try的逻辑,confirm和cancel逻辑配置在@Compensable注解参数中
对被@Compensable注解的方法执行切面逻辑的时候,会根据这三种方法类型做不同处理
对于Root方法,执行rootMethodProceed的逻辑
private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
Object returnValue = null;
Transaction transaction = null;
try {
//创建事务
transaction = transactionManager.begin();
try {
returnValue = pjp.proceed();
} catch (Throwable tryingException) {
if (isDelayCancelException(tryingException)) {
transactionManager.syncTransaction();
} else {
logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
//回滚事务
transactionManager.rollback(asyncCancel);
}
throw tryingException;
}
//提交事务
transactionManager.commit(asyncConfirm);
} finally {
//清理操作
transactionManager.cleanAfterCompletion(transaction);
}
return returnValue;
}
会在被切方法(对应try逻辑)执行前,开启事务,try逻辑执行成功,通过transactionManager的commit方法执行每个事务参与者的commit逻辑,如果try失败,通过transactionManager执行每个参与者的rollback逻辑。
对于Provider方法
private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
Transaction transaction = null;
try {
switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
case TRYING:
//使用transactionContext创建分支事务
transaction = transactionManager.propagationNewBegin(transactionContext);
//执行被切方法逻辑
return pjp.proceed();
case CONFIRMING:
try {
//更新事务状态
transaction = transactionManager.propagationExistBegin(transactionContext);
//提交事务,不走切面的方法
transactionManager.commit(asyncConfirm);
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
}
break;
case CANCELLING:
try {
//更新事务状态
transaction = transactionManager.propagationExistBegin(transactionContext);
//回滚事务,不走切面的方法
transactionManager.rollback(asyncCancel);
} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
}
break;
}
} finally {
//清理资源
transactionManager.cleanAfterCompletion(transaction);
}
Method method = ((MethodSignature) (pjp.getSignature())).getMethod();
//对于 confirm和 cancel 返回空值
//主要针对原始类型做处理,因为不能为null
return ReflectionUtils.getNullValue(method.getReturnType());
}
可以看到在provider类型方法的切面,也就是远程的Participant,如果transaction的status为trying,会通过transactionManager.propagationNewBegin创建分支事务并执行被切方法逻辑,如果是status为confirming或者canceling,会调用对应confirm或cancel配置的方法,跳过被切方法
对于normal类型
直接调用,normal类型的方法是封装了对远程dubbo接口方法调用逻辑的本地代理方法,所以直接执行即可
ConfigurableCoordinatorAspect
ConfigurableCoordinatorAspect主要是为了设置事务的参与者,在一个事务内,每个被@Compensable注解的方法都是事务参与者
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void transactionContextCall() {
}
@Around("transactionContextCall()")
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
}
相关逻辑封装在ResourceCoordinatorInterceptor的interceptTransactionContextMethod方法中
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
//得到当前事务
Transaction transaction = transactionManager.getCurrentTransaction();
if (transaction != null) {
switch (transaction.getStatus()) {
case TRYING:
//只需要在trying的时候把参与者信息提取出来,设置到transaction中去
enlistParticipant(pjp);
break;
case CONFIRMING:
break;
case CANCELLING:
break;
}
}
//执行目标方法
return pjp.proceed(pjp.getArgs());
}
在trying阶段会把所有参与者加入到事务中去,对于Root方法,创建主事务,加入的参与者会包括Root方法对应本地参与者以及Normal方法对应的远程参与者,对于Provider方法,通过主事务上下文创建分支事务,加入的参与者包括Provider方法对应的本地参与者以及它包含的Normal方法对应的远程参与者。这里的远程参与者又可以开启新的分支事务。层级多了,势必会产生性能问题。
接下来看enlistParticipant如何生成参与者对象
private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {
//获取@Compensable信息
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
if (method == null) {
throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
}
Compensable compensable = method.getAnnotation(Compensable.class);
String confirmMethodName = compensable.confirmMethod();
String cancelMethodName = compensable.cancelMethod();
Transaction transaction = transactionManager.getCurrentTransaction();
TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());
if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
//设置事务上下文到Editor,Editor用来统一提取上下文,这边对应设置dubbo的rpc上下文中去
//这边的上下文设置后就会调用try逻辑
FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
}
Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());
//目前的用法,其实只要保存调用参数就行,因为具体执行confirm和cancel都是根据transaction的status来判断的
//confirm的调用上下文
InvocationContext confirmInvocation = new InvocationContext(targetClass,
confirmMethodName,
method.getParameterTypes(), pjp.getArgs());
//cancel的调用上下文
InvocationContext cancelInvocation = new InvocationContext(targetClass,
cancelMethodName,
method.getParameterTypes(), pjp.getArgs());
Participant participant =
new Participant(
xid,
confirmInvocation,
cancelInvocation,
compensable.transactionContextEditor());
//把participant设置到transaction,并且同步到持久化存储
transactionManager.enlistParticipant(participant);
}
通过从@Compensable注解配置的信息以及当前Transaction来配置Participant。
在Participant设置到Transaction后,会执行pjp.proceed(pjp.getArgs()),也就执行了对应try逻辑的被切方法
ConfigurableCoordinatorAspect的逻辑会在ConfigurableTransactionAspect后执行,这和它们设置的order有关,小的order先执行,后切入
失败补偿机制
对于失败的Confirm和Cancel操作,会有补偿任务进行重试,具体实现类为RecoverScheduledJob,在这个类的init方法会启动quartz任务
public void init() {
try {
MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
//设置定时任务执行的对象和方法
jobDetail.setTargetObject(transactionRecovery);
jobDetail.setTargetMethod("startRecover");
jobDetail.setName("transactionRecoveryJob");
jobDetail.setConcurrent(false);
jobDetail.afterPropertiesSet();
CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
cronTrigger.setBeanName("transactionRecoveryCronTrigger");
//设置cron表达式
cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
cronTrigger.setJobDetail(jobDetail.getObject());
cronTrigger.afterPropertiesSet();
scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
scheduler.start();
} catch (Exception e) {
throw new SystemException(e);
}
}
在这个方法里会使用RecoverConfig的配置初始化定时任务,定时任务具体的执行逻辑使用MethodInvokingJobDetailFactoryBean的targetObject和targetMethod配置,对应为transactionRecovery的startRecover方法,我们来看下这个方法
public void startRecover() {
//获取所有没被处理的transaction
List<Transaction> transactions = loadErrorTransactions();
//根据规则处理这些transaction
recoverErrorTransactions(transactions);
}
分别看下上述两个方法的逻辑
private List<Transaction> loadErrorTransactions() {
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
//获取在RecoverDuration间隔之前未完成的transaction
return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
}
private void recoverErrorTransactions(List<Transaction> transactions) {
for (Transaction transaction : transactions) {
//重试次数超过上限的Transaction不再执行补偿
if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));
continue;
}
//如果是分支事务,并且超过最长超时时间忽略
if (transaction.getTransactionType().equals(TransactionType.BRANCH)
&& (transaction.getCreateTime().getTime() +
transactionConfigurator.getRecoverConfig().getMaxRetryCount() *
transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000
> System.currentTimeMillis())) {
continue;
}
try {
transaction.addRetriedCount();
//对超时的confiring操作重试
if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
transaction.changeStatus(TransactionStatus.CONFIRMING);
transactionConfigurator.getTransactionRepository().update(transaction);
transaction.commit();
transactionConfigurator.getTransactionRepository().delete(transaction);
} else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)//对超时的Canceling操作重试,或者Root超时的trying进行cancel操作
|| transaction.getTransactionType().equals(TransactionType.ROOT)) {
transaction.changeStatus(TransactionStatus.CANCELLING);
transactionConfigurator.getTransactionRepository().update(transaction);
transaction.rollback();
transactionConfigurator.getTransactionRepository().delete(transaction);
}
} catch (Throwable throwable) {
if (throwable instanceof OptimisticLockException
|| ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {
logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
} else {
logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);
}
}
}
}
注意一点,trying阶段不会重试,失败未处理,会触发canceling操作
思考
分布式事务解决方案
下面列举一些分布式事务解决方案的特性
- 传统的二/三阶段提交
这种解决方案会占用数据库事务资源,在互联网公司很少使用 -
异步确保型事务
基于可靠消息的最终一致性,可以异步,但数据绝对不能丢,而且一定要记账成功
这个难道是依赖mq支持的事务特性? -
最大努力通知型事务
按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对
目前项目比较常用的方式 -
tcc
适用于实时性要求比较高,数据必须可靠的场景
我的理解
依照我目前的工作经验,现在公司对分布式事务的解决方案一般是上述的第三种方法,但是一些对实时性要求比较高,数据必须可靠的场景我们还是可以考虑使用tcc的,但是也没必要全盘tcc,可以和最大努力通知型事务一起使用
对于tcc还有一个疑问,在高并发情况下,在mq的模式下,由于是异步,能够保证消息最终被消费掉,并且消费速率稳定,而tcc这种模式,会不会导致接口资源不够用,接口资源都占用满,导致不断的try失败。
由此可见tcc的使用难度不止在业务使用方式上,对于一些极限的场景,需要有经验的人来分析tcc该使用在多大范围内。但是如果是并发量不大的项目,大家可以试着使用。
朋友们,使用或没使用过tcc的,请留下你的想法。
最后
希望大家关注下我的公众号
image