记录一次并发引起的分布式锁问题

2023-04-06  本文已影响0人  多关心老人

项目中使用了线程池处理设备上报的数据,为了保证数据不乱序,每个设备的数据用固定的线程处理。

for (int i = 0; i < executorCount; i++) {
//            必须单线程运行
            ExecutorService executor = new ThreadPoolExecutor(1, 1, 10, TimeUnit.MINUTES, new LinkedBlockingDeque<>(1024),
                    new NamedThreadFactory("energy-efficiency-g" + i, null, true, (Thread t, Throwable e) -> {
                        log.error("处理XXX发生异常", e);
                    }),
                    new ThreadPoolExecutor.DiscardOldestPolicy(){
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                            log.error("线程池已满,触发拒绝策略,请考虑扩大线程池或者使用集群");
                            super.rejectedExecution(r, e);
                        }
                    });
            executors[i] = executor;
        }

由于每个ThreadPoolExecutor中的线程数就1个,因此线程名应该是"energy-efficiency-g1-1"这种,后缀名一定是1,在项目运行过程中,发现后缀不是1而是其他数字,当时并未注意。

现在在线程处理业务的时候加了分布式锁,竟然加锁失败。锁的key是设备的mac,如果按照理想的情况,每个mac都是固定线程在处理,因此每个设备的数据都会在queue里排队等待处理,不会出现一个设备的两包数据争抢一把锁,在debug的时候,发现一直在创建线程即下面的代码一直触发

new NamedThreadFactory("energy-efficiency-g" + i, null, true, (Thread t, Throwable e) -> {
                        log.error("处理XXX发生异常", e);
                    }),

至于为什么没发现报错日志,是因为这个log输出到另一个文件了,没及时发现。

回到问题本身:为什么会一直创建线程呢?


image.png

如果队列中有任务或者没线程没闲置很久,那么就一直在while循环里,那可能就是task.run()抛异常了,打上断点确实抛异常了,原来是在task中获取分布式锁后设置看门狗的时候因为代码版本原因报错了,这里throw x导致跳出while循环,进入后面的


image.png
在这里又会创建一个新的Worker(Thread),而新的Thread又会去queue里取任务去执行去加分布式锁,这个时候原先的线程还没执行完毕,锁也没释放,因此新线程再获取分布式锁就失败了。
上一篇下一篇

猜你喜欢

热点阅读