Implementing distributed transactions with Seata

2022-08-26  sunpy

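What is Seata


Seata is an open-source distributed transaction solution dedicated to providing high-performance, easy-to-use distributed transaction services. It offers the AT, TCC, SAGA, and XA transaction modes, giving users a one-stop distributed solution.

Seata official site: https://seata.io/zh-cn/docs/overview/what-is-seata.html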

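Deploying the Seata server with Docker


Official instructions: https://seata.io/zh-cn/docs/ops/deploy-by-docker.html

  1. Pull the image:
docker pull seataio/seata-server:1.4.2
  2. Start the container:
docker run -d -e SEATA_IP=<public-IP> -e SEATA_PORT=8091 -p 8091:8091 --name seata-server seataio/seata-server:1.4.2

Be sure to set SEATA_IP to the public address so that this is the address registered in Nacos. Otherwise the internal address is registered by default and clients cannot connect.

  3. Enter the container:
docker exec -it <container-id> /bin/sh
  4. Edit the configuration file /seata-server/resources/registry.conf: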
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "<public-Nacos-IP>:8848"
    group = "DEFAULT_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "<public-Nacos-IP>:8848"
    namespace = ""
    group = "DEFAULT_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
}
  5. Edit the configuration file /seata-server/resources/file.conf:
## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "db"
  ## rsa decryption public key
  publicKey = ""
  ## database store property
  db {
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.cj.jdbc.Driver"
    ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
    url = "jdbc:mysql://<public-DB-IP>:3389/seata?rewriteBatchedStatements=true"
    user = "root"
    password = "password"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  } 
}
  6. Create the seata database and import the following SQL script:
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(128),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `status`         TINYINT      NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_status` (`status`),
    KEY `idx_branch_id` (`branch_id`),
    KEY `idx_xid_and_branch_id` (`xid` , `branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
    `lock_key`       CHAR(20) NOT NULL,
    `lock_value`     VARCHAR(20) NOT NULL,
    `expire`         BIGINT,
    primary key (`lock_key`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
  7. Configure seata-server in Nacos:

Configuration content:

service.vgroupMapping.zhishu_group=default
service.enableDegrade=false
service.disableGlobalTransaction=false

store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://<public-DB-IP>:3389/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

Full configuration reference: https://github.com/seata/seata/blob/1.4.0/script/config-center/config.txt

  8. Restart the Seata server container:
docker restart <container-id>
  9. Check in the Nacos console that the seata-server service has been registered.
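
The registration can also be checked from code instead of the console. The following is a minimal sketch, assuming the Nacos v1 Open API instance-list endpoint (`/nacos/v1/ns/instance/list`) is reachable over plain HTTP and that the service name and group match the registry.conf above. The class and method names are hypothetical, and a Nacos instance with authentication enabled would additionally need an access token, which this sketch does not handle.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Seata or the Nacos client library.
final class NacosRegistrationCheck {

    // Builds the instance-list query URL for one service in one group.
    static URI instanceListUri(String nacosAddr, String serviceName, String groupName) {
        String query = "serviceName=" + URLEncoder.encode(serviceName, StandardCharsets.UTF_8)
                + "&groupName=" + URLEncoder.encode(groupName, StandardCharsets.UTF_8);
        return URI.create("http://" + nacosAddr + "/nacos/v1/ns/instance/list?" + query);
    }

    // Returns the raw JSON response; the registered instances are listed in it.
    static String fetchInstances(String nacosAddr) throws Exception {
        HttpRequest request = HttpRequest
                .newBuilder(instanceListUri(nacosAddr, "seata-server", "DEFAULT_GROUP"))
                .GET()
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Calling `NacosRegistrationCheck.fetchInstances("<public-Nacos-IP>:8848")` should return a response that lists the Seata server under the public IP and port 8091 if registration worked.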

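AT mode in Seata distributed transactions


AT mode is modeled on the two-phase commit (2PC) protocol, and its core idea is snapshots.
When business data is committed, Seata automatically intercepts the SQL and saves snapshots of the affected data before and after each statement modifies it.
If the distributed transaction succeeds, the snapshot records are simply deleted.
If the distributed transaction fails, it is rolled back: reverse SQL that compensates for the change is generated automatically from the logged data.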
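
The snapshot idea can be illustrated with a small in-memory model. This is a conceptual sketch only, not Seata's implementation or API: the `AtModeSketch` class, its `UndoLog` entries, and the map standing in for a database table are all hypothetical. It shows a before image being recorded on each change, dropped on commit, and used to restore the data on rollback.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model only; this is not Seata's internal API.
// Every class and method name here is hypothetical.
final class AtModeSketch {

    // One undo-log entry: the row value before and after the transaction's change.
    static final class UndoLog {
        final String beforeImage; // null means the row did not exist
        final String afterImage;  // stored for completeness, not used in this sketch

        UndoLog(String beforeImage, String afterImage) {
            this.beforeImage = beforeImage;
            this.afterImage = afterImage;
        }
    }

    private final Map<String, String> table = new HashMap<>();     // primary key -> row value
    private final Map<String, UndoLog> undoLogs = new HashMap<>(); // primary key -> undo entry

    // Phase 1: record the before image, apply the change, record the after image.
    void update(String key, String newValue) {
        UndoLog earlier = undoLogs.get(key);
        // Keep the oldest before image if the same row is changed twice.
        String before = (earlier != null) ? earlier.beforeImage : table.get(key);
        table.put(key, newValue);
        undoLogs.put(key, new UndoLog(before, newValue));
    }

    // Phase 2 on success: the data is already written, so only the snapshots are deleted.
    void commit() {
        undoLogs.clear();
    }

    // Phase 2 on failure: restore every changed row from its before image.
    void rollback() {
        for (Map.Entry<String, UndoLog> entry : undoLogs.entrySet()) {
            String before = entry.getValue().beforeImage;
            if (before == null) {
                table.remove(entry.getKey());      // compensates an INSERT
            } else {
                table.put(entry.getKey(), before); // compensates an UPDATE
            }
        }
        undoLogs.clear();
    }

    String get(String key) { return table.get(key); }

    int pendingUndoLogs() { return undoLogs.size(); }
}
```

In the real setup the snapshots live in the `undo_log` table of each business database rather than in memory, which is why that table has to be created for every participating service.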

Integrating Seata with Spring Boot


Using Seata to implement AT mode in a distributed business flow.
Add the dependencies:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>seata-spring-boot-starter</artifactId>
            <groupId>io.seata</groupId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.4.2</version>
</dependency>

Add the undo_log table to the business database:

DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

application.yml configuration:

seata:
  enabled: true
  application-id: ${spring.application.name}
  enable-auto-data-source-proxy: true # whether to enable automatic data source proxying
  tx-service-group: admin-service-seata # must match service.vgroupMapping.admin-service-seata in Nacos
  service:
    vgroup-mapping:
      admin-service-seata: default
    disable-global-transaction: false

  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: IP:8848
      group: "DEFAULT_GROUP"
      namespace: ""
      username: "nacos"
      password: "nacos"
  config:
    type: nacos
    nacos:
      server-addr: IP:8848
      group: "DEFAULT_GROUP"
      namespace: ""
      username: "nacos"
      password: "nacos"
  data-source-proxy-mode: AT
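
The relationship between `tx-service-group` and `vgroup-mapping` comes down to a key lookup: the client takes its transaction group name, reads `service.vgroupMapping.<group>`, and uses the resulting cluster name to find Seata server instances. The sketch below models only that key construction with a plain map. The class and method names are hypothetical, and it does not model which configuration source (local file or Nacos) the client consults first.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration of the lookup key; not Seata code.
final class VgroupMappingSketch {

    // Resolves a transaction service group to a Seata server cluster name.
    static Optional<String> resolveCluster(Map<String, String> config, String txServiceGroup) {
        return Optional.ofNullable(config.get("service.vgroupMapping." + txServiceGroup));
    }
}
```

With a configuration that contains only `service.vgroupMapping.zhishu_group=default`, as in the Nacos content shown earlier, a lookup for `admin-service-seata` comes back empty in this model, while a lookup for `zhishu_group` yields `default`. This is why the group name and the mapping key need to agree in whichever source the client reads.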

Service implementation of the admin-service business logic:

@GlobalTransactional(rollbackFor = CommonException.class)
@Transactional(rollbackFor = CommonException.class)
@Override
public ResultModel<String> insertUser(@NonNull String id,
                                      @NonNull String username,
                                      @NonNull String password,
                                      @NonNull Integer roleId) throws CommonException {
    ResultModel<String> resultModel = new ResultModel<>();
    User user = new User();
    user.setId(id);
    user.setUsername(username);
    user.setPassword(password);
    user.setRoleId(roleId);
    userMapper.insert(user);

    ResultModel<String> teacherResult = teacherFeign.addTeacher(username, id);

    if (teacherResult.getSuccess()) {
        resultModel.setMsg("Inserted one record");
        resultModel.setTime(TimeUtil.getNowTime());
        resultModel.setRes(id);
        log.info("Local transaction succeeded, inserted one record, id = " + id);

        return resultModel;
    }

    resultModel.setSuccess(false);
    resultModel.setCode(500);
    resultModel.setMsg("Failed to insert record");
    resultModel.setTime(TimeUtil.getNowTime());
    resultModel.setRes(id);
    return resultModel;
}

Feign client for the remote call:

@FeignClient(name="teacher-service")
public interface TeacherFeign {
    @PostMapping("/teacher/add")
    ResultModel<String> addTeacher(@RequestParam("name") String name, @RequestHeader("userId") String userId) throws CommonException;
}

teacherFeign.addTeacher(username, id) calls the teacher-service service in another system, which is implemented as follows:

@Transactional(rollbackFor = CommonException.class)
@Override
public ResultModel<String> addTeacherInfo(String teacherId, String name) throws CommonException {
    Teacher teacher = new Teacher();
    teacher.setId(teacherId);
    teacher.setTeacherName(name);

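    // Always throws, to simulate a failure in the downstream service.
    throw new CommonException("This id already exists in the teacher table");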
}

Controller layer:

@PostMapping("/trans/add")
public ResultModel<String> addUser(@RequestParam("username") String username,
                                    @RequestHeader("password") String password,
                                    @RequestHeader("roleId") Integer roleId) {
    UserBO userBO = new UserBO();
    userBO.setUsername(username);
    userBO.setPassword(password);
    userBO.setRoleId(roleId);
    String id = UUID.randomUUID().toString().replace("-","");
    return userService.insertUser(id, username, password, roleId);
}

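Because teacher-service throws an exception, the global transaction is rolled back: the 杨幂 (Yang Mi) record that admin-service had inserted into edu_user is undone, and edu_teacher contains no record either.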
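
Rollback here depends on an exception propagating out of the method annotated with @GlobalTransactional. If a remote failure arrives as a normal response with a failure flag, returning that response does not roll anything back by itself. A minimal sketch of turning such a response into an exception follows. `RemoteResult` and `BusinessException` are hypothetical stand-ins for the article's ResultModel and CommonException, whose full definitions are not shown.

```java
// Hypothetical stand-ins; not the article's actual classes.
final class RollbackOnFailureSketch {

    // Models only the fields needed here: a success flag and a message.
    static final class RemoteResult {
        final boolean success;
        final String msg;

        RemoteResult(boolean success, String msg) {
            this.success = success;
            this.msg = msg;
        }
    }

    // Unchecked, so it propagates without a throws clause.
    static final class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    // Returns the result unchanged on success; throws on a missing or failed result
    // so that the surrounding transaction sees an exception.
    static RemoteResult requireSuccess(RemoteResult result) {
        if (result == null || !result.success) {
            String reason = (result == null) ? "no response" : result.msg;
            throw new BusinessException("Remote call failed: " + reason);
        }
        return result;
    }
}
```

In the service method above, wrapping the Feign call in a check of this kind would make the failure branch throw instead of returning a failure result.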
