强一致性分布式事务框架 raincat

2020-07-29  本文已影响0人  Ace_b90f

raincat项目地址

源码分析

本地事务部分依赖于spring事务

@TxTransactional注解和spring自带的Transactional注解没联系,是手动调用TransactionManager的一些方法来使用spring的事务的。具体是在拦截@TxTransactional注解时,调用getTransaction方法中,其内部调用doGetTransaction时,若使用jdbc会将AutoCommit置为false,执行切面方法,然后阻塞。这里就是两阶段提交中的第一阶段,这时候并没有真正的提交数据库,需要等待协调者的反馈。然后再根据协调者即raincat-manager返回的状态,手动调用commit或者rollback方法来使用spring的事务。

事务的协调者是raincat-manager

raincat-manager也作为EurekaServer兼职了服务注册的作用。

事务协调者和事务参与者通信是通过netty

事务参与者在发起事务、加入事务组、预提交、事务完成、回滚等操作时都会通过netty-handler发送心跳数据。协调者即raincat-manager在收到心跳包后会判断行为来进行协调的操作,并回发心跳来决定参与者的下一步操作。

事务组的管理部分全部在协调者即raincat-manager。比如说,事务发起者在执行本地业务前会发送CREATE_GROUP的心跳给协调者,协调者在收到心跳包后会创建事务组,若是创建失败则会通知事务发起者不必执行本地业务,若是创建成功则通知事务发起者可以进行下一步操作。加入事务组之类的逻辑同理。

协调者根据事务组来协调事务所有的参与者

事务发起者会在业务代码执行前生成一个事务组Id,将其放入ThreadLocal中,在使用rpc时,因为是同一个线程,可以从ThreadLocal中获取事务组Id,再将事务组Id放进请求头,调用其他服务时在rpc的提供者即事务的参与者就可以获取同一个事务组Id,这样就可以将发起者和参与者放进同一个事务组,事务管理器会协调事务组的整体提交。

分布式事务发起者在使用rpc组件调用其他服务时,会将生成的事务Id放在请求头中,主要代码是在RpcMediator类中,通过transmit方法添加,通过aquire方法获取。不同的rpc放入的位置不同,若是feign就是通过RequestInterceprot放在requestTemplate::header,若是dubbo就是通过Filter放在RpcContext.getContext()::setAttachment。在服务提供端会在调用业务代码前从请求头中取出,具体代码是在各个rpc对TxTransactionInterceptor的实现中。

通过是否存在txGroupId来判断当前服务是事务的发起者还是事务的参与者

因为txGroupId是事务发起者创建的,因此在请求事务发起者时,请求头中不存在txGroupId,而发起者在调用参与者时将txGroupId放进了请求头。具体的判断代码在TxTransactionFactoryServiceImpl.factoryOf(txTransactionInfo)

参与者使用ReentrantLock和Condition来阻塞自己

参与者在发送心跳包后会阻塞自己,因为需要等待协调者的反馈。得到协调者的反馈后会根据回写的心跳包中的key找到对应的锁,解锁后继续执行。锁的实现代码是在BlockTask类中,锁的使用代码主要是在NettyClientMessageHandler.sendTxManagerMessage(heartBeat)

事务参与者的提交和回滚需要获取到对应的Channel

事务发起者在发起事务或事务参与者在加入事务组时会将自己的remoteAddress保存起来。事务协调者在发起提交或者回滚时会根据上面保存的remoteAddress找到对应的Channel,代码在SocketManager.getChannelByModelName(name),找到对应的Channel后就会向对应的参与者的Channel发送心跳包。

拦截器部分

因为要在调用业务方法前找到当前操作所在的事务组,并将当前操作加入事务组,因此在最外面对于TxTransaction的Aspect拦截部分就做了rpc的抽象,要根据特定的rpc找到请求头中的事务组。

不同的rpc实现通过spi调用。

要分清客户端和服务端

客户端只连一个服务端,因此在NetyClientMessageHandler中对当前连接的服务端的ctx做了保存,在和服务端主动通信时直接使用这个ctx来write。

源码疑问

org.dromara.raincat.core.service.impl.TxManagerLocator中存储对象的部分使用的是AtomicReference。这里的修改部分又加了锁,如果不用原子类,直接用volatile不可以吗。

上一篇下一篇

猜你喜欢

热点阅读