seata实现分布式事务
2022-08-26 本文已影响0人
sunpy
seata是什么
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
seata官网:https://seata.io/zh-cn/docs/overview/what-is-seata.html
docker部署seata服务器
官网操作:https://seata.io/zh-cn/docs/ops/deploy-by-docker.html
- 下载镜像:
docker pull seataio/seata-server:1.4.2
- 启动容器:
docker run -d -e SEATA_IP=外网IP -e SEATA_PORT=8091 -p 8091:8091 --name seata-server seataio/seata-server:1.4.2
注意一定要指定地址为外网地址,这样nacos就可以获取了,否则nacos默认获取内网地址,这样客户端连接不上。
- 进入容器:
docker exec -it 容器id /bin/sh
- 修改配置文件:/seata-server/resources/registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "外网NACOS的IP:8848"
group = "DEFAULT_GROUP"
namespace = ""
cluster = "default"
username = "nacos"
password = "nacos"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "外网NACOS的IP:8848"
namespace = ""
group = "DEFAULT_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}
- 修改配置文件:/seata-server/resources/file.conf
## transaction log store, only used in seata-server
store {
## store mode: file、db、redis
mode = "db"
## rsa decryption public key
publicKey = ""
## file store property
db {
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.cj.jdbc.Driver"
## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
url = "jdbc:mysql://外网数据库IP:3389/seata?rewriteBatchedStatements=true"
user = "root"
password = "password"
minConn = 5
maxConn = 100
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
}
- 建立seata数据库,导入配置sql文件:
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid_and_branch_id` (`xid` , `branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
- 在nacos中配置seata-server:
配置内容:
service.vgroupMapping.zhishu_group=default
service.enableDegrade=false
service.disableGlobalTransaction=false
store.mode=db
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://外网IP:3389/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=password
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
详细配置:https://github.com/seata/seata/blob/1.4.0/script/config-center/config.txt
- docker重启seata服务器:
docker restart 容器id
- 查看seata-server服务是否加载到了nacos上:
seata分布式事务中AT模式
AT事务参照2PC算法。AT事务的思路就是快照思路。
业务数据提交时,自动拦截所有的SQL,分别保存SQL对数据修改前后的快照。
如果分布式事务成功,那么直接删除快照中的记录。
如果分布式事务失败,那么事务回滚,根据日志数据自动产生用于补偿的逆向SQL。
springboot整合seata
应用seata实现分布式业务中的AT模式:
导入jar包:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<artifactId>seata-spring-boot-starter</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
业务数据库中添加undo_log表。
drop table `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
application.yml配置:
seata:
enabled: true
application-id: ${spring.application.name}
enable-auto-data-source-proxy: true #是否开启数据源自动代理
tx-service-group: admin-service-seata #需要和nacos上的service.vgroupMapping.admin-service-seata保持一致
service:
vgroup-mapping:
admin-service-seata: default
disable-global-transaction: false
registry:
type: nacos
nacos:
application: seata-server
server-addr: IP:8848
group: "DEFAULT_GROUP"
namespace: ""
username: "nacos"
password: "nacos"
config:
type: nacos
nacos:
server-addr: IP:8848
group: "DEFAULT_GROUP"
namespace: ""
username: "nacos"
password: "nacos"
data-source-proxy-mode: AT
admin-service业务逻辑service实现:
@GlobalTransactional(rollbackFor = CommonException.class)
@Transactional(rollbackFor = CommonException.class)
@Override
public ResultModel<String> insertUser(@NonNull String id,
@NonNull String username,
@NonNull String password,
@NonNull Integer roleId) throws CommonException {
ResultModel<String> resultModel = new ResultModel<>();
User user = new User();
user.setId(id);
user.setUsername(username);
user.setPassword(password);
user.setRoleId(roleId);
userMapper.insert(user);
ResultModel<String> teacherResult = teacherFeign.addTeacher(username, id);
if (teacherResult.getSuccess()) {
resultModel.setMsg("新增一条记录");
resultModel.setTime(TimeUtil.getNowTime());
resultModel.setRes(id);
log.info("本地事务执行成功,新增一条记录 id = " + id);
return resultModel;
}
resultModel.setSuccess(false);
resultModel.setCode(500);
resultModel.setMsg("新增一条记录失败");
resultModel.setTime(TimeUtil.getNowTime());
resultModel.setRes(id);
return resultModel;
}
feign调用服务:
@FeignClient(name="teacher-service")
public interface TeacherFeign {
@PostMapping("/teacher/add")
ResultModel<String> addTeacher(@RequestParam("name") String name, @RequestHeader("userId") String userId) throws CommonException;
}
调用另一个系统teacher-service服务teacherFeign.addTeacher(username, id);
@Transactional(rollbackFor = CommonException.class)
@Override
public ResultModel<String> addTeacherInfo(String teacherId, String name) throws CommonException {
Teacher teacher = new Teacher();
teacher.setId(teacherId);
teacher.setTeacherName(name);
throw new CommonException("教师表中该id已存在");
}
controller层:
@PostMapping("/trans/add")
public ResultModel<String> addUser(@RequestParam("username") String username,
@RequestHeader("password") String password,
@RequestHeader("roleId") Integer roleId) {
UserBO userBO = new UserBO();
userBO.setUsername(username);
userBO.setPassword(password);
userBO.setRoleId(roleId);
String id = UUID.randomUUID().toString().replace("-","");
return userService.insertUser(id, username, password, roleId);
}
由于teacher-service服务抛出异常,所以admin-service服务虽然在edu_user新增杨幂记录,但是由于抛出异常,回滚了,所以edu_teacher没有记录。