分布式事务

2021-01-08  本文已影响0人  后来丶_a24d

目录

目录.png

分布式事务解决方案


分布式事务应用场景

刚性事务xa

柔性事务

seata的at
最大努力送达
tcc
saga
saga服务端或者saga客户端重启或者宕机
  1. saga服务端出问题:
    本地消息补偿
  2. saga客户端宕机或者重启
    根据记录状态的消息进行相应补偿,要求客户端调用的被调用方业务接口幂等,补偿业务和saga服务,然后saga服务再进行补偿
  3. 普通业务场景下客户端宕机或者重启
    本地消息补偿,事件开始时就记录,定时任务扫描看状态进行补偿,同样要求被调用方业务接口幂等

基于seata分布式事务 vs servicecomb


分布式事务 servicecomb的saga原理

最开始调用时先经过sagastart注解
sagastart注解流程.png
@Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
    initializeOmegaContext();
    if(context.getAlphaMetas().isAkkaEnabled() && sagaStart.timeout()>0){
      SagaStartAnnotationProcessorTimeoutWrapper wrapper = new SagaStartAnnotationProcessorTimeoutWrapper(this.sagaStartAnnotationProcessor);
      return wrapper.apply(joinPoint,sagaStart,context);
    }else{
      SagaStartAnnotationProcessorWrapper wrapper = new SagaStartAnnotationProcessorWrapper(this.sagaStartAnnotationProcessor);
      return wrapper.apply(joinPoint,sagaStart,context);
    }
}

private void initializeOmegaContext() {
    context.setLocalTxId(context.newGlobalTxId());
}
public AlphaResponse preIntercept(int timeout) {
    try {
      return sender
          .send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
    } catch (OmegaException e) {
      throw new TransactionalException(e.getMessage(), e.getCause());
    }
}
SagaStartAnnotationProcessor远程调用alpha步骤1.2.3
远程调用alpha步骤1.2.3.png
远程调用alpha流程图.png
@Override
public AlphaResponse send(TxEvent event) {
    do {
      final SagaMessageSender messageSender = pickMessageSender();
      Optional<AlphaResponse> response = doGrpcSend(messageSender, event, new SenderExecutor<TxEvent>() {
        @Override
        public AlphaResponse apply(TxEvent event) {
          return messageSender.send(event);
        }
      });
      if (response.isPresent()) return response.get();
    } while (!Thread.currentThread().isInterrupted());

    throw new OmegaException("Failed to send event " + event + " due to interruption");
}

LoadBalanceSenderAdapter
// 轮询做负载
public <T> T pickMessageSender() {
    return (T) senderPicker.pick(loadContext.getSenders(),
        loadContext.getGrpcOnErrorHandler().getGrpcRetryContext().getDefaultMessageSender());
}
// 统一模板方法,回调子类自己的方法
public <T> Optional<AlphaResponse> doGrpcSend(MessageSender messageSender, T event, SenderExecutor<T> executor) {
    AlphaResponse response = null;
    try {
      long startTime = System.nanoTime();
      response = executor.apply(event);
      loadContext.getSenders().put(messageSender, System.nanoTime() - startTime);
    } catch (OmegaException e) {
      throw e;
    } catch (Exception e) {
      LOG.error("Retry sending event {} due to failure", event, e);
      loadContext.getSenders().put(messageSender, Long.MAX_VALUE);
    }
    return Optional.fromNullable(response);
}
booking调用car业务代码时@Compensable注解
booking调用car业务代码.png
@Around("execution(@org.apache.servicecomb.pack.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
    Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
    // just check if we need to setup the transaction context information first
    TransactionContext transactionContext = extractTransactionContext(joinPoint.getArgs());
    if (transactionContext != null) {
      populateOmegaContext(context, transactionContext);
    }
    // SCB-1011 Need to check if the globalTxId transaction is null to avoid the message sending failure
    if (context.globalTxId() == null) {
      throw new OmegaException("Cannot find the globalTxId from OmegaContext. Please using @SagaStart to start a global transaction.");
    }
    String localTxId = context.localTxId();
    context.newLocalTxId();
    LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
  
    int forwardRetries = compensable.forwardRetries();
    // 可以有前向后向补偿,不过一般用default
    RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(forwardRetries);
    try {
      return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, forwardRetries);
    } finally {
      context.setLocalTxId(localTxId);
      LOG.debug("Restored context back to {}", context);
    }
}
alpha集群开启时

基于servicecomb的saga思考

一: 调用方a调用服务器b,此时如果b服务器宕机导致没有给事务协调器alpha发end事件如果处理?或者调用方宕机怎么办?
1. 依靠补偿接口有处理无效补偿事件的能力。执行分支事务与RPC发送结束事件本来就无法保证原子性。所以这里直接进行补偿接口调用,需要补偿接口写好。
2. 分布式事务发起方宕机没结束事件没关系,alpha扫描发现事务没结束事件可以直接考虑进行补偿了。反正本来响应也是异常返回给用户。异常事件时会等30秒左右再调用各个补偿接口防止有子事务为完成。
3. 服务发起方到事务协调器这块的网络有可能出现异常, 重试不行就抛异常,分布式事务还是使调用方耦合了事务协调器

二: 可配置化SagaStartAspect.sender(SagaMessageSender是接口有不同实现)。OmegaSpringConfig -> TransactionAspectConfig -> 此构造方法,springboot回去OmegaSpringConfig找实现SagaMessageSender的bean最终是sagaLoadBalanceSender,这样每个项目可根据需求实现不同策略

@Aspect
@Order(value = 100)
public class SagaStartAspect {

  private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;

  private final OmegaContext context;

  public SagaStartAspect(SagaMessageSender sender, OmegaContext context) {
    this.context = context;
    this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
  }
}

@Bean
SagaMessageSender sagaLoadBalanceSender(@Qualifier("sagaLoadContext") LoadBalanceContext loadBalanceSenderContext) {
    final SagaMessageSender sagaMessageSender = new SagaLoadBalanceSender(loadBalanceSenderContext, new FastestSender());
    sagaMessageSender.onConnected();
    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      @Override
      public void run() {
        sagaMessageSender.onDisconnected();
        sagaMessageSender.close();
      }
    }));
    return sagaMessageSender;
}

@Bean
SagaStartAspect sagaStartAspect(SagaMessageSender sender, OmegaContext context) {
    return new SagaStartAspect(sender, context);
}

参考文章

  1. 分布式事务看这一篇就够了
  2. 分布式事务:2PC、3PC、SAGA、TCC
  3. Seata 分布式事务实践和开源详解 | GIAC 实录
  4. at与xa区别
  5. saga使用场景
  6. 可靠消息最终一致性(本地消息表)
  7. 云原生时代分布式事务
  8. 两天看完分布式事务
  9. 带你读透 SEATA 的 AT 模式
  10. 分布式事务 Seata AT模式原理与实战
  11. serviccomb-omega源码解读
  12. servicecomb-saga各个issue,优化
  13. Akka中文指南
  14. akka分片
  15. akka分片以及分片故障自愈
  16. servicecomb文档
上一篇下一篇

猜你喜欢

热点阅读