java面试

事务service调用异步线程bug

2022-07-20  本文已影响0人  Raral

事务service调用异步线程bug

当一个service更新一条数据,但是在异步方法里,查询数据时候,不是最新的数据的???

示例(普通开启线程-当前线程有睡眠):

    

    @Transactional
    @Override
    public void test() {
        log.info("【==当前线程事务开始==】");

        //更新操作
        boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
        if(update) {
            new Thread(() -> {
                GoodsPO byId = this.getById("111");
                //这个时候查询是没有提交事务的数据(也就是更新前的数据)
                log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
            }).start();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("【==当前线程事务提交==】");
        }

    }


 //打印
   /**
   【==当前线程事务开始==】
   【测试goodspo】 未提交事务的数据
    【==当前线程事务提交==】
   */

示例(普通开启线程-当前线程没有睡眠):

    

    @Transactional
    @Override
    public void test() {
        log.info("【==当前线程事务开始==】");

        //更新操作
        boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
        if(update) {
            new Thread(() -> {
                GoodsPO byId = this.getById("111");
                //这个时候查询是提交事务的数据(也就是更新后的数据)
                log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
            }).start();
            log.info("【==当前线程事务提交==】");
        }

    }


 //打印
   /**
   【==当前线程事务开始==】
    【==当前线程事务提交==】
    【测试goodspo】 已经提交事务的数据(更新后的数据)
   */

示例(springboot线程池@EnableAsync-@Async("executor")-):


    @Transactional
    @Override
    public void test() {
        log.info("【==当前线程事务开始==】");

        //更新操作
        boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
        if(update) {
            //异步线程池方法
            taskExecutor.asyncTest("111");
            log.info("【==当前线程事务提交==】");
        }

    }


 @Async("executor")
    public void asyncTest(String s) {
        GoodsPO byId = goodsMapper.selectById(s);
        log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
    }


 //打印
   /**
    【==当前线程事务开始==】
    【==当前线程事务提交==】
    【测试goodspo】 (本地环境是获取更新后的数据,其实线上环境是获取更新前的数据)
   */

(本地环境是获取更新后的数据,其实线上环境是获取更新前的数据)

解决上面问题

  1. 在当前异步方法,添加休眠时间
  2. 在service调取异步方法时,直接查询好数据,传给异步方法
@Transactional
    @Override
    public void test() {
        log.info("【==当前线程事务开始==】");

        //更新操作
        boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
        if(update) {
            //异步线程池方法
            taskExecutor.asyncTest("111");
            log.info("【==当前线程事务提交==】");
        }

    }

@Async("executor")
    public void asyncTest(String s) {

        try {
            long start = System.currentTimeMillis();
            log.info("线程开始休眠start{}",start);
            Thread.sleep(1000);
            log.info("线程结束休眠end{}",System.currentTimeMillis() - start);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        GoodsPO byId = goodsMapper.selectById(s);
        log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
    }
@Transactional
    @Override
    public void test() {
        log.info("【==当前线程事务开始==】");

        //更新操作
        boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
        if(update) {
            //异步线程池方法
            taskExecutor.asyncTest("111");
            log.info("【==当前线程事务提交==】");
        }

    }

@Async("executor")
    public void asyncTest(GoodsPO goodsPO) {
        log.info("【测试goodspo】{}", JacksonUtils.obj2json(goodsPO));
    }

注意:异步方法里的异常,不会影响外面事务的

进阶解决@Transactional 事务提交之后执行 @Async 修饰的异步方法
最近项目中遇到的问题:
俩个service方法, 方法A中调用方法B。
方法A核心业务涉及多张表的数据操作,事务采用注解:@Transactional(rollbackFor = Exception.class)。
方法B 比较耗时,为了不影响核心业务,方法B 用@Async注解,单独开启一个线程去异步执行。(方法B在另外一个类里边,不能和A在同一个类)。
出现的问题:
方法A的事务还没提交,方法B就执行了,导致方法B中查到的数据还是老数据。
当时想到的解决方案,方法A事务提交后再执行方法B


class A {
 
    @Autowired
    private B b;
    
    @Transactional
    public void updateA(..) {
        insert(..);
        update(..);
        b.updateB(..);
    }
 
}
    
 
class B {
 
    @Async
    public void updateB(..) {
        update(..)
    }
 

 @Resource
    private TaskExecutor taskExecutor;


    @Transactional
    @Override
    public void test() {
        log.info("【==当前线程事务开始==】");

        //更新操作
        boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
        if(update) {

            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void beforeCommit(boolean readOnly) {
                    log.info("【==当前事务提交前==】");
                }

                @Override
                public void afterCommit() {
                    log.info("【==当前事务提交后==】");
                    //异步线程池方法
                    taskExecutor.asyncTest("111");
                }
            });

            log.info("【==当前线程事务提交完成==】");
        }

    }

    @Async("executor")
    public void asyncTest(String s) {

        GoodsPO byId = goodsMapper.selectById(s);
        log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
    }



    //打印
    /**
        【==当前线程事务开始==】
         【==当前线程事务提交完成==】
         【==当前事务提交前==】
         【==当前事务提交后==】
         【测试goodspo】 (事务提交后的数据)
         
    */

进阶解决@Transactional 事务提交之后执行 异步方法时,获取不到事务提交后的数据(@TransactionalEventListener注解)
Spring事务监听机制—使用@TransactionalEventListener处理数据库事务提交成功后再执行操作

/**
事件
 * */
public class AcctypeEvent extends ApplicationEvent {

    private String id;

    public AcctypeEvent(Object source, String id) {
        super(source);
        this.id = id;
    }

    public String getId() {
        return id;
    }
}

定义事件监听器

/**
事件监听器
 * */
@Component
public class AcctytpeEventListener {
    
    @Resource
    private TaskExecutor taskExecutor;

    //监听劵批次事件
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    void updataAcctypeEvent(AcctypeEvent event) {
//        String id = event.getId();//商品id
        //查询最新的数据
        taskExecutor.asyncTest();

    }
    
}

发布事件

 @Autowired
    private ApplicationContext applicationContext;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void test2() throws Exception {
        log.info("【主线程名称】:{}",Thread.currentThread().getName());
        //更新操作
        boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getApp, 66).eq(GoodsPO::getId, "0001059971"));

        //发布事件,处理异步任务(查询最新数据,发送短信成功后更新状态)
        applicationContext.publishEvent(new AcctypeEvent("我是和事务相关的事件,请事务提交后执行我","0001059971"));

    }
上一篇下一篇

猜你喜欢

热点阅读