从Spring框架中的一个BUG来分析锁使用的问题

2021-04-08  本文已影响0人  郑印

从Spring框架中的一个BUG来分析锁使用的问题

锁的使用是一个非常细致的事情,如果使用不当会导致系统性能急剧下降,下面我们通过分析Spring框架中一个锁使用错误的一个BUG来总结一下Lock这部分的知识。 关于BUG的细节,可以Google " MimeTypeUtils LRU cache " 关键词找到。

本文将关键的代码摘出。

本文源码地址

测试有问题版本的 ConcurrentLruCache

注意: metrics 属性及其逻辑,是为了收集指标加入的,原本Spring框架代码中没有这段

 /**
     * 并发的Lur Cache
     * @param <K>
     * @param <V>
     */
    private static class ConcurrentLruCache<K, V> implements LruCache<K, V>{

        private final int maxSize;

        private final ConcurrentLinkedQueue<K> queue = new ConcurrentLinkedQueue<>();

        private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();

        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        private final Function<K, V> generator;

        private final List<Long> metrics;

        public ConcurrentLruCache(int maxSize, Function<K, V> generator , List<Long> metrics) {
            Assert.isTrue(maxSize > 0, "LRU max size should be positive");
            Assert.notNull(generator, "Generator function should not be null");
            this.maxSize = maxSize;
            this.generator = generator;
            this.metrics = metrics;
        }

        @Override
        public V get(K key) {
            this.lock.readLock().lock();
            try {
                if (this.queue.size() < this.maxSize / 2) {
                    V cached = this.cache.get(key);
                    if (cached != null) {
                        return cached;
                    }
                }
                else if (this.queue.remove(key)) {
                    this.queue.add(key);
                    return this.cache.get(key);
                }
            }
            finally {
                this.lock.readLock().unlock();
            }
            this.lock.writeLock().lock();
            long s = System.currentTimeMillis();
            try {


                // retrying in case of concurrent reads on the same key
                if (this.queue.remove(key)) {
                    this.queue.add(key);
                    return this.cache.get(key);
                }
                if (this.queue.size() == this.maxSize) {
                    K leastUsed = this.queue.poll();
                    if (leastUsed != null) {
                        this.cache.remove(leastUsed);
                    }
                }
                V value = this.generator.apply(key);
                this.queue.add(key);
                this.cache.put(key, value);
                return value;
            }
            finally {
                metrics.add(System.currentTimeMillis()-s);
                this.lock.writeLock().unlock();
            }
        }
    }

我们通过 benchmarkLruCache 的程序进行测试,这个程序也会测试修改后的版本。

    /**
     *
     * @param loop 循环次数
     * @param sample 模拟的VALUE不同种类数
     * @param lruCache 测试的Lru实现
     * @return
     */
    private static List<Long> benchmarkLruCache(int loop , int sample , LruCache<String,Object> lruCache){
        List<Long> times = Collections.synchronizedList(new ArrayList<>(loop));
        CountDownLatch countDownLatch = new CountDownLatch(loop);
        ExecutorService executor = Executors.newFixedThreadPool(100);
        IntStream.rangeClosed(1,loop)
                .forEach(i ->{
                    executor.execute(() -> {
                        long s = System.currentTimeMillis();
                        IntStream.rangeClosed(1,sample)
                                .forEach(n -> {
                                    String v = n+"";
                                    if(!lruCache.get(v).equals(v .hashCode())){
                                        throw new RuntimeException("结果错误");
                                    }
                                });
                        times.add((System.currentTimeMillis() - s));
                        countDownLatch.countDown();
                    });
                });
        executor.shutdown();
        try {
            countDownLatch.await();
        }catch (InterruptedException e){}
        return times;
    }

构造 concurrentLruCache 进行测试

int loop = 10000;
int maxSize = 64;
int sample = 65;
List<Long> metrics1 = Collections.synchronizedList(new ArrayList<>(loop * sample));
LruCache<String,Object> concurrentLruCache = new ConcurrentLruCache<String, Object>(maxSize, v -> v.hashCode(),metrics1);
System.out.println("有问题的 ConcurrentLruCache 版本总耗时 : "+benchmarkLruCache(loop,sample,concurrentLruCache).stream()
        //     .peek(System.out::println)
        .mapToLong(v -> v)
        .sum());

System.out.println("有问题的 ConcurrentLruCache 写次数与耗时 : "+metrics1.stream().mapToLong(v -> v).summaryStatistics());

测试结果

有问题的 ConcurrentLruCache 版本总耗时 : LongSummaryStatistics{count=650000, sum=127519404, min=0, average=196.183698, max=1222}
有问题的 ConcurrentLruCache 写次数与耗时 : LongSummaryStatistics{count=267141, sum=20882, min=0, average=0.078168, max=2}

测试优化后版本的 ConcurrentLruCache


    private static class OptimizeConcurrentLruCache<K, V> implements LruCache<K, V>{

        private final int maxSize;

        private final ConcurrentLinkedDeque<K> queue = new ConcurrentLinkedDeque<>();

        private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();

        private final ReadWriteLock lock;

        private final Function<K, V> generator;

        private volatile int size = 0;

        private final List<Long> metrics;

        public OptimizeConcurrentLruCache(int maxSize, Function<K, V> generator ,List<Long> metrics ) {
            Assert.isTrue(maxSize > 0, "LRU max size should be positive");
            Assert.notNull(generator, "Generator function should not be null");
            this.maxSize = maxSize;
            this.generator = generator;
            this.lock = new ReentrantReadWriteLock();
            this.metrics = metrics;
        }

        @Override
        public V get(K key) {
            V cached = this.cache.get(key);
            if (cached != null) {
                if (this.size < this.maxSize) {
                    return cached;
                }
                this.lock.readLock().lock();
                try {
                    if (this.queue.removeLastOccurrence(key)) {
                        this.queue.offer(key);
                    }
                    return cached;
                }
                finally {
                    this.lock.readLock().unlock();
                }
            }
            this.lock.writeLock().lock();
            long s = System.currentTimeMillis();
            try {
                // Retrying in case of concurrent reads on the same key
                cached = this.cache.get(key);
                if (cached  != null) {
                    if (this.queue.removeLastOccurrence(key)) {
                        this.queue.offer(key);
                    }
                    return cached;
                }
                // Generate value first, to prevent size inconsistency
                V value = this.generator.apply(key);
                int cacheSize = this.size;
                if (cacheSize == this.maxSize) {
                    K leastUsed = this.queue.poll();
                    if (leastUsed != null) {
                        this.cache.remove(leastUsed);
                        cacheSize--;
                    }
                }
                this.queue.offer(key);
                this.cache.put(key, value);
                this.size = cacheSize + 1;
                return value;
            }
            finally {
                metrics.add(System.currentTimeMillis() - s);
                this.lock.writeLock().unlock();
            }
        }
    }

构造 concurrentLruCache 进行测试,测试参数保持一致

List<Long> metrics2 = Collections.synchronizedList(new ArrayList<>(loop * sample));
LruCache<String,Object> optimizeConcurrentLruCache = new OptimizeConcurrentLruCache<String, Object>(maxSize, v -> v.hashCode(),metrics2);
System.out.println("优化后的 ConcurrentLruCache 版本总耗时 : "+benchmarkLruCache(loop,sample,optimizeConcurrentLruCache).stream()
        //     .peek(System.out::println)
        .mapToLong(v -> v)
        .summaryStatistics());

System.out.println("优化后的 ConcurrentLruCache 写次数与耗时 : "+metrics2.stream().mapToLong(v -> v).summaryStatistics());

测试结果

优化后的 ConcurrentLruCache 版本总耗时 : LongSummaryStatistics{count=650000, sum=1881120, min=0, average=2.894031, max=62}
优化后的 ConcurrentLruCache 写次数与耗时 : LongSummaryStatistics{count=56995, sum=44, min=0, average=0.000772, max=19}

原因分析

我们通过 jstack 打印出线程堆栈

"pool-1-thread-100" #109 prio=5 os_prio=31 tid=0x00007fdc0708c800 nid=0xe403 waiting on condition [0x000070000a660000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    // .... 省略  
    at com.izhengyin.demo.concurrent.part6.LruCacheIssueTest$ConcurrentLruCache.get(LruCacheIssueTest.java:133)
    // .... 省略  

"pool-1-thread-97" #106 prio=5 os_prio=31 tid=0x00007fdc07817800 nid=0xe003 waiting on condition [0x000070000a357000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    // .... 省略  
    at com.izhengyin.demo.concurrent.part6.LruCacheIssueTest$ConcurrentLruCache.get(LruCacheIssueTest.java:114)
    // .... 省略  

通过堆栈信息,可以直观的看到,线程都在 WAITING 状态,分别是在 LruCacheIssueTest.java 133行与114行,查看源码,这两行分别对应的写锁的获取与读锁的获取

 类: ConcurrentLruCache
  
 public V get(K key) {
    ... 省略
    this.lock.readLock().lock();  114 行
    ... 省略
    this.lock.writeLock().lock(); 113 行
 }

分析源码,按照我们给定的参数 (maxSize < sample)

  1. get 方法每次执行时都会先获取读锁 , 如果此时写锁为释放那么就会阻塞

  2. 按照我们的样品数,分析读锁代码块里的代码,这段代码有三个问题 (看注释)

    2.1 操作queue未加写锁,因此 queue 会被不断的Remove / add , 同时也导致了很多 this.queue.remove(key) 是不会命中的,这样就讲流程下放到了写操作,这种情况线程数越多竞争的越厉害,情况越会加剧

    2.2 this.queue 的 remove 是从头往后找,而add是网队尾添加,所以大部分情况都会进行N次扫描,效率不高

    2.3 ConcurrentLinkedQueue remove ,存在内存泄漏问题。据描述是8u60版本引入的,我下下载了8u101与8u102的源代码,确定问题在8u102被修复,也就是说java8的部分版本也收到了影响。

    //这段代码不会被满足 ,因为 queue "一直" 是满的
    if (this.queue.size() < this.maxSize / 2) {
        V cached = this.cache.get(key);
        if (cached != null) {
            return cached;
        }
    }
    //这段代码有二个问题 
    // 1. 此次操作queue未加写锁,因此 queue 会被不断的Remove / add , 同时也导致了很多 this.queue.remove(key) 是不会命中的,这样就讲流程下放到了写操作,这种情况线程数越多竞争的越厉害,情况越会加剧
    // 2. this.queue 的 remove 是从头往后找,而add是网队尾添加,所以大部分情况都会进行N次扫描,效率不高
    else if (this.queue.remove(key)) {
        this.queue.add(key);
        return this.cache.get(key);
    }
  1. 写操作上逻辑上没有太大问题,主要是大量的写锁或阻塞读,读锁也会反过来阻塞写锁的获取。所以问题代码主要还是读操作代码段的问题

优化后的版本分析

  1. 采用了 ConcurrentLinkedDeque 替换了 ConcurrentLinkedQueue , 通过 removeLastOccurrence 从后往前删除,减少了删除操作的扫码次数。
  2. 读操作,先不加锁,只要能读到,就返回,极大的减少写流程的次数,这也就减少了获取锁的等待

    V cached = this.cache.get(key);
    if (cached != null) {
        //这个条件不会成立,但影响不大了
        if (this.size < this.maxSize) {
            return cached;
        }
        
        this.lock.readLock().lock();
        try {
            // 采用了 ConcurrentLinkedDeque 替换了 ConcurrentLinkedQueue , 通过 removeLastOccurrence 从后往前删除,减少了删除操作的扫码次数。
            if (this.queue.removeLastOccurrence(key)) {
                this.queue.offer(key);
            }
            // 只要能读到就返回
            return cached;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

我们将两次的结果放在一起做一个直观对比,可以很直观的看到优化前后的差异。

有问题的 ConcurrentLruCache 版本总耗时 : LongSummaryStatistics{count=650000, sum=127519404, min=0, average=196.183698, max=1222}
有问题的 ConcurrentLruCache 写次数与耗时 : LongSummaryStatistics{count=267141, sum=20882, min=0, average=0.078168, max=2}
优化后的 ConcurrentLruCache 版本总耗时 : LongSummaryStatistics{count=650000, sum=1881120, min=0, average=2.894031, max=62}
优化后的 ConcurrentLruCache 写次数与耗时 : LongSummaryStatistics{count=56995, sum=44, min=0, average=0.000772, max=19}
上一篇下一篇

猜你喜欢

热点阅读