java源码解析

AbstractQueuedSynchronizer源码解析

2018-11-13  本文已影响0人  等风中

1.线程

1.1 简介

在介绍AbstractQueuedSynchronizer之前先介绍一下我们的线程。为了提高程序的并行化,从而产生了进程,多进程的好处是将一个程序可以划分为多个进程,之前不相互影响,并且可以比较容易的进行内存共享和通信。并且在多cpu的情况下如果只有一个进程就会浪费cpu的资源。那有了进程为什么还会有线程呢,这是因为进程的创建和销毁会耗费较多的资源,所以此时通过多线程来进行并发编程。一个进程可以包含多个线程,进程内的线程共享同一个堆空间。

1.2 线程状态

线程生命周期

线程状态:


undefined

2.AbstractQueuedSynchronizer分析

AbstractQueuedSynchronizer是java中的同步阻塞器,也是实现ReentrantLock的关键。并且是实现线程工具类的核心方法。在AbstractQueuedSynchronizer中,使用了cas操作来进行线程状态的变更,下面,我们来看一看其中的重要方法。

2.1 Node类介绍

Node类是AbstractQueuedSynchronizer中的内部类,代表wait队列中的一个节点,wait队列是什么呢,其实就是阻塞队列,包含condition阻塞和同步阻塞

2.1.1 重要变量

        //标记表示节点正在共享模式下等待
        static final Node SHARED = new Node();
        //标记标识节点正在独占模式下等待
        static final Node EXCLUSIVE = null;

        //waitStatus为1表示该节点此时为取消节点,即因为超时或者被interrupt打断处于此状态,当线程处于此状态,则不能改变状态,也不能重新被阻塞
        static final int CANCELLED =  1;
        //waitStatus为-1表示当前节点正处于运行状态,此时表示当其前置节点释放锁时他可以被唤醒
        static final int SIGNAL    = -1;
        //waitStatus为-2时表示当前节点正处于CONDITION队列中,直到它从CONDITION队列中移除才能重新进入就绪队列
        static final int CONDITION = -2;
        //waitStatus为-3时表示当前节点状态在共享模式中使用,表示可运行状态
        static final int PROPAGATE = -3;
        
        //指示节点在节点队列中的等待状态,默认为0,使用cas进行修改
        volatile int waitStatus;
        
        //当前节点的前驱节点,主要可以用来检查自身的前驱节点的状态来决定自身的行为
        volatile Node prev;
        //当前节点的后续节点
        volatile Node next;
        //当前节点对应的线程,调用构造函数进行初始化,并在出队时置为null
        volatile Thread thread;
        //在独占模式下表示condition队列中的下一个节点。在共享模式中,用于标识是否为共享模式
        Node nextWaiter;


2.1.2重要方法

//判断是否为共享节点,用nextWaiter标识
final boolean isShared() {
     return nextWaiter == SHARED;
}

//返回对应的前驱节点
final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
}

2.2重要的变量

    //等待队列的头结点
    private transient volatile Node head;

    //等待队列的尾节点
    private transient volatile Node tail;

    //同步资源
    private volatile int state;

2.3重要方法

2.3.1 acquire方法

可以看到acquire方法中先调用了tryAcquire方法,步骤如下:
1.tryAcquire方法尝试获取资源,如果获取成功直接返回
2.如果tryAcquire不成功,那么调用addWaiter将当前线程加入等待队列,并标记为独占模式
3.然后acquireQueued使线程在等待队列中循环获取资源
4.获取资源后,调用selfInterrupt获取中断状态

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

下面,就逐个介绍

2.3.1.1 tryAcquire方法

tryAcquire方法AQS框架不实现,由底层同步器进行封装

protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

2.3.1.2 addWaiter方法

addWaiter方法主要用于将Node节点放入等待队列中进行等待

private Node addWaiter(Node mode) {
        //用当前线程与mode对自身进行初始化,mode分为独占和共享
        Node node = new Node(Thread.currentThread(), mode);
        // 先调用一次快速方法,如果失败,调用enq方法进行入队列循环
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //如果等待队列的tail节点不为空,那么将node设置为新的tail节点
            if (compareAndSetTail(pred, node)) {
                //如果cas设置tail节点成功,那么设置之前尾节点的next节点为node
                pred.next = node;
                return node;
            }
        }
        //如果没有设置成功,调用enq方法设置
        enq(node);
        return node;
}

compareAndSetTail是用cas设置tail节点的方法,不过在addWaiter方法中,只调用一次,可能会失败

private final boolean compareAndSetTail(Node expect, Node update) {
        //CAS调用
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

enq方法,通过循环调用CAS,此方法一定会成功,不过在线程竞争比较激烈的情况下性能可能会有点差。可以看到,此方法中采取了Atomic类中经典的循环调用cas方法,只有成功才会return

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //如果尾节点为空,那么代表等待队列为空,进行CAS设置头结点,并且令尾节点和头结点相等
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //如果不为空,进行循环设置尾节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

2.3.1.3 acquireQueued方法

此方法使线程在等待队列中等待获取资源,直到等到获取到资源才会返回。如果线程在此过程中被中断过,那么会用interrupted进行记录。此方法步骤为
1.判断当前线程所处节点的前驱节点是否为头结点
2.如果是前驱节点为头结点,那么尝试获取资源,如果获取资源成功
3.那么设置头结点为当前线程所处节点,并且将前面头结点的next设置为空(使得前面头结点不关联任何节点,方便gc清理)
4.设置failed = true,并且返回线程是否中断过
5.如果不是头结点或者没有获取到资源,那么会调用shouldParkAfterFailedAcquire方法和parkAndCheckInterrupt方法,如果parkAndCheckInterrupt返回true,代表线程被中断过,对标志位进行记录

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //判断当前线程所处节点的前驱节点是否为头结点
                if (p == head && tryAcquire(arg)) {
                    //设置头结点为当前线程所处节点,并且将前面头结点的next设置为空
                    setHead(node);
                    p.next = null; 
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

下面,我们来介绍shouldParkAfterFailedAcquire方法,此方法用来判断检查线程是否应该被阻塞
1.记录前驱状态
2.当前驱状态为SIGNAL时,安心进行park操作,因为此时前驱如果拿到资源,会对后继节点进行通知
3.当ws>0时,代表前驱状态为cancel,此时,从当前节点的前驱节点遍历,直到遇到不为cancel状态的前驱节点,设置当前节点的前驱节点为此节点(此时一定能找到这样一个节点,因为head节点一定不为cancel)。
4.如果前驱节点小于0,那么此时将ws的前驱节点的waitState设置为Node.SIGNAL

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

再介绍一下parkAndCheckInterrupt方法

private final boolean parkAndCheckInterrupt() {
        //对线程进行阻塞操作,内部实现调用了unsafe的park操作
        LockSupport.park(this);
        //返回线程是否被中断,并重置中断位
        return Thread.interrupted();
    }
     

2.3.2 release方法

release方法主要用于独占模式下对资源的释放,一般资源默认为1,可以看出步骤分为以下几步
1.请求释放资源
2.如果释放资源成功,那么获取当前头节点(需要说明一下,此时,头结点一般为对应的线程)
3.判断头结点是否为空,如果头结点为空或者状态为初始化,直接返回true
4.如果头结点不为空并且其状态不为0,对h的后继节点进行唤醒
5.如果释放资源失败,返回false

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                //如果头结点不为空并且其状态不为0(此时应该为SIGNAL状态),进行唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

下面,介绍一下release中使用到的方法

2.3.2.1 tryRelease方法

与tryAcquire方法一样,tryRelease也是留给自定义同步器去覆盖实现的,如果不覆盖,调用此方法直接抛出异常

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

2.3.2.2 unparkSuccessor方法

此方法主要用来唤醒后继节点,下面看看是怎么唤醒的吧
1.获取到当前节点的状态
2.如果当前节点状态小于0,重新将该节点的状态置0,不保证一定成功,允许失败
3.获取当前节点的next节点
4.如果当前节点的next节点为null或者状态为cancel,那么从尾部开始遍历,直到前驱节点为空。在遍历过程中,如果遇到前驱节点不为cancel的情况,更新当前节点需要唤醒的后继节点
5.如果能够遍历到不为cancel状态的后继节点,那么进行唤醒,否则,不做操作

private void unparkSuccessor(Node node) {
        
        //获取到当前节点的状态,如果当前节点状态小于0,重新将该节点的状态设置为初始化
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        //一般情况下当前节点的next节点即为后继节点,但是后继节点可能为null或者状态被置为Canceled
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从尾部开始遍历,找到距离当前节点最近的状态不为cancel的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //找到可以唤醒的后继节点之后,对该节点进行唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

其中,compareAndSetWaitStatus就是调用unsafe的cas操作对节点状态进行更新

2.3.3 acquireShared方法

介绍完了独占模式下获取资源的方法,下面,我们来介绍共享模式下获取锁的方法。步骤如下:
1.传入需要获取的资源数,尝试获取共享锁
2.如果获取共享锁成功,不做任何操作
3.获取共享锁失败,进入等待队列中进行等待。代码如下:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
}

下面,介绍一下acquireShared调用的两个方法。

上一篇 下一篇

猜你喜欢

热点阅读