SpringCloud用Seata处理分布式事务
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
- TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
Seata服务端部署(TC)
为了简化过程,部署单机版。高可用部署参考
- 1、部署nacos
(跳过)
[图片上传失败...(image-144e38-1654589365211)]
- 2、添加nacos配置
dataId: seataServer.properties,Group: SEATA_GROUP
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
#Transaction routing rules configuration, only for the client
service.vgroupMapping.default_tx_group=dev
#If you use a registry, you can ignore it
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=kryo
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h
#Log rule configuration, for client and server
log.exceptionRate=100
#Transaction storage configuration, only for the server. The file, DB, and redis configuration values are optional.
store.mode=db
store.lock.mode=db
store.session.mode=db
#Used for password encryption
store.publicKey=
#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.cj.jdbc.Driver
store.db.url=jdbc:mysql://192.168.137.1:13306/seata?serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=utf8
store.db.user=seata
store.db.password=seata
store.db.minConn=2
store.db.maxConn=8
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false
#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
service.vgroupMapping.default_tx_group=dev
(default_tx_group事务分组,dev是tc集群名称)
client.undo.logSerialization=kryo(默认是jackson在1.4.2版本及以下版本会报错,并且json体积也会更大)
- 3、下载Seata releases
- 4、解压后目录结构
LICENSE bin conf lib logs
- 5、修改conf/registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
# nacos 作为注册中心
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "192.168.137.1:8848"
group = "SEATA_GROUP"
namespace = "seata"
cluster = "dev"
username = "nacos"
password = "nacos"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
# nacos 作为配置中心
type = "nacos"
nacos {
serverAddr = "192.168.137.1:8848"
namespace = "seata"
group = "SEATA_GROUP"
username = "nacos"
password = "nacos"
dataId = "seataServer.properties"
}
}
cluster = "dev" 和上面service.vgroupMapping.default_tx_group=dev对应
- 6、启动Seata
# 事务日志使用db存储
./bin/seata-server.sh -m db
14:16:22.600 INFO --- [ main] com.alibaba.druid.pool.DruidDataSource : {dataSource-1} inited
14:16:22.987 INFO --- [ main] i.s.core.rpc.netty.NettyServerBootstrap : Server started, listen port: 8091
参数 | 全写 | 作用 | 备注 |
---|---|---|---|
-h | --host | 指定在注册中心注册的 IP | 不指定时获取当前的 IP,外部访问部署在云环境和容器中的 server 建议指定 |
-p | --port | 指定 server 启动的端口 | 默认为 8091 |
-m | --storeMode | 事务日志存储方式 | 支持file ,db ,redis ,默认为 file 注:redis需seata-server 1.3版本及以上 |
-n | --serverNode | 用于指定seata-server节点ID | 如 1 ,2 ,3 ..., 默认为 1
|
-e | --seataEnv | 指定 seata-server 运行环境 | 如 dev , test 等, 服务启动时会使用 registry-dev.conf 这样的配置 |
项目集成Seata
- 1、 添加依赖
implementation 'com.alibaba.cloud:spring-cloud-starter-alibaba-seata'
implementation 'com.esotericsoftware:kryo'
implementation 'com.esotericsoftware.kryo:kryo'
implementation 'de.javakaffee:kryo-serializers'
- 2、添加配置
spring:
cloud:
alibaba:
seata:
tx-service-group: default_tx_group
seata:
enabled: true
registry:
nacos:
cluster: dev
server-addr: 192.168.137.1:8848
group: SEATA_GROUP
username: nacos
password: nacos
namespace: seata
type: nacos
config:
type: nacos
nacos:
server-addr: 192.168.137.1:8848
group : SEATA_GROUP
namespace: seata
dataId: seataServer.properties
username: nacos
password: nacos
service:
vgroup-mapping:
default_tx_group: dev
tx-service-group:
default_tx_group
和service.vgroupMapping.default_tx_group
要一致,且service.vgroup-mapping.default_tx_group:dev
(为TC集群名称)
- 3、编码
TM事务发起方
@GlobalTransactional(timeoutMills = 30000, name = "seataTest", rollbackFor = Exception.class)
@Override
public void seataTest() {
final SysLog sysLog = new SysLog();
sysLog.setId(IdUtils.id());
sysLog.setType(SysLogTypeEnum.INFO.getType());
sysLog.setClassName("SysLogServiceImpl");
sysLog.setMethodName("seataTest");
sysLog.setRequestUri("/seata");
sysLog.setParams("");
sysLog.setRequestIp("");
save(sysLog);
if (communityClient.test().isFail()) {
throw ServiceException.of("client fail");
}
}
只需要增加
@GlobalTransactional
注解
@FeignClient(value = "sell", path = "/sellCommunity", fallbackFactory = SellCommunityClient.Fallback.class)
public interface SellCommunityClient {
/**
* 测试分布式事务
*/
@PostMapping("/test")
Result<Void> test();
@Component
@Slf4j
class Fallback implements FallbackFactory<SellCommunityClient> {
@Override
public SellCommunityClient create(Throwable cause) {
log.error("异常原因:{}", cause.getMessage(), cause);
return () -> {
log.info("------test Fallback------");
return Result.fail("Fallback");
};
}
}
}
RM分支事务参与方
@RestController
@RequestMapping("/sellCommunity")
@RequiredArgsConstructor
@Getter
public class SellCommunityController extends BaseController<SellCommunity, SellCommunityService> {
private final SellCommunityService service;
@PostMapping("/test")
public Result<Void> test() {
service.testSeata();
return Result.success();
}
}
@Service
public class SellCommunityServiceImpl extends BaseServiceImpl<SellCommunityMapper, SellCommunity> implements SellCommunityService {
@Override
@Transactional(rollbackFor = Exception.class)
public void testSeata() {
final SellCommunity community = new SellCommunity();
community.setName("test");
community.setAddress("beijing");
community.setHouseNumber(1000);
insert(community);
// System.out.println(1/0);
}
}
注意:
1、项目有全局异常处理的,默认是不会回滚的,需要自己根据状态码处理
2、使用fallbackFactory降级,默认是不会回滚的,同样需要在fallbackFactory修改状态码
// 状态码判断
public boolean isFail(){
return this.code != ResultEnum.SUCCESS.getCode();
}
// 根据状态码抛出异常
if (communityClient.test().isFail()) {
throw ServiceException.of("client fail");
}
作者:realmadrid_juejin
链接:https://juejin.cn/post/7097113972616200223
来源:稀土掘金