9 JUC之AQS

2018-10-26  本文已影响0人  十丈_红尘
1️⃣ 简介
1 AQS概念 :

AbstractQueuedSynchronizer,它提供了一个FIFO队列,可以看做是一个可以用来实现锁以及其他需要同步功能的框架。这里简称该类为AQS。AQS的使用依靠继承来完成,子类通过继承自AQS并实现所需的方法来管理同步状态。例如ReentrantLock,CountDownLatch等.

2 底层数据结构 :
AQS底层是使用的双向链表实现的如下图所示

双向链表是队列的一个实现,因此我们也可以把他当成一个队列;其中Sync queue是同步队列,包含head节点和tail节点head节点主要用于后续的调度;Condition queue是一个单项链表,这个结构不是必须的当需要使用到这个结构的时候才会生成,且可能会有多个;

3 AQS的设计 :

① 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架;
② 利用了一个int类型来表示状态(在AQS类中有一个state的成员变量);
③ 使用方法是继承,AQS是基于模板方法使用者需要在使用的时候先继承,并复写其中的方法;
④ 子类通过继承并通过实现它的方法来管理其状态(通过acquire与release来操作状态);
⑤ 可以同时实现排它锁和共享锁模式(独占与共享);

4 AQS实现的思路

首先AQS内部维护了一个CLH队列来管理锁,线程会首先尝试获取锁,如果失败会将当前线程以及等待状态等信息包装成一个node加入到Sync queue同步队列中,接着会不断的循环尝试获取锁,它的条件是当前节点head的直接后继才会尝试如果失败就会阻塞自己直到自己被唤醒,而当持有锁的线程释放锁的时候会唤醒队列中的后继线程,基于这些基础的设计和思路JDK提供了需要基于AQS的子类;

5 AQS的常用组件

① CountDownLatch : 是一个闭锁,通过一个计数来保证线程是否需要一直阻塞;
② Semaphore : 它能控制同一时间并发线程的数目;
③ CyclicBarrier : 它和CountDownLatch相似都能阻塞进程
④ ReentrantLock : 可重入互斥锁Lock
⑤ Condition : 提供了类似的Object的监视器方法,与Lock配合可以实现等待/通知模式.
⑥ FutureTask : 可用于异步获取执行结果或取消执行任务的场景。


2️⃣J.U.C之AQS-CountDownLatch
1 CountDownLatch图示
2 CountDownLatch简介

CountDownLatch是一个同步辅助类,通过它可以完成类似于阻塞当前线程的功能(换句话说就是一个线程或者多个线程一直等待直到其他线程操作完成),CountDownLatch用了一个给定的计数器来进行初始化,该计数器的操作是原子操作(同时只能有一个线程去操作该计数器),调用该类await()方法的线程会一直处于阻塞状态,直到其他线程调用countDown()这个方法使计数器的值变成0,每次调用countDown()的时候计数器会减1,当计数器的值变成0的时候,所有因调用await()方法而处于等待状态的线程就会继续往下执行,这种操作只会出现一次因为计数器不能被重置,如果大家在实际开发过程需要一个可以重置计数器的版本可以考虑使用CyclicBarrier来实现;

3 CountDownLatch的使用场景

在某些业务场景中,程序执行需要等待某个条件完成后才能继续执行后续的操作,典型的应用比如 : 并行计算

4 CountDownLatch代码演示
// CountDownLatch 1
@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
// CountDownLatch 2
@Slf4j
public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(10, TimeUnit.MILLISECONDS);
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }
}

3️⃣J.U.C之AQS-Semaphore
1 Semaphore图示
2 Semaphore简介

Semaphore又称信号量,它可以控制并发访问的线程个数,在操作系统中信号量是一个非常重要的概念,在进程控制方面有很重要的应用,java并发库中的Semaphore可以非常轻松的完成类似于操作系统中信号量的控制,Semaphore可以控制某个资源可被同时访问的个数,与CountDownLatch的使用有一些类似也是提供了两个核心的方法(acquire()与release()),acquire方法是获取一个许可如果没有则会等待,release是在操作完成后释放一个许可出来,Semaphore维护了当前访问的个数通过提供同步机制来控制同时访问的个数,Semaphore可以实现有限大小的链表;

3 Semaphore的使用场景

Semaphore常用与仅能提供有限访问的资源,比如项目中使用的数据库(数据库所支持的最大连接数)

4 Semaphore代码演示
// Semaphore 1
@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(); // 获取一个许可
                    test(threadNum);
                    semaphore.release(); // 释放一个许可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
// Semaphore 2
@Slf4j
public class SemaphoreExample2 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(3); // 获取多个许可
                    test(threadNum);
                    semaphore.release(3); // 释放多个许可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
// Semaphore 3
@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire()) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
// Semaphore 4
@Slf4j
public class SemaphoreExample4 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

4️⃣J.U.C之AQS-CyclicBarrier
1 CyclicBarrier图示
2 CyclicBarrier简介

CyclicBarrier也是一个同步辅助类,它允许一组线程相互等待,直到到达某一个公共的屏障点;通过它可以完成多个线程之间相互等待只有当每一个想成都准备就绪后才能各自继续往下执行后续的操作;它与CountDownLatch有一些相似的地方都是通过计数器来实现的,当某个方法调用了await()方法之后该线程就进入了等待状态,而且计数器执行的是加一的操作,当计数器的值达到了我们所设置的初始值的时候因为调用await()方法而进入等待状态的线程会被唤醒继续执行他们后续的操作,由于CyclicBarrier在释放等待线程后可重用我们又称之为循环屏障;

3 CyclicBarrier使用场景

CyclicBarrier的使用场景与CountDownLatch比较相似,它可以用于多线程计算数据,最后合并计算结果的应用场景,比如: excel表格的总数计算;

4 CyclicBarrier与CountDownLatch的区别

① CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以重置循环使用;
② CountDownLatch主要是实现一个或N个线程需要等待其他线程完成某项操作之后才能继续往下执行,它描述的是一个或N个线程等待其他线程的关系;CyclicBarrier实现的是多个线程之间相互等待,直到所有的线程都满足了条件之后,才能继续执行后续的操作,它描述的是各个线程内部相互等待的关系;

5 CyclicBarrier代码演示
// CyclicBarrier 1
@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}
// CyclicBarrier 2
@Slf4j
public class CyclicBarrierExample2 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
        log.info("{} continue", threadNum);
    }
}
// CyclicBarrier 3
@Slf4j
public class CyclicBarrierExample3 {

    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

5️⃣J.U.C之AQS-ReentrantLock与锁
1 Java中的锁

① JUC中提供的锁 : JUC中核心的锁就是ReentrantLock,ReentrantLock既然属于锁它的核心也是lock与unLock,
② synchronized修饰的锁

2 ReentrantLock与synchronized的区别

① 可重入性 : ReentrantLock从字面上理解就是可重入锁,其实synchronized关键字修饰的锁也是可以重入的;在这个点上区别不大,他们都是线程进入一次锁的计数器就自增1,所以需要等待锁的计数器值为0时才能释放锁,
② 锁的实现 : synchronized关键字是依赖于JVM来实现的,而ReentrantLock是JDK实现的,他们之间的本质区别类似于操作系统来实现与自己写代码实现;
③ 性能的区别 : 在synchronized关键字优化以前,它的性能比ReentrantLock差了很多,但是自从synchronized引入了偏向锁 轻量级锁(自旋锁)后他们两者的性能就差不多了,在两种方法都可用的情况下建议使用synchronized来实现,因为它的写法更容易;
④ 功能的区别 : 第一个方面synchronized的使用方便简洁,并且是由编译器来保证加锁与释放的,ReentrantLock需要手工声明加锁和释放锁,为了避免忘记释放锁需要在faily中添加释放的操作;第二个方面是锁的细粒度与灵活度很明显ReentrantLock会由于synchronized;

3 ReentrantLock独有的功能 :

① 可以指定公平锁还是非公平锁,而synchronized只能是非公平锁;
② 提供了一个Condition的类,可以分组唤醒需要唤醒的线程,而synchronized是要么随机唤醒一个要么就是全部唤醒;
③ 提供了能够中断等待锁的线程的机制(lock.lockInterruptibly()来实现这个机制),ReentrantLock实现是一种自旋锁,它通过循环调用CAS来实现加锁,这也是它性能比较好的原因避免线程阻塞进入内核态;

4 synchronized代码演示
// lock 1
@Slf4j
@ThreadSafe
public class LockExample1 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static int count = 0;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private synchronized static void add() {
        count++;
    }
}
5 ReentrantLock代码演示
// lock 2
@Slf4j
@ThreadSafe
public class LockExample2 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static int count = 0;

    private final static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}

6 ReentrantReadWriteLock

ReentrantReadWriteLock是在没有任何读写锁的时候才可以取得写入锁,它里边有两个锁一个是读锁一个是写锁,这也是这个类最核心的要求同时也是我们使用它的时候需要注意的点;

    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;

它可以用于实现了悲观读取即如果我们执行过程中进行读取时,经常用另一个写入的操作;为了保持通过这个时候我们就可以使用ReentrantReadWriteLock;如果读取较多写入较少的情况下,使用ReentrantReadWriteLock可能会使写入操作遭遇饥饿(也就是说写入线程没有办法竞争到锁定而一直处于等待状态)

7 ReentrantReadWriteLock代码演示
@Slf4j
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    class Data { }
}
8 StampedLock

StampedLock控制所有三种模式分别是写 读 乐观读;StampedLock的状态是由版本和模式两个部分组成,锁获取方法返回的是一个数字作为票据也就是state,它用相应的锁状态来表示并控制相关的访问,数字0表示没有写锁被授权访问;

在读锁中分为悲观锁和乐观锁所谓乐观读也就是说读的操作很多写的操作很少的情况下,我们可以认为写入和读取同时发生的几率很小;因此不悲观的时候读取锁定,程序可以查取查看读取资料以后是否遭到写入执行的变更在采取后续的措施,这个小改进可以大幅度提升程序的吞吐量;

9 StampedLock实例代码
// 官方实例代码
public class LockExample4 {

    class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();

        void move(double deltaX, double deltaY) { // an exclusively locked method
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        //下面看看乐观读锁案例
        double distanceFromOrigin() { // A read-only method
            long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
            double currentX = x, currentY = y;  //将两个字段读入本地局部变量
            if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
                stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
                try {
                    currentX = x; // 将两个字段读入本地局部变量
                    currentY = y; // 将两个字段读入本地局部变量
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        //下面是悲观读锁案例
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                    long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                    if (ws != 0L) { //这是确认转为写锁是否成功
                        stamp = ws; //如果成功 替换票据
                        x = newX; //进行状态改变
                        y = newY;  //进行状态改变
                        break;
                    } else { //如果不能成功转换为写锁
                        sl.unlockRead(stamp);  //我们显式释放读锁
                        stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                    }
                }
            } finally {
                sl.unlock(stamp); //释放读锁或写锁
            }
        }
    }
}
@Slf4j
@ThreadSafe
public class LockExample5 {

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static int count = 0;

    private final static StampedLock lock = new StampedLock();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlock(stamp);
        }
    }
}

10 Condition
@Slf4j
public class LockExample6 {

    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();
        Condition condition = reentrantLock.newCondition();

        new Thread(() -> {
            try {
                reentrantLock.lock();
                log.info("wait signal"); // 1
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal"); // 4
            reentrantLock.unlock();
        }).start();

        new Thread(() -> {
            reentrantLock.lock();
            log.info("get lock"); // 2
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signalAll();
            log.info("send signal ~ "); // 3
            reentrantLock.unlock();
        }).start();
    }
}
上一篇下一篇

猜你喜欢

热点阅读