记录一次并发引起的分布式锁问题
2023-04-06 本文已影响0人
多关心老人
项目中使用了线程池处理设备上报的数据,为了保证数据不乱序,每个设备的数据用固定的线程处理。
for (int i = 0; i < executorCount; i++) {
// 必须单线程运行
ExecutorService executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.MINUTES, new LinkedBlockingDeque<>(1024),
new NamedThreadFactory("energy-efficiency-g" + i, null, true, (Thread t, Throwable e) -> {
log.error("处理XXX发生异常", e);
}),
new ThreadPoolExecutor.DiscardOldestPolicy(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
log.error("线程池已满,触发拒绝策略,请考虑扩大线程池或者使用集群");
super.rejectedExecution(r, e);
}
});
executors[i] = executor;
}
由于每个ThreadPoolExecutor中的线程数就1个,因此线程名应该是"energy-efficiency-g1-1"这种,后缀名一定是1,在项目运行过程中,发现后缀不是1而是其他数字,当时并未注意。
现在在线程处理业务的时候加了分布式锁,竟然加锁失败。锁的key是设备的mac,如果按照理想的情况,每个mac都是固定线程在处理,因此每个设备的数据都会在queue里排队等待处理,不会出现一个设备的两包数据争抢一把锁,在debug的时候,发现一直在创建线程即下面的代码一直触发
new NamedThreadFactory("energy-efficiency-g" + i, null, true, (Thread t, Throwable e) -> {
log.error("处理XXX发生异常", e);
}),
至于为什么没发现报错日志,是因为这个log输出到另一个文件了,没及时发现。
回到问题本身:为什么会一直创建线程呢?
image.png
如果队列中有任务或者没线程没闲置很久,那么就一直在while循环里,那可能就是task.run()抛异常了,打上断点确实抛异常了,原来是在task中获取分布式锁后设置看门狗的时候因为代码版本原因报错了,这里throw x导致跳出while循环,进入后面的
image.png
在这里又会创建一个新的Worker(Thread),而新的Thread又会去queue里取任务去执行去加分布式锁,这个时候原先的线程还没执行完毕,锁也没释放,因此新线程再获取分布式锁就失败了。