JavaSpring Cloud

Spring Cloud笔记(8)使用Seata管理分布式事务

2020-05-06  本文已影响0人  飞空之羽

分布式事务介绍

所谓事务,就是一系列业务操作构成的独立的执行单元。比如用户购买商品下单的行为,需要执行创建订单,扣减商品库存的两个不同的数据库操作,这就是一个事务。事务最重要的特性就是要支持原子性,要么所有操作全部成功,要么全部失败。为什么要这样设计呢?如果一切顺利,当然什么问题都不会有。但天有不测风云,没有谁能保证系统一直不会出错,如果哪一天订单已经创建成功了,但在扣减对应商品库存时突然失败了,那就麻烦大了,订单数据和商品的库存数据可能就对不上了,有可能商品早都没货了,客户还能继续下单购买。

所以,事务在保持数据一致性的方面是非常重要的。在单服务系统中我们一般不需要为事务操心,数据库已经为我们考虑了一切,只要在操作开始前声明了事务,那么调用过程只要发生了错误事务会自动回滚到操作开始时的状态。

但在微服务系统中,不同的业务操作可能被分割到不同的模块,而不同业务模块都会配置一个独立的数据源,甚至订单服务和仓储服务可能都不在同一个数据库中,这样显然就不能只依靠本地数据库事务来解决问题。我们需要一种能够跨网络和应用的分布式事务机制,分布式事务的主要实现思路一般分为两种:

Seata 的相关概念

由于类似XA的刚性事务实现在复杂的分布式环境中存在大量的问题,所以目前主流的分布式事务实现方案都是走柔性事务路线的。比较流行的解决方案有阿里开源的seata,还有独立开发者发布的LCN框架,但LCN目前的维护更新遇到了困难(独立开发者的悲哀啊)。seata提供了几种不同的事务模式实现,包括:AT、TCC、SAGA 和 XA。在这里我们重点介绍一下AT模式,其它模式的说明请查看 seata的官网。AT模式也是走的补偿路线,但是不需要应用程序去关心调用失败后如何恢复数据,而是由框架本身负责去恢复收当前事务影响的所有数据,这非常类似于我们已经习惯了的本地事务,对开发者的负担也比较小。

seata可分为三个角色:TC,TM和RM:

官网上用户购买商品的例子

部署TC server

首先在 官网 下载seata server的二进制文件,其实就是一个spring boot的应用程序。在服务器上解压之后,进入到conf文件夹,我们需要重点关注registry.conf这个文件,它的作用是配置seata server(TC)注册到哪个注册中心,目前支持几乎所有主流的注册中心,TC会从配置中心中读取相应的配置,TM和RM实例也可以通过同一个注册中心找到TC部署的位置,从而实现TC的集群化部署。我们还是延用之前就部署好的consul来作为注册中心,只需要将registry.conf中的type修改为“consul”即可,并调整配置文件中consul的部署地址:

# 注册中心配置,用于TC,TM,RM的相互服务发现
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "consul"
  consul {
    cluster = "seata"
    serverAddr = "192.168.1.220:8500"
  }
}
# 配置中心配置,用于读取TC的相关配置
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"
  file {
    name = "file.conf"
  }
}

file.conf中是TC的一些持久化配置选项,seata支持文件和数据库两种持久化方式,文件模式比数据库更快,但不支持并发操作,如果要部署TC集群就必须要使用数据库进行持久化。这里我们为了简化,就直接使用默认的file模式。file.conf也可以初始化到配置中心,通过配置中心来统一读取,这对于集群部署是有帮助的,初始化的方式可参考 这里 。修改完配置后,直接运行seata-server.sh启动TC server:

$ sh ./bin/seata-server.sh -p 8091 -h 127.0.0.1

启动成功后我们可以在consul的控制台看到TC注册的服务:


TC注册成功

集成TM和RM

seata的AT模式会在服务调用失败后,自动恢复受影响的数据,其原理就是在启动事务之后,会自动分析当前事务中执行的SQL对数据的影响,把受影响的数据直接存储到本地数据库中,如果事务回滚了,就通过存储的数据备份对原始数据进行恢复( AT模式介绍 ),所以我们需要在每个RM所在的业务数据库中初始化seata的undo_log表。我们之前的spring-cloud-demo中也还没有建立相应的业务数据库,为了测试分布式事务,我们需要为order-service和storage-service创建两个业务库,并初始化相关的业务表格,整个初始化脚本如下:


-- 创建order-service数据库
CREATE DATABASE `cloud-demo-order`;

CREATE TABLE IF NOT EXISTS `cloud-demo-order`.`t_order` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `customer_code` varchar(45) DEFAULT NULL COMMENT '客户编码',
  `good_code` varchar(45) DEFAULT NULL COMMENT '产品编码',
  `good_quantity` int(11) DEFAULT NULL COMMENT '购买数量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- AT模式需要的undo log表
CREATE TABLE IF NOT EXISTS `cloud-demo-order`.`undo_log`
(
    `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT 'increment id',
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME     NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME     NOT NULL COMMENT 'modify datetime',
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

-- 创建storage-service数据库
CREATE DATABASE `cloud-demo-storage`;

CREATE TABLE IF NOT EXISTS `cloud-demo-storage`.`t_inventory` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `good_code` varchar(45) DEFAULT NULL COMMENT '产品编码',
  `good_quantity` int(11) DEFAULT NULL COMMENT '库存总量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- AT模式需要的undo log表
CREATE TABLE IF NOT EXISTS `cloud-demo-storage`.`undo_log`
(
    `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT COMMENT 'increment id',
    `branch_id`     BIGINT(20)   NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(100) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME     NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME     NOT NULL COMMENT 'modify datetime',
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

初始化脚本后,在order-serivce模块和storage-service模快引入seata的依赖包和相关配置:

  <!--在spring cloud中自动配置seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

需要注意的是maven库中还有一个包名称是spring-cloud-alibaba-seata,我看了一下两者的源码是一致的,任意引用哪一个包都可以,不知道为什么会有两个不同名称。在模块的application.yml中加入如下内容(以order-service模块的配置为例):

# Seata 配置项,对应 SeataProperties 类
seata:
  application-id: ${spring.application.name} # Seata 应用编号,默认为 ${spring.application.name}
  tx-service-group: ${spring.application.name}-group # 该应用所属事务组编号,用于寻找TC集群的映射
  # Seata 服务配置项,对应 ServiceProperties 类
  service:
    #事务分组与TC集群(seata)集群的映射关系,order-service-group对应tx-service-group参数,值为一个虚拟的TC集群名称
    #默认值就是default,如果无需分组可以不用设置
    vgroup-mapping:
      order-service-group: default
  # Seata注册中心配置项
  registry:
    type: consul # 注册中心类型,默认为 file
    consul:
      cluster: seata #对应seata集群在consul中注册的服务名
      server-addr: 192.168.1.220:8500

需要注意的是 registry.consul.cluster 中指定的名称需要与TC server的同名配置项中指定的名称一致(默认的值是default),否则会找不到TC server,这个在官方文档和很多教程中都没有说明。 更多配置内容可以查看 配置说明

另外为了方便对数据库进行操作,order-serivce模块和storage-service模快都引入了mybatis-plus框架的相关依赖和配置,这里就不做详细介绍了,具体可以去查看源码。一切准备好以后,将原有的orderService.createNewOrder方法做如下改造:

    @GlobalTransactional
    public Integer createNewOrder(OrderDTO orderDTO) {
        Order newOrder = new Order();
        newOrder.setCustomerCode(orderDTO.getCustomerCode());
        newOrder.setGoodCode(orderDTO.getGoodCode());
        newOrder.setGoodQuantity(orderDTO.getQuantity());
        //向本地数据库插入订单信息
        this.save(newOrder);
        InventoryChangeDTO req = new InventoryChangeDTO();
        req.setGoodCode(orderDTO.getGoodCode());
        req.setQuantity(orderDTO.getQuantity());
        //调用远程仓储服务变更库存
        Integer remainQuantity = storageService.updateInventoryOfGood(req);
        return remainQuantity;
    }

其实除了写入数据库的相关代码,这里最重要的变化就是加入了@GlobalTransactional注解,这个注解标记了当前方法会开启一个全局事务。在本例中order-service模块既是 TM (我的理解是声明@GlobalTransactional的地方就算是一个TM)又是 RM,而storage-service模块则是另一个 RM

通过PostMan请求创建订单的接口地址:http://localhost:9001/api/order/create-order,一切顺利的话就能在数据库中看到新增的订单数据和库存数据。

PostMan请求订单创建接口
从相关的日志中可以看出全局事务的开始和提交:
2020-05-06 15:28:21.311  INFO 11516 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [192.168.1.220:8091:2010342569]
2020-05-06 15:28:21.457  INFO 11516 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : [192.168.1.220:8091:2010342569] commit status: Committed
2020-05-06 15:28:21.457  INFO 11516 --- [nio-9001-exec-7] c.g.d.s.o.controller.Controller          : 剩余数量:-20
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] i.s.core.rpc.netty.RmMessageListener     : onMessage:xid=192.168.1.220:8091:2010342569,branchId=2010342570,branchType=AT,resourceId=jdbc:mysql://192.168.1.212:3306/cloud-demo-order,applicationData=null
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] io.seata.rm.AbstractRMHandler            : Branch committing: 192.168.1.220:8091:2010342569 2010342570 jdbc:mysql://192.168.1.212:3306/cloud-demo-order null
2020-05-06 15:28:21.576  INFO 11516 --- [atch_RMROLE_1_4] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed

我们将 storageService.updateInventoryOfGood 方法稍稍修改一下,故意引发一个异常来测试全局事务的回滚:

public Integer changeInventory(InventoryChangeDTO req) {
        Inventory inventory = this.getOne(Wrappers.<Inventory>lambdaQuery().eq(Inventory::getGoodCode, req.getGoodCode()));
        if (inventory == null) {
            inventory = new Inventory();
            inventory.setGoodQuantity(0);
            inventory.setGoodCode(req.getGoodCode());
        }
        inventory.setGoodQuantity(inventory.getGoodQuantity() - req.getQuantity());
        this.saveOrUpdate(inventory);
        //引发异常回滚
        Object exceptionCause = null;
        exceptionCause.toString();
        return inventory.getGoodQuantity();
    }

还需要注意一点,测试全局事务回滚时需要将我们直接配置的hystrix fallback关闭,否则应用程序调用远程接口失败后会触发fallback机制,从而让seata认为远程调用是成功的,就不会触发回滚:

//@FeignClient(name = "storage-service", fallback = StorageServiceFallback.class)
//测试全局事务回滚时需要注释掉fallback,否则接口会返回默认的值导致事务无法回滚
@FeignClient(name = "storage-service")
public interface StorageService {

    @PostMapping("/api/storage/change-inventory")
    Integer updateInventoryOfGood(InventoryChangeDTO inventoryChangeDTO);

}

然后我们再次请求创建订单的服务,从日志上可以看出,事务已成功触发回滚操作:

2020-05-06 15:50:57.809  INFO 3804 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : Begin new global transaction [192.168.1.220:8091:2010342574]
2020-05-06 15:50:59.157  INFO 3804 --- [orage-service-1] c.netflix.config.ChainedDynamicProperty  : Flipping property: storage-service.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-05-06 15:50:59.275  INFO 3804 --- [orage-service-1] c.n.u.concurrent.ShutdownEnabledTimer    : Shutdown hook installed for: NFLoadBalancer-PingTimer-storage-service
2020-05-06 15:50:59.275  INFO 3804 --- [orage-service-1] c.netflix.loadbalancer.BaseLoadBalancer  : Client: storage-service instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=storage-service,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
2020-05-06 15:50:59.290  INFO 3804 --- [orage-service-1] c.n.l.DynamicServerListLoadBalancer      : Using serverListUpdater PollingServerListUpdater
2020-05-06 15:50:59.325  INFO 3804 --- [orage-service-1] c.netflix.config.ChainedDynamicProperty  : Flipping property: storage-service.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
2020-05-06 15:50:59.329  INFO 3804 --- [orage-service-1] c.n.l.DynamicServerListLoadBalancer      : DynamicServerListLoadBalancer for client storage-service initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=storage-service,current list of Servers=[192.168.1.252:9002],Load balancer stats=Zone stats: {unknown=[Zone:unknown;   Instance count:1;   Active connections count: 0;    Circuit breaker tripped count: 0;   Active connections per server: 0.0;]
},Server stats: [[Server:192.168.1.252:9002;    Zone:UNKNOWN;   Total Requests:0;   Successive connection failure:0;    Total blackout seconds:0;   Last connection made:Thu Jan 01 08:00:00 CST 1970;  First connection made: Thu Jan 01 08:00:00 CST 1970;    Active Connections:0;   total failure count in last (1000) msecs:0; average resp time:0.0;  90 percentile resp time:0.0;    95 percentile resp time:0.0;    min resp time:0.0;  max resp time:0.0;  stddev resp time:0.0]
]}ServerList:ConsulServerList{serviceId='storage-service', tag=null}
2020-05-06 15:51:00.119  INFO 3804 --- [nio-9001-exec-7] i.seata.tm.api.DefaultGlobalTransaction  : [192.168.1.220:8091:2010342574] rollback status: Rollbacked

本文的相关代码可以查看这里 spring-cloud-demo

上一篇下一篇

猜你喜欢

热点阅读