分布式

seate分布式事务调研

2019-06-10  本文已影响0人  无聊之园

官网:https://github.com/seata/seata/wiki/%E6%A6%82%E8%A7%88

首先,跑一下官网的spring cloud的例子。
大概步骤:
1、跑spring cloud对应的sql脚本。
2、修改几个服务的数据库连接密码。
3、下载seate服务端程序,运行起来。
4、把spring cloud的几个服务跑起来。
5、调一下服务,发现事务起作用了。

然后,把seate集成到自己的spring cloud项目中:
1、添加两个配置文件。
2、修改application.yml配置文件。
3、添加datasouce的代理。
4、添加数据库undo_log表。
5、添加注解进测试。
6、测试通过。

文末是undo_log的大概逻辑源码分析。

然后,研读官网介绍。
做一个大概的解读。
二、seate的发展:前身是txc,然后发布到阿里云变成了gts,然后今年开源,变成了fescar,然后又改名成seate。

2.1 设计初衷

  1. 业务无侵入:引入分布式事务,不能影响业务。比如,tcc着这种人工补偿是不行的。(虽然seate也有补偿,但是是不需要人工干预的)
  2. 高性能:不能拖慢业务。比如xa机制,只有等到所有事务都执行完了,才释放锁,严重影响事务。(曾经用过lcn,这个东西就是二阶段提交,但是spring boot 2支持的很不好,最后放弃)

高性能方面,体现在本地事务执行完,就commit释放锁,这方面,但是,如果都是seate事务,还是持有全局锁。

2.2 已有的分布式事务方案
xa: 单服务多数据库事务,好做,多服务,则tc要抽出来,性能各方面,各种不好做。
tcc:手写补偿业务,不妥。
基于消息中间件的最终一致性:异常回滚方面先捕获,通用性不好,而且特别麻烦。

2.3 理想的分布式事务
能够想本地事务一样,解决分布式事务问题,通过一个注解transaction注解,就能搞定。
lcn也能入本地事务一样搞定分布式事务,但是这个东西,性能不好,而且各种问题,更可恶的是,spring boot2根本无法完。

三、原理与设计

3.三个组件
可以看到,比mysql的xa,多了一个tc部分。

transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID。
XID 在微服务调用链路的上下文中传播。
RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
TM 向 TC 发起针对 XID 的全局提交或回滚决议。
TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。

XA 方案的 RM 实际上是在数据库层
seate的XA是应用层,不依赖数据库。

XA的锁要在所有事务进入commit阶段才释放。
seate,锁本地事务一执行完就释放。
seate的设计想法是:几乎90%的事务都是成功的,所以是可以马上释放锁的。
如果,有一个事务出现异常了,需要全局回滚的时候,再去争取全局锁。

3.3分支事务如何提交和回滚?
seate对jdbc做了一个代理,会分析sql,然后生成, 这条记录修改前后的镜像(也就是一个json),然后保存在事务所在数据库undo_log表中(所以每个服务都需要一张这个表)。如此,当回滚的时候,取出这个镜像,解析成反向sql,执行就完了。
所以这里的关键点在于:他能分析sql,然后解析出反向sql。也就省去了,手写补偿业务的问题。

3.4事务的传播属性
几乎都可以通过api支持,但是spring事务传播属性,这个源码我没搞明白。

3.5隔离性
事务本身有隔离级别,这是一点。
seate有一个全局锁,多个seate事务的update会有排他锁,保证事务的一致性。当然,由于本地事务执行完了就提交了,也就是数据库本身释放锁了,如果另外一个事务不是seate事务,是没有全局锁的。

seate全局上看,隔离级别是读未提交的,因为,每个本地事务,执行完了,就提交了,但是其实全局事务时没有执行完的,但是这份数据,还是会被别的事务看到,所以说,全局上看,是读未提交的。
seate说,他也可以做到,全局,读已提交的隔离级别。

4.适用场景
因为依赖本地acid特性,所以需要关系型数据库。
还不完善的地方:全局事务最高也就到读已提交的级别。而且,sql解析反向sql有些语法不支持。
前面说的是AT模式,seate还有Mt模式,mt模式是另外一种模式。

4.1分支的基本模式
分支注册:分支事务开启,向seate服务发送注册。
状态上报:分支执行完,有问题还是没问题都要发送报告。
分支提交:没问题,则提交。
分支回滚:有问题,则回滚,并且,全局回滚。

总之:seate有两个特别的思想:

  1. 本地事务执行完之后,就提交。通过自动生成回滚sql的形式回滚。
    2.有解析sql的回滚sql的能力。

五、集成到自己项目测试
集成方式,前面说了。
关键。
A服务:

@GetMapping("/test")
    @ApiOperation(value = "测试", notes = "输入参数:selectionActivitiesId")
    @GlobalTransactional
    public Result test() {
        selectionActivitiesFeigin.test();
        throw new RuntimeException("shibai");
    }

远程调用B服务。

 @GetMapping("/test")
   public Result test() {
        selectionActivitiesService.test();
        // 走到这里,B服务的本地事务已经提交,数据库,已经删掉了一条记录
    return Result.success("ok");
    }
 @Transactional
    public void test() {
        selectionActivitiesMapper.deleteById(14);
    }

最后,回滚,删掉的记录又回来了。可见,全局事务,隔离级别是读未提交。

然后看undo_log:可以看到,rollback_info是一个blob字段,记录了操作前的记录的镜像。(其实就是一条json)

13  2013937050  192.168.106.1:8091:2013937049   (BLOB) 1.15 KB  0   2019-06-11 07:47:11 2019-06-11 07:47:11 

看源码:
关于undolog的解析就是这个类。打断点。

public class JSONBasedUndoLogParser implements UndoLogParser {

    @Override
    public String encode(BranchUndoLog branchUndoLog) {
        return JSON.toJSONString(branchUndoLog, SerializerFeature.WriteDateUseDateFormat);
    }

    @Override
    public BranchUndoLog decode(String text) {
        return JSON.parseObject(text, BranchUndoLog.class);
    }
}

会发现,最后,把BranchUndoLog类json化成下面这个类。记录了delete前的,这条记录的的所有字段。


image.png

UndoLogManager这个类操作的。

 public static void flushUndoLogs(ConnectionProxy cp) throws SQLException {
        assertDbSupport(cp.getDbType());

        ConnectionContext connectionContext = cp.getContext();
        String xid = connectionContext.getXid();
        long branchID = connectionContext.getBranchId();

        BranchUndoLog branchUndoLog = new BranchUndoLog();
        branchUndoLog.setXid(xid);
        branchUndoLog.setBranchId(branchID);
        branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
        // 解析成json
        String undoLogContent = UndoLogParserFactory.getInstance().encode(branchUndoLog);
    
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Flushing UNDO LOG: " + undoLogContent);
        }
        // 插入数据库
        insertUndoLogWithNormal(xid, branchID, undoLogContent, cp.getTargetConnection());
    }
// 这是jdbc操作,这个类,定义了很多解析sql的模板。
private static void insertUndoLog(String xid, long branchID,
                                      String undoLogContent, State state, Connection conn) throws SQLException {
        PreparedStatement pst = null;
        try {
            pst = conn.prepareStatement(INSERT_UNDO_LOG_SQL);
            pst.setLong(1, branchID);
            pst.setString(2, xid);
            pst.setBlob(3, BlobUtils.string2blob(undoLogContent));
            pst.setInt(4, state.getValue());
            pst.executeUpdate();
        } catch (Exception e) {
            if (!(e instanceof SQLException)) {
                e = new SQLException(e);
            }
            throw (SQLException) e;
        } finally {
            if (pst != null) {
                pst.close();
            }
        }
    }

然后看回滚执行逻辑。

 public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
       ...
// 这里反系列化,undolog的json
                    BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);

                    for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {
                        TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(),
                            sqlUndoLog);
                        undoExecutor.executeOn(conn);
                    }
                }
        ...
        
    }
public void executeOn(Connection conn) throws SQLException {

        if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
            return;
        }
        
        try {
            // 把那个undo_log的json,解析成反向sql
            String undoSQL = buildUndoSQL();
             // 然后执行
            PreparedStatement undoPST = conn.prepareStatement(undoSQL);
            TableRecords undoRows = getUndoRows();
            for (Row undoRow : undoRows.getRows()) {
                ArrayList<Field> undoValues = new ArrayList<>();
                Field pkValue = null;
                for (Field field : undoRow.getFields()) {
                    if (field.getKeyType() == KeyType.PrimaryKey) {
                        pkValue = field;
                    } else {
                        undoValues.add(field);
                    }
                }
                undoPrepare(undoPST, undoValues, pkValue);
                undoPST.executeUpdate();
            }
        }
    }

至此,大概,seate的undo_log部分,大概逻辑就是这样,存储修改前镜像json,回滚的时候,解析这个json成回滚sql,然后执行。

上一篇 下一篇

猜你喜欢

热点阅读