第5章 Java的锁

2018-01-25  本文已影响0人  红袖者
基本概念:

锁:控制多线程并发访问资源;
队列同步器:管理同步状态,实现锁;
同步状态:同步器的操作对象,int类型;
同步队列:同步器通过同步队列管理同步状态;

同步器实现锁:

1.自定义同步器;
2.同步器定义如何获取、释放同步状态;
3.锁通过同步器来实现语义;

public class Mutex implements Lock {

    //自定义的同步器
    static class MySyncer extends AbstractQueuedSynchronizer {

        //独占式获取同步状态
        @Override
        protected boolean tryAcquire(int arg) {
            if(compareAndSetState(0, 1)){
                //设置当前线程独占同步器
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        //独占式释放同步状态
        @Override
        protected boolean tryRelease(int arg) {
            if(getState() == 0)
                throw new IllegalMonitorStateException();
            //设置同步器无占用线程
            setExclusiveOwnerThread(null);
            //设置同步状态为0
            setState(0);
            return true;
        }

        //同步器是否被独占
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        Condition newCondition(){
            return new ConditionObject();
        }
    }

    //同步器对象,用来实现锁
    private final MySyncer syncer = new MySyncer();

    @Override
    public void lock() {
        syncer.acquire(1);
    }

    @Override
    public boolean tryLock() {
        return syncer.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return syncer.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        syncer.acquireInterruptibly(1);
    }

    @Override
    public void unlock() {
        syncer.release(1);
    }

    @Override
    public Condition newCondition() {
        return syncer.newCondition();
    }
    
} 
同步器get/update同步状态:
getState(); //获取同步状态
setState(); //设置同步状态
compareAndSetState(int expect, int update); //CAS设置同步状态,保证原子性
同步器acquire/release同步状态(可重写):
//模板方法调用,非阻塞
tryAcquire(int arg); //独占式占用同步状态
tryRelease(int arg); //独占式释放同步状态
tryAcquireShared(int arg); //共享式占用同步状态
tryReleaseShared(int arg); //共享式释放同步状态

注:单词get与acquire区别,get指直接获取,acquire指通过努力后获取/占用;

同步器的模板方法(不可重写):

锁通过同步器对象直接调用模板方法来实现语义;
模板方法调用上面tryXxxx(int arg)方法来实现状态占用/释放;

//独占式,阻塞式
acquire(int arg); //独占式占用同步状态
acquireInterruptibly(int arg); //独占式,可响应中断
tryAcquireNanos(int arg, long nanosTimesout); //独占式,可响应中断,超时等待
release(int arg); //独占式释放同步状态
//共享式,阻塞式
acquireShared(int arg); //共享式占用同步状态
acquireSharedInterruptibly(int arg); //共享式,可响应中断
tryAcquireSharedNanos(int arg, long nanosTimesout); //共享式, 可响应中断,超时等待
releaseShared(int arg);//共享式释放同步状态

注:阻塞式指同步器未获取同步状态时线程会阻塞,非阻塞指线程不会阻塞;

同步队列:
FIFO同步队列

1.同步队列是由Node组成的链表;
2.当线程获取锁失败时,就把thread及等待状态信息包装成node插入队列尾部,并更改tail节点,通过CAS操作保证并发性;
3.当head节点的线程释放锁时,就唤醒其next节点,next节点获取同步状态成功时就把自己设为head节点;

独占式(共享式)占用和释放同步状态:
独占式/共享式占用同步状态流程图
区别:同一时刻,独占式只有一个线程可以获取同步状态,而共享式可以有多个线程获取同步状态;
相同点:第一次获取同步状态失败后都生成节点,插入同步队列尾部;都是通过自旋来获取同步状态;前驱为头节点时才能获取到同步状态;
自定义同步主键TwinsLock-双线程锁:

同一时刻有两个线程可以持有锁,即同步器共享式占用同步状态,且同步状态数为2;

public class TwinsLock implements Lock {

    //同步器对象
    private Syncer syncer = new Syncer(2);

    static class Syncer extends AbstractQueuedSynchronizer {

        //初始化同步状态的数量
        public Syncer(int stateCount) {
            setState(stateCount);
        }

        //共享式获取同步状态,非阻塞
        @Override
        protected int tryAcquireShared(int arg) {
            for (; ; ) {
                int currentState = getState();
                int newState = currentState - 1;
                if (newState < 0 || compareAndSetState(currentState, newState))
                    return newState;
            }
        }

        //共享式释放同步状态,非阻塞
        @Override
        protected boolean tryReleaseShared(int arg) {
            for (; ; ) {
                int currentState = getState();
                if (compareAndSetState(currentState, currentState + 1))
                    return true;
            }
        }
    }

    //加锁
    @Override
    public void lock() {
        //acquireShared()会调用tryAcquireShared()
        syncer.acquireShared(1);
    }

    //释放锁
    @Override
    public void unlock() {
        syncer.releaseShared(1);
    }
}

lock()调用模板方法acquireShared()共享式获取同步状态, red()调用用户自定义的tryAcquireShared()来获取同步状态,若获取成功,则线程拥有了锁;若tryAcquireShared()获取同步状态失败,则把线程及其状态包装成node插入同步队列的尾部,并进行自旋来获取同步状态;自旋过程中,当node的前置节点是head节点且释放同步状态后,当前节点就调用tryAcquireShared()来获取同步状态,若获取成功则表示线程拥有了锁,若获取失败则继续自旋;

重入锁:

重入锁指线程获取到锁后能够再次获取该锁,而不会被锁阻塞;
公平模式:
锁的获取顺序符合请求的绝对时间顺序;

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //同步状态
        int c = getState();
        if (c == 0) {
            //锁未被任何线程占用
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                //没有更早的线程处于WAITTING状态,且当前线程获取同步状态成功
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            //锁被当前线程占用
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

非公平模式:
有新线程请求锁时,先争夺一下锁,没成功再去排队;排队之后依然满足FIFO规则,即前面节点的线程先获取锁;

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //同步状态
        int c = getState();
        if (c == 0) {
            //锁未被任何线程占用
            if (compareAndSetState(0, acquires)) {
                //当前线程获取同步状态成功
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            //锁被当前线程占用
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

总结:公平和非公平模式下调用lock()获取锁时,调用acquire()模板方法,acquire()方法首先都调用tryAcquire(1)第一次尝试获取同步状态,若锁此刻锁未被占用,则公平模式下会判断是否有更早的线程处于WAITTING状态,若有则第一次尝试获取锁失败,若没有则尝试获取同步状态;而非公平模式下则直接尝试获取同步状态,不考虑是否有更早的线程是否处于WAITTING状态;公平模式和非公平模式在第一次尝试获取同步状态失败后,都会把线程及其状态包装成node插入FIFO同步队列尾部,之后通过自旋来获取同步状态,公平和非公平模式下都需要等待前置节点来唤醒自己;
性能对比:公平锁具有大量的线程切换,因此其吞吐性不如非公平锁;

读写锁:

排它锁:同一时刻只能有一个线程获取锁(如ReentrantLock);
1.读锁是非排它锁,同一时刻可以有多个读线程获取锁;但是写线程获取锁时,所有读线程和其它写线程均被阻塞;
2.读写锁是一对锁,包括读锁(可重入共享锁)和写锁(可重入排它锁);
3.包括公平模式和非公平模式;
4.支持重进入,且在获取写锁后还能获取读锁,但获取读锁后不能获取写锁;
5.在读多于写的情况下,读写锁比排它锁具有更好的吞吐性;
读写状态的设计:
整型变量的高16位表示读、低16位表示写,则线程获取读锁后,同步状态S=S+(1<<16),线程获取写锁后,同步状态S=S+1;
写锁的获取与释放:
写锁获取成功的条件:1.写锁此刻被当前线程拥有;2.读锁or写锁此刻没被任何线程拥有;否则获取写锁失败,线程进入WAITTING状态;

protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            //存在读锁or写锁
            // (Note: if c != 0 and w == 0 then shared count != 0)
            if (w == 0 || current != getExclusiveOwnerThread())
                //存在读锁or不是当前线程获取写锁,则获取写锁失败
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            //之前是当前线程获取的写锁,因此不存在并发问题,不需要CAS操作
            setState(c + acquires);
            return true;
        }
        //之前没任何线程获取读锁or写锁,此刻可能有多个写线程并发请求写锁,需要CAS操作设置同步状态
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
        //当前线程首次获取同步状态成功
        setExclusiveOwnerThread(current);
        return true;
    }

读锁的获取与释放:
读锁获取成功的条件:1.写锁此刻被当前线程拥有;2.写锁此刻没被任何线程拥有;否则获取写锁失败,线程进入WAITTING状态;
只要"其它线程拥有写锁"的情况不出现,则当前线程就不断尝试获取同步状态,而不是进入等待状态,是因为当前线程此刻仍然具有获取读锁的资格,而不用等待资格;但是当其它线程拥有写锁时,则当前线程获取同步状态失败,失去获取读锁的资格,退出for循环,进入WAITTING状态,并把线程及其状态包装成node插入FIFO同步队列尾部,之后通过自旋来获取同步状态

    protected final int tryAcquireShared(int unused) {
        //只要"其它线程拥有写锁"的情况不出现,则当前线程就不断尝试获取同步状态,
        for (; ; ) {
            Thread current = Thread.currentThread();
            int c = getState();
            int nextC = c + (1 << 16);
            if (nextC < c)
                throw new Error("Maximum lock count exceeded");
            if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
                //存在写锁(写状态不为0),且写锁拥有线程不是当前线程,则获取读锁失败
                return -1;
            if (compareAndSetState(c, nextC))
                //获取读锁成功
                return 1;
        }
    }

锁降级:
锁降级的过程:拥有写锁->预处理(写/改)数据->拥有读锁->释放写锁->使用(读)数据->释放读锁,即读锁降级到写锁;
占用读锁,释放写锁,读数据,这样做有两个好处:1.使用(读)数据期间,其它数据只读的线程可以获取到读锁,而不至于堵塞,提高了效率;2.保证使用(读)数据的过程中数据是没有发生变化的,因为在释放读锁前,其它线程无法获取写锁来更改数据;

public class LockDown {

    static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    static Lock readLock = lock.readLock();
    static Lock writeLock = lock.writeLock();

    static class Thread1 implements Runnable{
        //线程是否完成了数据准备的更新
        static volatile boolean update = false;

        @Override
        public void run() {

        }

        public void processData(){
            readLock.lock();
            if(update == false){
                readLock.unlock();
                writeLock.lock();
                try {
                    //预处理(写/改)数据
                    prepareData();
                    update = true;
                    //先获取读锁
                    readLock.lock();
                }finally {
                    //然后释放写锁
                    writeLock.unlock();
                }
            }
            try {
                //使用(读)数据
                useData();
            }finally {
                //最后释放读锁
                readLock.unlock();
            }
        }
        
        public void prepareData(){}

        public void useData(){}
    }

}

注:Java不支持锁升级,锁升级会引起死锁;

LockSupport工具:

LockSupport工具类定义了一组public static方法,用来阻塞当前线程、唤醒被阻塞的线程;

void park(); //阻塞当前线程
void unpark(Thread thread); //唤醒阻塞的线程thread
void park(Object blocker); //blocker是当前线程等待的对象(阻塞对象,锁对象??)
Condition接口:

提供锁的监视器方法:await()、signal()、signalAll(),用于线程间通信;

//Condition对象通过Lock对象创建
Condition condition = lock.newCondition();

等待队列:
当线程调用condition.await()方法时,就把线程及其状态包装成node插入等待队列的尾部,此刻线程进入WAITTING状态;
等待队列与同步队列不同之处:
1.同步队列是线程获取获取锁(同步状态失败)时插入的队列,等待队列是线程调用wait()方法时插入的队列;2.同步队列是双链表,而等待队列是单链表;3.一个Condition对象对应一个等待队列,一个Lock对象可以有多个Condition对象,即可以拥有多个等待队列,但只能拥有一个同步队列;
等待队列与与同步队列相同之处:
1.共用同步器的静态内部类AbstractQueuedSynchronized.Node来生成节点;2.链表结构相似;
等待队列的结构:
单向FIFO链表,head节点指向链表的头,tail节点指向链表的尾;

等待队列的结构
线程进入等待状态:
线程进入等待状态
1.同步队列head节点的线程构造新节点并加入等待队列;2.释放同步状态;3.唤醒后继节点;4.进入等待状态;
线程被通知:
线程被通知
1.等待队列的head节点移动到同步队列的尾节点上,线程从WAITTING状态进入BLOCKED状态;2.唤醒线程去竞争同步状态;3.非同步队列的首节点,获取同步状态失败,再次阻塞;4.最终通过自旋获取同步状态成功后从await()返回,此刻线程成功获取了锁;
上一篇下一篇

猜你喜欢

热点阅读