SpringBoot事务提交之后的操作保证

2020-12-20  本文已影响0人  田真的架构人生

业务场景:
在一个事务操作中,当数据入库之后,继续做其他异步或同步操作,如消息通知、远程接口调用等。

存在的问题:

  1. 事务原子性不能保证:如果出现事务回滚,则数据入库失败,然而异步操作却不能回滚,继续执行,这就会出现与业务预期不一致的结果(如数据入库失败,但是消息通知则照常触发);
  2. 数据正确性无法保证:如果异步操作需要反查数据库上一步入库的结果,而上一步的事务由于数据库压力或IO等原因导致事务提交延迟,这时异步操作去数据库里查询数据就会失败;

解决方案:
这就要求我们保证事务的原子性,数据库入库失败,那异步操作也不能执行;另外还要保证在异步操作执行前事务一定要是已提交状态。

  1. 使用TransactionSynchronizationManager保证异步操作只在事务正确commit之后才执行。这也是个人较为推荐的一种做法。
    定义一个组件,该组件首先检查当前上下文是否开启事务,如果存在事务,当事务正确commit后,执行由调用者传过来的异步执行逻辑。
/**
 * 事务提交后的处理器,action执行严格依赖调用方的事务提交
 * 如果调用方没有事务,不执行;
 * 如果调用方事务回滚,不执行;
 * action异常不影响调用方事务提交;
 *
 * @Author tz
 * @Date 2020/12/18 14:12
 * @Version 1.0
 */
@Component
public class TransactionCommitHandler {
    public void handle(Runnable action){
        if (TransactionSynchronizationManager.isActualTransactionActive()){
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                  //具体的异步操作
                  action.run();
                }
            });
        }
    }
}

调用方:

/**
     * 保存工单及发送消息
     */
    @Transactional(isolation = Isolation.REPEATABLE_READ, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
    public void submitOrder(Integer ottStatus, Integer mppStatus, AssetPlaylistDto newPl) throws Exception {
        //... 其它操作

        AssetPlaylistOrders order = this.constructOrder(ottStatus, mppStatus, newPl);
        //保存工单
        this.save(order);

        transactionCommitHandler.handle(() -> {
            //发送复审mq
            rabbitMqService.sendOrderMsg(order.getOrderId(), order.getPlId(), AssetTypeEnum.AUTO_PLAY);
        });
    }
  1. Spring4.2之后,可以使用TransactionalEventListener监听事务提交,并在调用方发送event。这种方式需要维护过多的事件及事件处理器,可维护性较差,相对而言上面第一种方案的函数式输入会简单一点。

业务实现:

@Service
@Slf4j
public class UserService extends ServiceImpl<UserDao, User> {
    @Autowired
    ApplicationEventPublisher eventPublisher;
    
    @Transactional
    public void add(User user){
        super.save(user);
        eventPublisher.publishEvent(new UserAddEvent(user.getId()));
    }
}

自定义事件:

@Data
public class UserAddEvent extends ApplicationEvent {

    private Integer userId;

    public UserAddEvent(Integer userId) {
        this.userId = userId;
    }
}

监听器实现:

@Slf4j
@Component
public class UserListener {

    @Autowired
    EmailService emailService;

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, classes = UserAddEvent.class)
    public void onUserAddEvent(UserAddEvent event) {
        emailService.sendEmail(event.getUserId());
    }
}

还有一些其它个人觉得不太靠谱的方式,如去掉整个事务,甚至手动延迟异步操作,这些不论是从事务的ACID上,还是软件鲁棒性上来讲,都不是很好的解决方案。

上一篇下一篇

猜你喜欢

热点阅读