框架程序员

读写锁ReentrantReadWriteLock之锁降级

2017-07-06  本文已影响1544人  light_shine

读写锁ReentrantReadWriteLock简介

ReentrantReadWriteLock是ReadWriteLock接口的实现,ReentrantReadWriteLock中有两个静态内部类:ReadLock读锁和WriteLock写锁,这两个锁实现了Lock接口,ReentrantReadWriteLock支持可重入,同步功能依赖自定义同步器(AbstractQueuedSynchronizer)实现,读写状态就是其同步器的同步状态。同步状态由一个整型变量表示,因为这个变量需要表示多个线程的读和写的状态,因此读写锁在实现上将该变量的高16位表示读,低16位表示写。

写锁的获取和释放

写锁WriteLock是支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取读锁时,读锁已经被获取或者该线程不是已获取写锁的线程,则当前线程进入等待状态。读写锁确保写锁的操作对读锁可见。写锁释放每次减少写状态,当前写状态为0时表示写锁已背释放。

读锁的获取与释放

读锁ReadLock是支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(写状态为0)时,读锁总是能够被成功地获取,而所做的也只是增加读状态(线程安全)。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已经被获取,则进入等待状态。

锁降级

锁降级指的是写锁降级成为读锁。锁降级是指把持住当前拥有的写锁的同时,再获取到读锁,随后释放写锁的过程。以下是oracle官网的对于锁降级的示例代码:

 class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
        // Must release read lock before acquiring write lock
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        try {
          // Recheck state because another thread might have
          // acquired write lock and changed state before we did.
          if (!cacheValid) {
            data = ...
            cacheValid = true;
          }
          // Downgrade by acquiring read lock before releasing write lock
          rwl.readLock().lock();
        } finally {
          rwl.writeLock().unlock(); // Unlock write, still hold read
        }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

代码中声明了一个volatile类型的cacheValid变量,保证其可见性。首先获取读锁,如果cache不可用,则释放读锁,获取写锁,在更改数据之前,再检查一次cacheValid的值,然后修改数据,将cacheValid置为true,然后在释放写锁前获取读锁;此时,cache中数据可用,处理cache中数据,最后释放读锁。这个过程就是一个完整的锁降级的过程,目的是保证数据可见性,如果当前的线程C在修改完cache中的数据后,没有获取读锁而是直接释放了写锁,那么假设此时另一个线程T获取了写锁并修改了数据,那么C线程无法感知到数据已被修改,则数据出现错误。如果遵循锁降级的步骤,线程C在释放写锁之前获取读锁,那么线程T在获取写锁时将被阻塞,直到线程C完成数据处理过程,释放读锁。

示例

以上的处理过程稍显抽象,因此,我写了一个具体的案例来演示以上过程,代码如下:

public class ReadWriteLockTest {

    private volatile boolean cacheValid = false;

    private int currentValue = 0;

    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private Lock readLock = lock.readLock();

    private Lock writeLock = lock.writeLock();

    /**
     * 测试用例
     * @throws InterruptedException
     */
    @Test
    public void testLockDowngrading() throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(2);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 100, TimeUnit.SECONDS, new ArrayBlockingQueue
                <Runnable>(10));
        for (int i = 0; i < 2; i ++){
            int finalI = i;
            executor.execute(new Thread(new Runnable() {
                @Override
                public void run() {
                    Thread.currentThread().setName("thread-" + finalI);
                    try {
                        start.await();
                        TimeUnit.SECONDS.sleep(finalI);
                        System.out.println("after sleep " + finalI + " seconds, excute " + Thread.currentThread().getName());
                        cacheValid = false;
                        processCachedData(finalI);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        end.countDown();
                    }

                }
            }));
        }
        start.countDown();
        end.await();

    }

    /**
     * 锁降级过程
     * @param num
     */
    private void processCachedDataDownGrading(int num){
        readLock.lock();
        if(!cacheValid){
            //必须先释放写锁
            readLock.unlock();
            writeLock.lock();
            try{
                //在更新数据之前做二次检查
                if(!cacheValid){
                    System.out.println(Thread.currentThread().getName() + " has updated!");
                    //将数据更新为和线程值相同,以便验证数据
                    currentValue = num;
                    cacheValid = true;
                    readLock.lock();
                }
            }finally {
                writeLock.unlock();
            }

        }

        try{
            //模拟5秒的处理时间,并打印出当前值,在这个过程中cacheValid可能被其他线程修改,锁降级保证其他线程写锁被阻塞,数据不被改变
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + ": " +  currentValue);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(lock.getReadHoldCount() > 0){
                readLock.unlock();
            }
        }
    }

    /**
     * 无锁降级的过程
     * @param num
     */
    private void processCachedData(int num){
        readLock.lock();
        if(!cacheValid){
            readLock.unlock();
            writeLock.lock();
            try{
                if(!cacheValid){
                    System.out.println(Thread.currentThread().getName() + " has updated!");
                    currentValue = num;
                    cacheValid = true;
                }
            }finally {
                writeLock.unlock();
            }
        }
        try{
            //模拟5秒的处理时间,并打印出当前值,在这个过程中cacheValid可能被其他线程修改,无锁降级过程,其他线程此时可能获取写锁,并更改书数据,导致后面的数据错误
            TimeUnit.SECONDS.sleep(5);
            System.out.println(Thread.currentThread().getName() + ": " +  currentValue);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if(lock.getReadHoldCount() > 0){
                readLock.unlock();
            }
        }
    }

}

在上述的测试代码中,有两个方法,一个是processCachedDataDownGrading,该方法模拟锁降级的过程,另一个是processCachedData,模拟无锁降级的过程。

在主测试代码中,起了两个线程,每个线程执行前先将cacheValid置为false,同时,为了模拟处理数据5秒中外部线程的想要执行的数据变更,两个线程的实际执行行为相差1秒。

如果两个线程中执行的方法是processCachedData(无锁降级过程),那么输出为:

after sleep 0 seconds, excute thread-0
thread-0 has updated!
after sleep 1 seconds, excute thread-1
thread-1 has updated!
thread-0: 1
thread-1: 1

从输出看,线程0马上执行(不是指run前,指run中我们想测试的内容),线程1过了1秒执行,两个线程中的数据均被改变,但是最终两个线程中的值均为1(实际我们期望线程0中的数据应该为0),导致这个结果的原因就是在线程0处理数据(sleep5秒)的过程中,线程1获取了写锁并更新了数据,从而线程0的数据被更新。

如果两个线程中执行的方法是processCachedDataDownGrading(锁降级过程),那么输出为:

after sleep 0 seconds, excute thread-0
thread-0 has updated!
after sleep 1 seconds, excute thread-1
thread-0: 0
thread-1 has updated!
thread-1: 1

从输出看,线程0马上执行,线程1过了1秒执行,两个线程中的数据均被改变,但是最终两个线程中的值分别为0和1,符合期望,因为这个过程是锁降级的过程,线程0在更新数据之后,释放写锁之前,获取了读锁,并在处理完数据之后才将其释放,因此线程1在线程0在处理数据时获取写锁被阻塞,从thread-1 has updated! 语句输出在thread-0: 0 之后也可看出,从而保证了数据0的数据没有问题。

以上是我个人的一些理解,如有问题,欢迎邮件至wgy@live.cn探讨,多谢!

上一篇 下一篇

猜你喜欢

热点阅读