ReentrantLock

2018-10-16  本文已影响0人  RealityVibe

部分源码分析

内部类部分

Sync类

 abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is
         * implemented in subclasses, but both need nonfair
         * try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            // 更新state值
            int c = getState() - releases;
            // 如果该线程不是目前独占锁的拥有者,抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 如果全部锁释放,返回true
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        // 判断是否持有(独占)锁
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class
 
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        // 获取加锁次数
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
        
        // 判断是否加锁
        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes this lock instance from a stream.
         * @param s the stream
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

NonfairSync

 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         * 加锁操作,先尝试插个队请求锁。插队失败,则正常排队获取。
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

FairSync

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        // 排队获取锁
        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         * 正常是递归调用,或没有等待者,或者来到了队列的head,否则不授予权限
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                // 重复加锁(对应的,要释放对应数量的锁),对应可重入锁
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

ReentrantLock类本类

构造函数
    /**
     * 无参构造函数
     * 默认sync为不公平
     * 同 ReentrantLock(false);
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * 参数fair决定fair/nonfair
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

lock()

获取锁的方式看sync为fair还是nonfair

NonfairSync中调用lock()时,线程会插队先去竞争锁,竞争失败后才会进入阻塞队列。而FairSync中,线程调用lock()时,直接进入阻塞队列等待。

/**
 * 获取锁,获取锁的方式看sync为fair还是nonfair
 * <p> 如果锁未被占有或当前线程已经占有锁,则count+1
 * <p> 如果锁已经被占有,且占有者不是当前线程则当前线程进入休眠状态,直到占有锁。
 */
public void lock() {
    sync.lock();
}
lockInterInterruptibly()
/**
 * 以独占模式获取,打断后终止。
 * 首先检查中断状态,然后至少调用一次{@link #tryAcquire},成功则返回success。 否则进入queue,通过
 * CAS自旋获取锁(见AbstractQueuedSynchronizer#acquireQueued),直到成功或者被打断
 */
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

场景应用

如何而配合Condition

// TODO

中断响应


import java.util.concurrent.locks.ReentrantLock;

public class InterruptThread implements Runnable {
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
    int lock;

    public InterruptThread(int lock) {
        this.lock = lock;
    }

    @Override
    public void run() {
        try {
            if (lock == 1) {
                lock1.lockInterruptibly();
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                }
                lock2.lockInterruptibly();
            } else if (lock == 2) {
                lock2.lockInterruptibly();
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                }
                lock1.lockInterruptibly();
            }
                if (lock1.isHeldByCurrentThread())
                    lock1.unlock();
                if (lock2.isHeldByCurrentThread())
                    lock2.unlock();
                System.out.println(Thread.currentThread().getId() + "退出");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        InterruptThread r1 = new InterruptThread(1);
        InterruptThread r2 = new InterruptThread(2);
        Thread t1 = new Thread(r1);
        Thread t2 = new Thread(r2);
        t1.start();
        t2.start();
        t1.interrupt();
    }

}

实现阻塞队列

package com.yrls.config;

import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedQueue {
    private Integer[] items;//定义为数组,在创建对象时就确定容量
    private Lock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    private int count;
    private int addIndex,removeIndex;

    public BoundedQueue(int size){
        items = new Integer[size];
    }

    public void add(Integer object) throws InterruptedException{
        lock.lock();
        System.out.println(Thread.currentThread().getName() + "lock ");
        try{
            if(Thread.currentThread().getName().equals("1") || Thread.currentThread().getName().equals("10") ){
                for(int i = 0;i<10;++i){
                    System.out.println(i);
                }
            }

            while(count==items.length){
                notFull.await();
            }
            items[addIndex] = object;
            if(++addIndex==items.length){
                addIndex = 0;
            }
            count++;
            System.out.println(Thread.currentThread()+" 插入一个元素,数组为:"+Arrays.toString(items));
            notEmpty.signal();
        }finally{
            System.out.println(Thread.currentThread().getName() + " unlock");
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public Integer remove() throws InterruptedException{
        lock.lock();
        System.out.println(Thread.currentThread().getName() + "  remove-lock");
        try{
            if(Thread.currentThread().getName().equals("1")){
                for(int i = 0;i<10;++i){
                    System.out.println(i);
                }
            }
            while(count==0){
                System.out.println("----start wait----");
                notEmpty.await();
            }
            Integer temp = items[removeIndex];
            items[removeIndex] = null;
            System.out.println(Thread.currentThread()+" 移除一个元素,数组为:"+Arrays.toString(items));
            if(++removeIndex==items.length){
                removeIndex=0;
            }
            count--;
            notFull.signal();
            return temp;
        }finally{
            System.out.println(Thread.currentThread().getName() + "  remove-unlock");
            lock.unlock();
        }
    }
}
public class New1 {
    private static final Random random = new Random(System.currentTimeMillis());

    public static void main(String[] args) throws InterruptedException {

        BoundedQueue queue = new BoundedQueue(5);

        for(int i=1;i<=6;i++){
            Thread thread = new Thread(new Producter(queue),String.valueOf(i));
            thread.start();
        }

        for(int i=1;i<=12;i++){
            Thread thread = new Thread(new Consumer(queue),String.valueOf(i));
            thread.start();
        }
    }

    static class Producter implements Runnable{
        private BoundedQueue queue;
        public Producter(BoundedQueue queue){
            this.queue = queue;
        }
        public void produce() throws InterruptedException{
            queue.add(new Integer(random.nextInt(100)));
        }
        @Override
        public void run() {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class Consumer implements Runnable{
        private BoundedQueue queue;
        public Consumer(BoundedQueue queue){
            this.queue = queue;
        }
        public Integer remove() throws InterruptedException{
            return queue.remove();
        }
        @Override
        public void run() {
            try {
                remove();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

为了便于了解对ReentrantLock加锁过程中其它线程的状态,打印内容增加了一些无聊的输出,便于理解。

  1. 可以看到第一个线程占有reentrantlock时,其它线程无法对去竞争reentrantlock。(在thread1中打印数字的这段时间,未被打断)
  2. 可重入锁是针对同线程而言,不同线程不适用。
  3. 可以看到输出的最后部分有线程6\7\9\11\12(不一定是这几个)的加锁但没有解锁,线程6获取锁后,由于资源耗尽,触发condition.await(),6将占有锁并等待signal()。在6释放锁之前,接下来的线程的lock()都将竞争失败,进入队列。
1lock 
0
1
2
3
4
5
6
7
8
9
Thread[1,5,main] 插入一个元素,数组为:[49, null, null, null, null]
1 unlock
2lock 
Thread[2,5,main] 插入一个元素,数组为:[49, 22, null, null, null]
2 unlock
3lock 
Thread[3,5,main] 插入一个元素,数组为:[49, 22, 60, null, null]
3 unlock
4lock 
Thread[4,5,main] 插入一个元素,数组为:[49, 22, 60, 70, null]
4 unlock
3  remove-lock
Thread[3,5,main] 移除一个元素,数组为:[null, 22, 60, 70, null]
3  remove-unlock
5lock 
Thread[5,5,main] 插入一个元素,数组为:[null, 22, 60, 70, 66]
5 unlock
8  remove-lock
Thread[8,5,main] 移除一个元素,数组为:[null, null, 60, 70, 66]
8  remove-unlock
6lock 
Thread[6,5,main] 插入一个元素,数组为:[60, null, 60, 70, 66]
6 unlock
1  remove-lock
Thread[1,5,main] 移除一个元素,数组为:[60, null, null, 70, 66]
1  remove-unlock
2  remove-lock
Thread[2,5,main] 移除一个元素,数组为:[60, null, null, null, 66]
2  remove-unlock
4  remove-lock
Thread[4,5,main] 移除一个元素,数组为:[60, null, null, null, null]
4  remove-unlock
5  remove-lock
Thread[5,5,main] 移除一个元素,数组为:[null, null, null, null, null]
5  remove-unlock
6  remove-lock
----start wait----
7  remove-lock
9  remove-lock
10  remove-lock
11  remove-lock
12  remove-lock
上一篇 下一篇

猜你喜欢

热点阅读