Java多线程(9)

2020-01-22  本文已影响0人  Cool_Pomelo

Java多线程(9)

AQS(2)

锁的占有与释放

对于AQS来说,线程同步的关键是对状态值state进行操作,根据state是否属于一个线程,操作state的方式分为独占方式和共享方式

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


     public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

     public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    } 

使用独占方式获取的资源是与具体线程绑定的,就是一个线程获取了资源,就会标记是这个线程获取到了,其他线程再尝试操作state获取资源时就会发现当前该资源不是自己持有的,就会在获取失败后阻塞

共享方式的资源与具体线程是不相关的,当多个线程请求资源时通过CAS方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS方式进行获取即可

独占方式获取和释放资源的流程:


当一个线程调用 acquire(int arg)方法获取独占资源时,会首先使用tryAcquire方法尝试获取资源,具体就是设置状态变量state的值,成功就直接返回,失败就将当前线程封装为类型Node.EXCLUSIVE的Node节点后插入到AQS阻塞队列的尾部,并调用LockSupport.park(this)方法挂起自己


 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }



当一个线程调用release方法时会尝试使用tryRelease操作释放资源,这里是设置状态变量state的值,然后调用LockSupport.unpark(thread)方法激活AQS队列里面被阻塞的一个线程(thread),被激活的线程使用tryAcquire尝试,看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下进行,否则还是会被放入AQS队列并被挂起

  public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}


细节:

AQS类没有提供可用的tryAcquire和tryRelease方法,这需要由具体的子类去实现,子类在实现这两个方法时根据具体场景使用CAS算法尝试修改state状态值,成功返回true,否则返回false,子类还需定义在调用acquire和release方法时state状态值的增减代表什么含义

共享方式获取和释放资源的流程:


线程调用acquireShared获取共享资源时,首先使用tryAcquireShared尝试获取资源,具体设置状态变量state的值,成功则直接返回,失败则将当前线程封装为Node.SHARED的Node节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)方法挂起自己


public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }



当一个线程调用releaseShared时会尝试使用tryReleaseShared操作释放资源,这里是设置状态变量state的值,然后使用LockSupport.unpark(thread)激活AQS队列里面被阻塞的一个线程(thread),被激活的线程则使用tryReleaseShared查看当前状态变量state的值是否能满足自己的需要,满足则该线程被激活,然后继续向下运行,否则还是会被放入AQS队列并被挂起

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    } 

细节:

AQS类没有提供可用的tryAcquireShared和tryReleaseShared方法,这两个方法需要由子类实现,子类在实现这两个方法时要根据具体场景使用CAS算法尝试修改state的状态值,成功返回true,否则返回false

基于AQS实现的锁除了需要重写上面的方法,还要重写isHeldExclusively方法来判断锁是否被当前线程独占还是被共享

其中不带Interruptibly关键字的方法的意思是不对中断进行响应,也就是线程在调用不带Interruptibly关键字的方法获取资源时或者获取资源失败被挂起时,其它线程中断了该线程,那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是不对中断进行响应,忽略中断

独占锁模式

获取锁的过程:

释放锁过程:

acquire(int arg)

获取锁:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

逻辑:

// 该入队方法的返回值就是新创建的节点
private Node addWaiter(Node mode) {
        //基于当前线程,节点类型(Node.EXCLUSIVE)创建新的节点
        //由于这里是独占模式,因此节点类型就是Node.EXCLUSIVE
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        //这里为了提搞性能,首先执行一次快速入队操作,即直接尝试将新节点加入队尾
        if (pred != null) {
            node.prev = pred;
            //这里根据CAS的逻辑,即使并发操作也只能有一个线程成功并返回,其余的都要执行后面的入队操作。即enq()方法
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

入队操作:

//完整的入队操作
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //如果队列还没有初始化,则进行初始化,即创建一个空的头节点
            if (t == null) { 
                //同样是CAS,只有一个线程可以初始化头结点成功,其余的都要重复执行循环体
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //新创建的节点指向队列尾节点,毫无疑问并发情况下这里会有多个新创建的节点指向队列尾节点
                node.prev = t;
                //基于这一步的CAS,不管前一步有多少新节点都指向了尾节点,这一步只有一个能真正入队成功,其他的都必须重新执行循环体
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    //该循环体唯一退出的操作,就是入队成功(否则就要无限重试)
                    return t;
                }
            }
        }
    }


经过上面的操作,申请获取锁的线程已经成功加入了等待队列,接下来节点现在要做的就是挂起当前线程,等待被唤醒



// 该方法入参node就是刚入队的包含当前线程信息的节点
final boolean acquireQueued(final Node node, int arg) {
        //锁资源获取失败标记位
        boolean failed = true;
        try {
            //等待线程被中断标记位
            boolean interrupted = false;
            //这个循环体执行的时机包括新节点入队和队列中等待节点被唤醒两个地方
            for (;;) {
                //获取当前节点的前置节点
                final Node p = node.predecessor();
                //如果前置节点就是头结点,则尝试获取锁资源
                if (p == head && tryAcquire(arg)) {
                    //当前节点获得锁资源以后设置为头节点
                    //头结点就表示当前正占有锁资源的节点
                    setHead(node);
                    p.next = null; //帮助GC
                    //表示锁资源成功获取,因此把failed置为false
                    failed = false;
                    //返回中断标记,表示当前节点是被正常唤醒还是被中断唤醒
                    return interrupted;
                }
                如果没有获取锁成功,则进入挂起逻辑
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //获取锁失败处理逻辑
            if (failed)
                cancelAcquire(node);
        }
    }

挂起逻辑目前为止,只是根据当前线程,节点类型创建了一个节点并加入队列中,其他属性都是默认值

//node是当前线程的节点,pred是它的前置节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取前置节点的waitStatus
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //如果前置节点的waitStatus是Node.SIGNAL则返回true,然后会执行parkAndCheckInterrupt()方法进行挂起
            return true;
        if (ws > 0) {
            //由waitStatus的几个取值可以判断这里表示前置节点被取消
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //这里我们由当前节点的前置节点开始,一直向前找最近的一个没有被取消的节点
            //注,由于头结点head是通过new Node()创建,它的waitStatus为0,因此这里不会出现空指针问题,也就是说最多就是找到头节点上面的循环就退出了
            pred.next = node;
        } else {
            //根据waitStatus的取值限定,这里waitStatus的值只能是0或者PROPAGATE,那么我们把前置节点的waitStatus设为Node.SIGNAL然后重新进入该方法进行判断
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

上面的方法逻辑用来判断当前节点是否可以被挂起,也就是唤醒条件是否已经具备,即如果挂起了,那一定是可以由其他线程来唤醒的。该方法如果返回false,即挂起条件没有完备,那就会重新执行acquireQueued方法的循环体,进行重新判断,如果返回true,那就表示万事俱备,可以挂起了,就会进入parkAndCheckInterrupt()方法

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        //被唤醒之后,返回中断标记,即如果是正常唤醒则返回false,如果是由于中断醒来,就返回true
        return Thread.interrupted();
    }

acquireQueued方法的最后一步,finally模块。这里是针对锁资源获取失败以后做的一些善后工作


//传入的方法参数是当前获取锁资源失败的节点
private void cancelAcquire(Node node) {
        // 如果节点不存在则直接忽略
        if (node == null)
            return;
        
        node.thread = null;

        // 跳过所有已经取消的前置节点,跟上面的那段跳转逻辑类似
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        //这个是前置节点的后继节点,由于上面可能的跳节点的操作,所以这里可不一定就是当前节点,仔细想一下。^_^
        Node predNext = pred.next;

        //把当前节点waitStatus置为取消,这样别的节点在处理时就会跳过该节点
        node.waitStatus = Node.CANCELLED;
        //如果当前是尾节点,则直接删除,即出队
        //注:这里不用关心CAS失败,因为即使并发导致失败,该节点也已经被成功删除
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    //这里的判断逻辑很绕,具体就是如果当前节点的前置节点不是头节点且它后面的节点等待它唤醒(waitStatus小于0),
                    //再加上如果当前节点的后继节点没有被取消就把前置节点跟后置节点进行连接,相当于删除了当前节点
                    compareAndSetNext(pred, predNext, next);
            } else {
                //进入这里,要么当前节点的前置节点是头结点,要么前置节点的waitStatus是PROPAGATE,直接唤醒当前节点的后继节点
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

释放锁

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease()方法是子类实现的释放锁逻辑,如果成功,就判断等待队列中有没有需要被唤醒的节点(waitStatus为0表示没有需要被唤醒的节点)

唤醒操作

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            //把标记为设置为0,表示唤醒操作已经开始进行,提高并发环境下性能
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        //如果当前节点的后继节点为null,或者已经被取消
        if (s == null || s.waitStatus > 0) {
            s = null;
            //注意这个循环没有break,也就是说它是从后往前找,一直找到离当前节点最近的一个等待唤醒的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //执行唤醒操作
        if (s != null)
            LockSupport.unpark(s.thread);
    }



共享锁模式

获取锁的过程:

释放锁过程:

获取锁:

  public final void acquireShared(int arg) {
        //尝试获取共享锁,返回值小于0表示获取失败
        if (tryAcquireShared(arg) < 0)
            //执行获取锁失败以后的方法
            doAcquireShared(arg);
    }

tryAcquireShared()方法是留给子类去实现具体的获取锁逻辑

关于该方法的实现有两点需要特别说明

doAcquireShared()


  //参数不多说,就是传给acquireShared()的参数
    private void doAcquireShared(int arg) {
        //添加等待节点的方法跟独占锁一样,唯一区别就是节点类型变为了共享型,不再赘述
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //表示前面的节点已经获取到锁,自己会尝试获取锁
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //注意上面说的, 等于0表示不用唤醒后继节点,大于0需要
                    if (r >= 0) {
                        //这里是重点,获取到锁以后的唤醒操作,后面详细说
                        setHeadAndPropagate(node, r);
                        p.next = null;
                        //如果是因为中断醒来则设置中断标记位
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //挂起逻辑跟独占锁一样,不再赘述
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //获取失败的取消逻辑跟独占锁一样,不再赘述
            if (failed)
                cancelAcquire(node);
        }
    }

独占锁模式获取成功以后设置头结点然后返回中断状态,结束流程,而共享锁模式获取成功以后,调用了setHeadAndPropagate方法


 //两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; //记录当前头节点
        //设置新的头节点,即把当前获取到锁的节点设置为头节点
        //注:这里是获取到锁之后的操作,不需要并发控制
        setHead(node);
        //这里意思有两种情况是需要执行唤醒操作
        //1.propagate > 0 表示调用方指明了后继节点需要被唤醒
        //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //如果当前节点的后继节点是共享类型获取没有后继节点,则进行唤醒
            //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
            if (s == null || s.isShared())
                //后面详细说
                doReleaseShared();
        }
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }


唤醒操作:


private void doReleaseShared() {
        for (;;) {
            //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
            //其实就是唤醒上面新获取到共享锁的节点的后继节点
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //表示后继节点需要被唤醒
                if (ws == Node.SIGNAL) {
                    //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;      
                    //执行唤醒操作      
                    unparkSuccessor(h);
                }
                //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                
            }
            //如果头结点没有发生变化,表示设置完成,退出循环
            //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
            if (h == head)                   
                break;
        }
    }

释放共享锁


public final boolean releaseShared(int arg) {
        //尝试释放共享锁
        if (tryReleaseShared(arg)) {
            //唤醒过程,详情见上面分析
            doReleaseShared();
            return true;
        }
        return false;
    }

独占锁相比,共享锁的主要特征在于当一个在等待队列中的共享节点成功获取到锁以后(它获取到的是共享锁),既然是共享,那它必须要依次唤醒后面所有可以跟它一起共享当前锁资源的节点.

毫无疑问,这些节点必须也是在等待共享锁(这是大前提,如果等待的是独占锁,那前面已经有一个共享节点获取锁了,它肯定是获取不到的)。当共享锁被释放的时候,可以用读写锁为例进行思考,当一个读锁被释放,此时不论是读锁还是写锁都是可以竞争资源的

上一篇下一篇

猜你喜欢

热点阅读