Dtm子事务屏障

2022-10-12  本文已影响0人  airmy丶

代码库: https://github.com/dtm-labs/client

子事务屏障原理

在本地数据库(资源管理器,即执行try、confirm、cancel的子事务端),创建一个分支操作状态表,使用全局事务id-分支事务id-分支操作(try|confirm|cancel)作为唯一键。

屏障判断流程如下:

  1. 开启本地事务(主应用程序开启Tcc事务)
  2. 对于当前操作(try|confirm|cancel),插入一条数据,通过唯一键进行约束,如果插入不成功,提交事务返回成功。
  3. 如果当前操作是cancel,在执行cancel之前想分支状态表插入一个唯一键为gid-branchid-try 的数据,如果插入成功,这直接提交事务返回成功。(空回滚)
  4. 执行屏障内业务逻辑,如果业务返回成功则成功,失败则失败。

这个流程是如何解决空回滚、幂等和悬挂的异常情况呢?

dtm 的 Tcc 模式下屏障源码

主程序调用:

gid := shortuuid.New()
err := dtmcli.TccGlobalTransaction(dtmutil.DefaultHTTPServer, gid, func(tcc *dtmcli.Tcc) (*resty.Response, error) {
resp, err := tcc.CallBranch(&busi.ReqHTTP{Amount: 30}, busi.Busi+"/TccBTransOutTry",busi.Busi+"/TccBTransOutConfirm", busi.Busi+"/TccBTransOutCancel")
if err != nil {
    return resp, err
}
return tcc.CallBranch(&busi.ReqHTTP{Amount:30},busi.Busi+"/TccBTransInTry",busi.Busi+"/TccBTransInConfirm",busi.Busi+"/TccBTransInCancel")})

资源管理器执行分支事务操作

req := reqFrom(c)
if req.TransOutResult != "" {
    return string2DtmError(req.TransOutResult)
}
bb := MustBarrierFromGin(c)
if req.Store == Redis {
    return bb.RedisCheckAdjustAmount(RedisGet(),GetRedisAccountKey(TransOutUID), req.Amount, 7*86400)
} else if req.Store == Mongo {
    return bb.MongoCall(MongoGet(), func(sc mongo.SessionContext) error {
        return SagaMongoAdjustBalance(sc, sc.Client(), TransOutUID, -req.Amount, "")
    })
}

return bb.CallWithDB(pdbGet(), func(tx *sql.Tx) error {
    return tccAdjustTrading(tx, TransOutUID, -req.Amount)
})

主要的屏障逻辑代码在 bb.CallWithDB 中(逻辑注释在代码中)

func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error) {
    bid := bb.newBarrierID()
    defer dtmimp.DeferDo(&rerr, func() error {
        return tx.Commit()
    }, func() error {
        return tx.Rollback()
    })
  
    // 这里是一个操作映射,tcc模式下执行cancel会先检测try是否已经执行
    originOp := map[string]string{
        dtmimp.OpCancel:     dtmimp.OpTry,    // tcc
        dtmimp.OpCompensate: dtmimp.OpAction, // saga
        dtmimp.OpRollback:   dtmimp.OpAction, // workflow
    }[bb.Op]

    // 这里也是主要用与cancel,执行cancel会插入try
    originAffected, oerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, originOp, bid, bb.Op, bb.DBType, bb.BarrierTableName)
    // 向本地分支状态表插入当前的op操作记录 以 gid-branchid-op 为唯一键约束
    currentAffected, rerr := dtmimp.InsertBarrier(tx, bb.TransType, bb.Gid, bb.BranchID, bb.Op, bid, bb.Op, bb.DBType, bb.BarrierTableName)
    logger.Debugf("originAffected: %d currentAffected: %d", originAffected, currentAffected)

    // 幂等判断
    if rerr == nil && bb.Op == dtmimp.MsgDoOp && currentAffected == 0 { // for msg's DoAndSubmit, repeated insert should be rejected.
        return ErrDuplicated
    }

    if rerr == nil {
        rerr = oerr
    }
  
    // 空回滚判断
    if (bb.Op == dtmimp.OpCancel || bb.Op == dtmimp.OpCompensate || bb.Op == dtmimp.OpRollback) && originAffected > 0 || // null compensate
        currentAffected == 0 { // repeated request or dangled request
        return
    }
  
    // 没有错误
    if rerr == nil {
        rerr = busiCall(tx)
    }
    return
}

总的来说,Tcc模式下的子事务屏障原理还是相对简单,应该也是比较容易理解,也很巧妙的统一解决了分布式事务下容易产生的悬挂,幂等和空回滚问题,也可做学习借鉴!

上一篇下一篇

猜你喜欢

热点阅读