事务service调用异步线程bug
事务service调用异步线程bug
当一个service更新一条数据,但是在异步方法里,查询数据时候,不是最新的数据的???
示例(普通开启线程-当前线程有睡眠):
@Transactional
@Override
public void test() {
log.info("【==当前线程事务开始==】");
//更新操作
boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
if(update) {
new Thread(() -> {
GoodsPO byId = this.getById("111");
//这个时候查询是没有提交事务的数据(也就是更新前的数据)
log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("【==当前线程事务提交==】");
}
}
//打印
/**
【==当前线程事务开始==】
【测试goodspo】 未提交事务的数据
【==当前线程事务提交==】
*/
示例(普通开启线程-当前线程没有睡眠):
@Transactional
@Override
public void test() {
log.info("【==当前线程事务开始==】");
//更新操作
boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
if(update) {
new Thread(() -> {
GoodsPO byId = this.getById("111");
//这个时候查询是提交事务的数据(也就是更新后的数据)
log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
}).start();
log.info("【==当前线程事务提交==】");
}
}
//打印
/**
【==当前线程事务开始==】
【==当前线程事务提交==】
【测试goodspo】 已经提交事务的数据(更新后的数据)
*/
示例(springboot线程池@EnableAsync-@Async("executor")-):
@Transactional
@Override
public void test() {
log.info("【==当前线程事务开始==】");
//更新操作
boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
if(update) {
//异步线程池方法
taskExecutor.asyncTest("111");
log.info("【==当前线程事务提交==】");
}
}
@Async("executor")
public void asyncTest(String s) {
GoodsPO byId = goodsMapper.selectById(s);
log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
}
//打印
/**
【==当前线程事务开始==】
【==当前线程事务提交==】
【测试goodspo】 (本地环境是获取更新后的数据,其实线上环境是获取更新前的数据)
*/
(本地环境是获取更新后的数据,其实线上环境是获取更新前的数据)
解决上面问题
- 在当前异步方法,添加休眠时间
- 在service调取异步方法时,直接查询好数据,传给异步方法
@Transactional
@Override
public void test() {
log.info("【==当前线程事务开始==】");
//更新操作
boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
if(update) {
//异步线程池方法
taskExecutor.asyncTest("111");
log.info("【==当前线程事务提交==】");
}
}
@Async("executor")
public void asyncTest(String s) {
try {
long start = System.currentTimeMillis();
log.info("线程开始休眠start{}",start);
Thread.sleep(1000);
log.info("线程结束休眠end{}",System.currentTimeMillis() - start);
} catch (InterruptedException e) {
e.printStackTrace();
}
GoodsPO byId = goodsMapper.selectById(s);
log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
}
@Transactional
@Override
public void test() {
log.info("【==当前线程事务开始==】");
//更新操作
boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
if(update) {
//异步线程池方法
taskExecutor.asyncTest("111");
log.info("【==当前线程事务提交==】");
}
}
@Async("executor")
public void asyncTest(GoodsPO goodsPO) {
log.info("【测试goodspo】{}", JacksonUtils.obj2json(goodsPO));
}
注意:异步方法里的异常,不会影响外面事务的
进阶解决@Transactional 事务提交之后执行 @Async 修饰的异步方法
最近项目中遇到的问题:
俩个service方法, 方法A中调用方法B。
方法A核心业务涉及多张表的数据操作,事务采用注解:@Transactional(rollbackFor = Exception.class)。
方法B 比较耗时,为了不影响核心业务,方法B 用@Async注解,单独开启一个线程去异步执行。(方法B在另外一个类里边,不能和A在同一个类)。
出现的问题:
方法A的事务还没提交,方法B就执行了,导致方法B中查到的数据还是老数据。
当时想到的解决方案,方法A事务提交后再执行方法B
- 问题代码:
class A {
@Autowired
private B b;
@Transactional
public void updateA(..) {
insert(..);
update(..);
b.updateB(..);
}
}
class B {
@Async
public void updateB(..) {
update(..)
}
-
注意
方法A和方法B假如在同一个类中,则方法B的@Async注解会失效:
首先先解释下@Transactional注解失效原因:Spring在扫描bean的时候会扫描方法是否包含@Transactional注解,如果包含,spring会为这个bean动态生成一个子类(代理类proxy),代理类是继承原来的bean的。
此时,当前这个注解的方法被调用时候,实际上是由代理类调用的,代理类在调用之前就会启动Transaction事务。然而,如果这个方法被同一个类中的其他方法调用的,那么该方法的调用并没有通过代理类,而是直接通过原来bean直接调用,所以就不会启动Transaction,我们看到的现象就是 @Transactional 注解无效。
@Transactional和@Async注解实现都是基于spirng的Aop,而AOP实现是基于动态代理实现的,故Async 失效的原理和原理是一样的。(都是因为同一个类其他方法调用时,没有通过代理类调用,所以注解失效)
-
代码
@Resource
private TaskExecutor taskExecutor;
@Transactional
@Override
public void test() {
log.info("【==当前线程事务开始==】");
//更新操作
boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getIsDelete, 1).eq(GoodsPO::getId, "111"));
if(update) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void beforeCommit(boolean readOnly) {
log.info("【==当前事务提交前==】");
}
@Override
public void afterCommit() {
log.info("【==当前事务提交后==】");
//异步线程池方法
taskExecutor.asyncTest("111");
}
});
log.info("【==当前线程事务提交完成==】");
}
}
@Async("executor")
public void asyncTest(String s) {
GoodsPO byId = goodsMapper.selectById(s);
log.info("【测试goodspo】{}", JacksonUtils.obj2json(byId));
}
//打印
/**
【==当前线程事务开始==】
【==当前线程事务提交完成==】
【==当前事务提交前==】
【==当前事务提交后==】
【测试goodspo】 (事务提交后的数据)
*/
进阶解决@Transactional 事务提交之后执行 异步方法时,获取不到事务提交后的数据(@TransactionalEventListener注解)
Spring事务监听机制—使用@TransactionalEventListener处理数据库事务提交成功后再执行操作
- 为什么使用
在项目中,往往需要执行数据库操作后,发送消息或事件来异步调** * 用其他组件执行相应的操作,例如:
用户注册后发送激活码;
配置修改后发送更新事件等。
但是,数据库的操作如果还未完成,此时异步调用的方法查询数据库发现没有数据,这就会出现问题。 - 为了解决上述问题,Spring为我们提供了两种方式:
(1) @TransactionalEventListener注解(订阅发布设计模式)
(2) 事务同步管理器TransactionSynchronizationManager(上述已介绍)
以便我们可以在事务提交后再触发某一事件。
- 代码
定义事件
/**
事件
* */
public class AcctypeEvent extends ApplicationEvent {
private String id;
public AcctypeEvent(Object source, String id) {
super(source);
this.id = id;
}
public String getId() {
return id;
}
}
定义事件监听器
/**
事件监听器
* */
@Component
public class AcctytpeEventListener {
@Resource
private TaskExecutor taskExecutor;
//监听劵批次事件
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
void updataAcctypeEvent(AcctypeEvent event) {
// String id = event.getId();//商品id
//查询最新的数据
taskExecutor.asyncTest();
}
}
发布事件
@Autowired
private ApplicationContext applicationContext;
@Transactional(rollbackFor = Exception.class)
@Override
public void test2() throws Exception {
log.info("【主线程名称】:{}",Thread.currentThread().getName());
//更新操作
boolean update = this.update(Wrappers.<GoodsPO>lambdaUpdate().set(GoodsPO::getApp, 66).eq(GoodsPO::getId, "0001059971"));
//发布事件,处理异步任务(查询最新数据,发送短信成功后更新状态)
applicationContext.publishEvent(new AcctypeEvent("我是和事务相关的事件,请事务提交后执行我","0001059971"));
}