分布式事务Seata使用说明

2020-08-18  本文已影响0人  lbjfish

目前功能如下

  • Seata服务已做高可用+负载均衡(目前三台机器分别在32、36、37)
  • 外挂事务日志数据库(目前在开发环境数据库database=seata,也是为了高可用)
  • 使用Nacos作为Seata注册中心以及配置中心(配置中心namespace=seata 勿删
  • 支持普通本地事务+分布式事务组合
  • 整合ShardingSphere分布式柔性事务(这个比较特殊,单独讲)

说明

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。
Seata分为三块分别是:
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

事务模式选型

Seata支持四种事务模式,分别是AT、TCC、SAGA和XA,本项目使用无注入方式AT模式,当然结合TCC或SAGA,或使用混合型。

TCC对业务侵入大,性能比AT略高,而SAGA性能最高(适合流程长,像银行业务),基于无锁模式,但是会有脏读问题(AT也有)不保证隔离型。TCC则完全避免各种脏读问题,但是缺点也是开发麻烦,因为每个目标字段都需要一个冻结字段来支撑事务操作。

没有完美的分布式事务解决方案,基于业务迭代速度,加上性能综合考虑,选定了AT模式。AT模式优点是业务代码无侵入,基于注解方式,直接操作本地关系型数据库等优点,同时AT会存在两把锁,全局锁和本地锁,分别对应TM和RM,是典型的2PC方式。XA模式不考虑(想了解这种模式的可以去了解下)。

Seata的全局事务是基于XIDBranch ID,主要通过feignxid在各服务之间传递,而通过xid+branchId唯一确定一条undo_log数据(此时是阶段一,因为已经插入一条undo_log了),拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改(此时是阶段二),阶段一前后镜:

{
    "branchId": 641789253,
    "undoItems": [{
        "afterImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "GTS"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "beforeImage": {
            "rows": [{
                "fields": [{
                    "name": "id",
                    "type": 4,
                    "value": 1
                }, {
                    "name": "name",
                    "type": 12,
                    "value": "TXC"
                }, {
                    "name": "since",
                    "type": 12,
                    "value": "2014"
                }]
            }],
            "tableName": "product"
        },
        "sqlType": "UPDATE"
    }],
    "xid": "xid:xxx"
}

使用方法

目前只有开发环境有配置,QA和生产需要另外配置

分为ShardingSphere服务和非ShardingSphere服务两种使用方法稍有区别

注:数据源一定要使用阿里的druid
注:数据源一定要使用阿里的druid
注:数据源一定要使用阿里的druid
重要的事情说三遍

非ShardingSphere服务使用方法(普通服务)

1.在bootstrap-dev.yml的spring.cloud.nacos.config.extension-configs引入如下共享配置(seata-public-share.yml

spring:
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.128.20
        namespace: 1c53b59e-9f3d-44a4-b2d7-5faaea25b3c6 # dev 环境的命名空间
      config:
        server-addr: 192.168.128.20
        file-extension: yml
        group: uaa-server
        namespace: 1c53b59e-9f3d-44a4-b2d7-5faaea25b3c6 # dev 环境的命名空间
        extension-configs:
          - data-id: seata-public-share.yml
            group: share
            refresh: true

2.引入pom包

        <!-- seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-spring-boot-starter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.3.0</version>
        </dependency>
    </dependencies>

3.最后在各自Service业务代码加入@GlobalTransactional注解即可
注:只需要在调用顶层Service加注解即可,其他feign服务不需要加任何注解,包括本地事务注解
注:只需要在调用顶层Service加注解即可,其他feign服务不需要加任何注解,包括本地事务注解
注:只需要在调用顶层Service加注解即可,其他feign服务不需要加任何注解,包括本地事务注解

    @GlobalTransactional
    public void placeOrder(String userId) {
        storageFeignClient.deduct(COMMODITY_CODE, ORDER_COUNT);

        orderFeignClient.create(userId, COMMODITY_CODE, ORDER_COUNT);
    }

4.最后在mysql各database下加入undo_log表:

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

普通服务配置结束。

ShardingSphere服务使用方法

sharding服务的配置稍有不同

1.在bootstrap-dev.yml的spring.cloud.nacos.config.extension-configs引入如下共享配置(seata-sharding-public-share.yml),不懂的借鉴上面普通服务。

2.除了引入上面的普通服务的pom以外,还要引入如下pom包:

        <dependency>
            <groupId>org.apache.shardingsphere</groupId>
            <artifactId>sharding-transaction-base-seata-at</artifactId>
            <version>${sharding-sphere.version}</version>
        </dependency>

为什么要引入上述pom包?
从ShardingSphere 官方图像显示,Seata与ShardingSphere 框架都是对DataSource 进行封装和处理,所以要将Seata事务融入到ShardingSphere 框架中使用,就需要将Seata框架中DataSourceProxy包装给ShardingSphere 框架的ShardingTransactionManager接口。
而在seata1.3版本和ShardingSphere 4以上版本已经整合了代理,具体请看源码和ShardingSphere 官方解释。

3.resource资源目录下加入seata.conf配置文件

文件内容如下:

client {
    #spring-cloud-starter-alibaba-seata中application.id与application.name一致
    application.id = uaa-server
    transaction.service.group = server_tx_service_group
}

4.1除了加入@GlobalTransactional注解外,还需要在每一个本地库事务加入TransactionTypeHolder.set(TransactionType.BASE);ds-product服务为例:

    @GlobalTransactional
    //@Transactional
    public void deleteEbayMyProduct(long id, ProductPushEbay productPushEbay, String dropShipperId) {
        log.info("xid================{}", RootContext.getXID());
        
        TransactionTypeHolder.set(TransactionType.BASE);
        productPushEbayService.deleteMyProductPush(Long.valueOf(id), dropShipperId);
        
        TransactionTypeHolder.set(TransactionType.BASE);
        productVariantPushEbayService.deleteMyProductVariants(productPushEbay.getItemcode().toString(), dropShipperId);

        TransactionTypeHolder.set(TransactionType.BASE);
        productPushEbayService.insertLeeb(String.valueOf(new Random().nextInt()));

        dsOrderClient.testSeata("error");

        if(dropShipperId.equals("28d3e7a9ea75150c6ad03d95980cfd4a")){
            throw new RuntimeException("错误了。。。。。。。。");
        }
    }

4.2 如果觉得本地库增删改太多,每一个Service都加一个TransactionTypeHolder.set(TransactionType.BASE);太过麻烦,可以直接加入@Transactional注解,强制使Seata优先使用本地事务。只需要在方法第一行加入TransactionTypeHolder.set(TransactionType.BASE);即可,后续Service则不需要加。

  1. 最后,同普通服务4.各分库下加入undo_log表。

ShardingSphere 服务配置结束。

说明:不管是普通服务还是ShardingSphere 服务都可以只在最外层(Feign调用方)Service加@GlobalTransactional注解,其它Feign被调用服务无须加任何事务注解,不管是Seata事务注解还是本地事务注解,这样做的好处可以柔性提交各服务分支,不会因为子服务网络问题或者其他问题卡住,严重消耗性能,如果出现异常,则各服务都会补偿性回滚。当然,也可以在各服务Service加@Transactional注解,使各本地事务达到强一致。
在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。所以是要取舍各服务达到强一致性而牺牲部分性能,还是先提交一部分(不管是本地事务还是Feign服务事务)后续最终一致性补偿,效率高,但会有脏读或死锁可能,这是大家在业务中进行处理选型考虑的。

注意

注:如果需要分布式事务,一定要抛异常SeataTrBusinessException,而不要抛BusinessException,因为BusinessException分布式情况下不会回滚。

注:如果需要分布式事务,一定要抛异常SeataTrBusinessException,而不要抛BusinessException,因为BusinessException分布式情况下不会回滚。

注:如果需要分布式事务,一定要抛异常SeataTrBusinessException,而不要抛BusinessException,因为BusinessException分布式情况下不会回滚。

重要的事说三遍

其他(开发不用管)

seata启动服务命令
32=1 37=2 36=3
nohup sh ./seata-server.sh -p 8091 -h 192.168.128.36 -m db -n 3 &
nohup sh ./seata-server.sh -p 8091 -h 192.168.128.37 -m db -n 2 &
nohup sh ./seata-server.sh -p 8091 -h 192.168.128.32 -m db -n 1 &

jobs 查看上面启动的服务状态命令

生成seata的nacos配置文件命令
sh nacos-config.sh -h 192.168.128.20 -p 8848 -g SEATA_GROUP -t aebcc8df-bcca-4f1d-8f27-fa98828753d7

windows启动命令(本机)
.\seata-server.bat -p 28091 -m db -n 4

上一篇下一篇

猜你喜欢

热点阅读