编程语言-Java系列

看!源码之ReentrantLock公平锁 上(内涵linux

2020-03-14  本文已影响0人  starskye

ReentrantLock的定义

ReentrantLock 可重入锁,当前线程再次获取当前锁是则可直接获取防止死锁。

Sync的定义

ReentrantLock 内部类Sync用于管理同步,ReentrantLock 中方法基本都是调用内部对象Sync实现。

FairSync的定义

Sync的公平锁实现,采用队列方式顺序获取锁从而达到公平获取。


Sync的组成

AbstractOwnableSynchronizer 用于记录当前拥有锁的线程信息。

//定义关系很简单仅仅实现了Serializable 序列接口
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    //序列ID
    private static final long serialVersionUID = 3737899427754241961L;
    //设置构造为protected
    protected AbstractOwnableSynchronizer() { }
    //核心属性存储当前获取所得线程的引用
    private transient Thread exclusiveOwnerThread;
    //设置当前获取锁的线程
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    //获取当前获取所得线程
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractQueuedSynchronizer 锁队列操作实现。

//当前类继承与上方的AbstractOwnableSynchronizer
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    private static final long serialVersionUID = 7373984972572414691L;

    protected AbstractQueuedSynchronizer() { }

   //获取锁的链表中Node结构
   //此链表为双向链表
    static final class Node {
        //注意修饰符static 与Node类直接关联而不是Node对象,代表所有Node对象都
        //使用的一个SHARED 而此常量的含义是锁的类型为共享锁
        //如果为共享锁则会使用此类引用
        static final Node SHARED = new Node();
        //与上方共享锁恰恰相反此处为独占锁 node的mode为这两个值得之一
        static final Node EXCLUSIVE = null;
        //当前Node的等待状态的状态类型 取消类型 如果状态为此则在遍历链表时则跳过
        static final int CANCELLED =  1;
        //当前Node的等待状态的状态类型 预示着下一个线程需要被释放运行
        static final int SIGNAL    = -1;
        //当前Node的等待状态的状态类型 代表当前node在等待条件触发
        static final int CONDITION = -2;
        //当前Node的等待状态的状态类型 共享锁使用无条件传播当前锁的占有权
        static final int PROPAGATE = -3;

         //当前node的等待状态
        volatile int waitStatus;
        //双向链表的指向前Node属性
        volatile Node prev;
        //双向链表的指向下一个Node的属性
        volatile Node next;
        //当前的线程
        volatile Thread thread;

         //锁的类型是独占锁还是共享锁
        Node nextWaiter;

       //判断当前锁是否为共享锁
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        //获取当前node节点的前一个Node 获取为null则抛出异常
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {   
        }
        //用于将Node添加到等待队列中的构造
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        //用于设置条件触发使用的
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    //当前队列的头节点
    private transient volatile Node head;

     //当前节点的尾节点
    private transient volatile Node tail;

    //当前锁的状态 等于0 说明当前锁并未使用 大于0则是使用 并且超过1说明当前再被重入
    private volatile int state;

上分是使用的属性及类,为了方便理解下方将会以调用顺序解析。接下来则是公平锁FairSync。

    //公平锁实现 继承与Sync 很简单实现了两个方法
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        //可以看到FairSync 的lock实现调用了acquire此方法的实现在Sync 中
        final void lock() {
            acquire(1);
        }
        //acquire 内部调用了tryAcquire ,acquire 具体实现下方介绍
        //acquires 一般为 1 代表当前state的标识状态
        protected final boolean tryAcquire(int acquires) {
            //获取当前线程
            final Thread current = Thread.currentThread();
            //获取当前锁的状态
            int c = getState();
            //如果等于0 说明锁并未使用尝试获取锁
            if (c == 0) {
                //获取之前因为是公平锁需要队列顺序所以先查看是否有等待锁的Node。
                //如果存在则不会继续执行 否则 compareAndSetState 尝试设置值
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //设置成功则设置当前拥有锁的线程为当前线程并且返回true
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前线程就是持有锁的线程 说明已经拥有锁此处重入
            else if (current == getExclusiveOwnerThread()) {
                //计算当前以重入次数
                int nextc = c + acquires;
                //如果传入acquires 导致计算为0 则抛出异常
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                //设置当前计算的值到state
                setState(nextc);
                return true;
            }
            return false;
        }
    }
  //下面介绍上方使用到的方法一调用层级方式
  //属于AbstractQueuedSynchronizer
  //传入获取arg 通常为 1
  //lock直接调用方法
  public final void acquire(int arg) {
        //先尝试获取上方的锁如果获取成功则停止
        if (!tryAcquire(arg) &&
            //如果获取锁失败则执行此处首先(addWaiter)加入队列Node的mode为独占锁
            //队列加入成功返回当前的node然后进入(acquireQueued)进行队列自旋获取锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //如果发现acquireQueued返回true说明有中断消息 则中断当前线程
            selfInterrupt();
  }


  //属于AbstractQueuedSynchronizer
  //将当前线程加入到等待队列中
  //mode 为EXCLUSIVE/SHARED 独占锁/共享锁
  private Node addWaiter(Node mode) {
        //使用当前线程创建一个Node
        Node node = new Node(Thread.currentThread(), mode);
        //获取当前tail最后一个节点做为上一个节点数据
        Node pred = tail;
         //不等于null说明当前队列是有数据 如果等于null 说明没有数据则需要enq方法进行入队
        if (pred != null) {
            //设置当前节点前一个节点为刚才获取到的上一个节点数据
            node.prev = pred;
            //尝试设置当前节点为尾节点
            if (compareAndSetTail(pred, node)) {
                //如果设置成功则将刚才获取到的尾节点的next设置为当前节点形成链表
                pred.next = node;
                return node;
            }
        }
        //走到此处说明上方入队失败 代表还有别的线程捷足先登了
        //则进行自旋入队  之所以上方先首先入队的原因是为了减少没必要的操作
        //先尝试入队成功则直接返回当前的node否则走到此处继续执行入队操作
        enq(node);
        //入队成功则返回当前节点
        return node;
    }
    
   //属于AbstractQueuedSynchronizer
   //上一个方法的入队操作
   private Node enq(final Node node) {
        //为了避免入队失败则使用自旋直至添加成功
        for (;;) {
            //获取链表尾节点
            Node t = tail;
            //如果尾节点为null说明当前链表是空的需要进行初始化
            if (t == null) { 
                //compareAndSetHead的实现unsafe.compareAndSwapObject(this, headOffset, null, update);
                //可以看到也是CAS如果当前head为null则设置当前new的空node进入做为头结点
                if (compareAndSetHead(new Node()))
                   //设置成功则设置尾节点也为当前头结点 设置完成则进行下一次循环
                   tail = head;
            } else {
                //设置当前节点的前一个节点为获取的tail节点
                node.prev = t;
                //设置当前尾节点为当前节点cas 实现与上方相同只不过null为t
                if (compareAndSetTail(t, node)) {
                    //设置成功则设置t的next为当前node
                    t.next = node;
                    return t;
                }
            }
        }
    }
    //到此处已经成功入队则进行下一步进入自旋以便获取锁
    //属于AbstractQueuedSynchronizer
    //进入队列后到此处自旋获取锁
    final boolean acquireQueued(final Node node, int arg) {
        //默认设置失败为true
        boolean failed = true;
        try {
            //中断状态为false
            boolean interrupted = false;
            //进入自旋
            for (;;) {
                //获取当前节点的的前一个节点
                final Node p = node.predecessor();
                //如果前一个节点是头结点并且当前节点获取到了锁
                if (p == head && tryAcquire(arg)) {
                    //则设置当前节点为头结点
                    setHead(node);
                    //清除上一个节点对当前节点的引用方便gc回收
                    p.next = null;
                     //代表获取锁成功
                    failed = false;
                    //返回中断结果
                    return interrupted;
                }
                //如果p节点不是头结点 或者获取锁失败
                //则尝试从队列中丢弃已取消的Node节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    //将当前线程设置为等待并且检查线程是否被中断
                    parkAndCheckInterrupt())
                     //如果是则设置为true并且返回
                    interrupted = true;
            }
        } finally {
            //如果获取锁失败则说明出现了异常则取消当前node在等待队列中
            //并且清除node之前的取消节点并且唤醒下个节点
            if (failed)
                cancelAcquire(node);
        }
    }

    //属于AbstractQueuedSynchronizer
    //设置当前头结点对象
    //因为是队列顺序执行所以每次执行完一个Node节点都需要删除
    //而每个Node都上一个节点即使刚执行完上一个节点也是头结点所以需要删除这个空节点
    //删除方式则是设置当前节点为头结点
    private void setHead(Node node) {
        //设置head为当前node并且清除引用方便gc 这需要注意调用此方法都是以获取到锁所以此处并没有使用cas而是直接设置
        head = node;
        node.thread = null;
        node.prev = null;
    }
    //属于AbstractQueuedSynchronizer
    //在当前线程进入等待状态之前先清理下已取消获取锁的Node
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取上一个节点的状态
        int ws = pred.waitStatus;
        //如果上一个节点状态时signal说明他已经被激活获取锁 则返回true
        //因为上个节点结束就是当前节点所以没需要清理的node
        if (ws == Node.SIGNAL)
            return true;
        //如果大于0则说明上个节点需要清理出队
        if (ws > 0) {
          //跳过所有waitStatus 为取消状态的node直到获取到一个小于等于0的node
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //并且设置改节点的next为当前节点
            //此处设置节点之所以没有使用cas因为当前节点已经是最后一个节点
            //如果因为并发在进入一个线程也会因为当前节点的waitStatus 为0 而进入不到当前if中
            pred.next = node;
        } else {
            //否则设置前一个节点的WaitStatus为signal可以被激活状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        //返回失败 直到下一次循环前一个节点为SIGNAL才返回true 因为这样会减去线程等待的操作,可能再次循环将会获取锁具体逻辑查看acquireQueued
        return false;
    }
    //属于AbstractQueuedSynchronizer
    //将当前线程睡眠当唤醒时查看当前线程是否为中断状态如果是则返回true否则为false
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
   // 属于LockSupport
   //设置当前线程为等待状态
   public static void park(Object blocker) {
        //获取当前线程
        Thread t = Thread.currentThread();
        //设置当前线程的被阻塞对象,代表当前线程是被谁阻塞的上方传入this
        //一般用于堆栈查看被阻塞信息 确定问题使用
        setBlocker(t, blocker);
        //  调用的unsafe的park第一个参数代表是否为绝对等待,如果为是代表将会一直等待直到获取锁
       //如果为否则后面的参数起作用为获取锁的等待时间
        UNSAFE.park(false, 0L);
        //到此处说明已经被唤醒则设置被阻塞对象为null
        setBlocker(t, null);
    }
    //这里进行下深入在linux中UNSAFE.park做了什么,此处只讲关键代码其他的jvm操作感兴趣的读者自己阅读
    //首先使用当前线程调用了parker的park方法 文件unsafe.cpp中1249行
    thread->parker()->park(isAbsolute != 0, time);
    //而parker()方法返回的是一个class PlatformParker 指针 文件os_linux.cpp 393行
    //而在此类中的关键声明
    //声明的一个枚举 枚举中存放的是下方_cond 的index索引 0 绝对时间等待获取锁 1 相对时间获取锁
     enum {
       REL_INDEX = 0,
       ABS_INDEX = 1
     };
     int _cur_index; //当前线程使用的是上方枚举的那个cond 0 或 1 或者 -1 未使用
     pthread_mutex_t _mutex [1] ;//linux中线程互斥锁
     pthread_cond_t  _cond  [2] ; //linux中线程操作用于等待或唤醒 长度为2与enum呼应
    //再就是park方法的调用 文件os_linux.cpp 6250行
    //首先 查看当前线程释放被中断如果是则返回 否则获取锁获取失败也返回
   if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
      return;
    }
    //获取到锁后判断时间释放为0
    if (time == 0) {
      //如果为0则是在当前cur_index为绝对时间等待只不过当前不需要等待从而死等
    _cur_index = REL_INDEX;
    //设置完成后使用pthread设置等待&_cond[_cur_index]与上方相应 没有等待
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
   } else {
    //不等于0 则判断当前是否为相对等待时间则使用相对等待的index否则使用绝对时间
    //再此之前有个时间计算如果是绝对时间则将时间转换为c系统时间到absTime中
    //如果是相对时间则在当前时间上加上传入的时间
    //这里需要注入如果时间数大于100000000 则使用100000000为当前asbTime
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    //设置等待就完了
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
  //被唤醒后 销毁当前使用的cond并且再次初始化一个方便下次使用
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }
  //重置curindex为-1
  _cur_index = -1;

到此处lock的所有加锁的调用流程已经结束。内容过多分两节讲下篇文章讲解释放锁。

上一篇下一篇

猜你喜欢

热点阅读