Seata AT 模式是如何处理悬挂和空回滚
2021-05-06 本文已影响0人
DH大黄
空回滚描述:
摘自Seata官网(在未收到 Try 请求的情况下收到 Cancel 请求,这种场景被称为空回滚;空回滚在生产环境经常出现,用户在实现TCC服务时,应允许允许空回滚的执行,即收到空回滚时返回成功。)
AT 模式是利用 undo log 来处理空回滚的,具体细节见代码
// io.seata.rm.datasource.DataSourceManager#branchRollback
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
DataSourceProxy dataSourceProxy = get(resourceId);
if (dataSourceProxy == null) {
throw new ShouldNeverHappenException();
}
try {
// 获取 undo log 并执行 undo 操作
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
} catch (TransactionException te) {
StackTraceLogger.info(LOGGER, te,
"branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
} else {
return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
}
}
return BranchStatus.PhaseTwo_Rollbacked;
}
// io.seata.rm.datasource.undo.AbstractUndoLogManager#undo
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;
for (; ; ) {
try {
conn = dataSourceProxy.getPlainConnection();
// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}
// Find UNDO LOG
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
exists = true;
// It is possible that the server repeatedly sends a rollback request to roll back
// the same branch transaction to multiple processes,
// ensuring that only the undo_log in the normal state is processed.
// 判断是否能够回滚
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
}
return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
byte[] rollbackInfo = getRollbackInfo(rs);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
: UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(), sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}
// If undo_log exists, it means that the branch transaction has completed the first phase,
// we can directly roll back and clean the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction,
// causing undo_log not to be written to the database.
// For example, the business processing timeout, the global transaction is the initiator rolls back.
// To ensure data consistency, we can insert an undo_log with GlobalFinished state
// to prevent the local transaction of the first phase of other programs from being correctly submitted.
// See https://github.com/seata/seata/issues/489
if (exists) {
// 删除回滚日志
deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
State.GlobalFinished.name());
}
} else {
// 回滚日志不存在,插入一条全局事务已完成的回滚记录
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
State.GlobalFinished.name());
}
}
return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
}
} catch (Throwable e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
}
}
throw new BranchTransactionException(BranchRollbackFailed_Retriable, String
.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
branchId, e.getMessage()), e);
} finally {
try {
if (rs != null) {
rs.close();
}
if (selectPST != null) {
selectPST.close();
}
if (conn != null) {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
}
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
}
}
}
详细的流程见下图
AT 空回滚.png
悬挂描述
摘自Seata官网(事务协调器在调用 TCC 服务的一阶段 Try 操作时,可能会出现因网络拥堵而导致的超时,此时事务管理器会触发二阶段回滚,调用 TCC 服务的 Cancel 操作,Cancel 调用未超时;在此之后,拥堵在网络上的一阶段 Try 数据包被 TCC 服务收到,出现了二阶段 Cancel 请求比一阶段 Try 请求先执行的情况,此 TCC 服务在执行晚到的 Try 之后,将永远不会再收到二阶段的 Confirm 或者 Cancel ,造成 TCC 服务悬挂。
用户在实现 TCC 服务时,要允许空回滚,但是要拒绝执行空回滚之后 Try 请求,要避免出现悬挂。)