JVM&并发&多线程@花城鱼

《实战java高并发程序设计》笔记(三)

2020-02-18  本文已影响0人  MikeShine

写在前面

前1、2章中,我们对于 并发编程中一些基本的概念和原理有了一定的了解。
在第3章中,我们将学习一些 JDK 内部提供的使用的API 和框架。主要分为3个部分:同步控制的工具、线程池的支持、支持并发的容器


第三章 JDK并发包

第三章知识框架图

3.1.1 重入锁 ReentrantLock

重入锁使用 java.util.concurrent.locks.ReentrantLock 类实现。之所以叫重入是因为其允许 锁反复进入,这里的反复进入限于一个线程。

// lock 是一个 ReentrantLock 对象
lock.lock();
lock.lock();    
try{
      i++;    
}finally{
lock.unlock();
lock.unlock();
}

重入锁跟 synchronized 比较:
(1)原始构成

(4)公平锁

(5)绑定多个条件Condition
ReentrantLock 通过绑定多个条件可以实现精准的分组唤醒线程。而 Sync 只能随机唤醒一个或者全部唤醒。

下面看一下重入锁的重要方法:

下面我们看一下 ReentrantLock 可以实现的几个特殊场景

1. 等待可中断
就是说,在等待锁的同时,可以响应中断。通常情况下,这种机制被用于处理死锁。

package ConcurrencyTest;

import java.util.concurrent.locks.ReentrantLock;

/**
 * 测试重入锁中的响应中断功能
 */

public class LockInterruptilyTest {
    private static ReentrantLock lock = new ReentrantLock();

    private static class ReentrantLockThread implements Runnable{
        @Override
        public void run() {
            try{
                lock.lockInterruptibly();
                for (int i=0;i<3;i++){
                    System.out.println(Thread.currentThread().getName()+"得到了锁 i="+i);
                }
            }catch (InterruptedException e){
                System.out.println(Thread.currentThread().getName()+"被打断了");
            }finally {
                // 查询当前线程是否保持此锁
                if(lock.isHeldByCurrentThread()){
                    lock.unlock();
                }
            }
        }
    }

    public static void main(String args[]) {
        ReentrantLockThread test = new ReentrantLockThread();
        Thread t1 = new Thread(test,"t1");
        Thread t2 = new Thread(test,"t2");
        Thread t3 = new Thread(test,"t3");
        t1.start();
        t2.start();
        t3.start();
        t2.interrupt();
    }
}

运行结果

t1得到了锁 i=0
t1得到了锁 i=1
t1得到了锁 i=2
t2被打断了
t3得到了锁 i=0
t3得到了锁 i=1
t3得到了锁 i=2

2. 公平锁
只需要设置 ReentrantLock(True)即可。

ReentrantLock fairLock = new ReentrantLock(True);

下面看一个公平锁的例子

package ConcurrencyTest;

import java.util.concurrent.locks.ReentrantLock;

/**
 * 用 ReentrantLock 实现公平锁
 */
public class FairLockViaReentrantLock implements Runnable{
    private static ReentrantLock fairLock = new ReentrantLock(true);

    @Override
    public void run() {
        int i = 0;
        while (i<10){
            try{
                fairLock.lock();
                System.out.println(Thread.currentThread().getName()+"获得锁");
            }finally {
                fairLock.unlock();
            }
            i++;
        }
    }

    public static void main(String args[]){
        FairLockViaReentrantLock f1 = new FairLockViaReentrantLock();
        Thread t1 = new Thread(f1,"Thread_t1");
        Thread t2 = new Thread(f1,"Thread_t2");
        t1.start();t2.start();
    }
}

由于线程是由公平锁实现的,执行结果两个线程交替获得锁。

Thread_t1获得锁
Thread_t2获得锁
Thread_t1获得锁
Thread_t2获得锁

3. 绑定多个条件实现精确唤醒
在这里例子里面,就是通过3个Condition条件,来实现锁的精确唤醒顺序。比如这个 线程执行为 A--B--C 这样的 。

/**
 * 锁绑定多个条件Condition
 * 题目:多线程之间按顺序执行,实现A->B->C三个线程启动,要求如下: A打印5次,B打印10次,C打印15次,
 * 紧接着 A打印5次,B打印10次,C打印15次, . . . 循环执行10轮
 */
public class LockConditionDemo {
    public static void main(String[] args) {
        ShareResource shareResource = new ShareResource();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                shareResource.print5();
            }
        }, "A").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                shareResource.print10();
            }
        }, "B").start();

        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                shareResource.print15();
            }
        }, "C").start();
    }
}

/**
 * 共享资源类
 */
class ShareResource {
    // A:1 B:2 C:3
    private int num = 1;
    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();

    // 循环打印5次
    public void print5() {
        // 1、获取锁资源
        lock.lock();
        try {
            // 2、判断是否可以执行业务
            while (num != 1) {
                // 阻塞等待
                conditionA.await();
            }
            // 模拟业务执行
            for (int i = 0; i < 5; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + (i + 1));
            }
            // 3、通知其他线程,通过signal()方法唤醒线程
            num = 2;
            conditionB.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // 循环打印10次
    public void print10() {
        // 1、获取锁资源
        lock.lock();
        try {
            // 2、判断是否可以执行业务
            while (num != 2) {
                conditionB.await();
            }
            // 模拟业务执行
            for (int i = 0; i < 10; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + (i + 1));
            }
            // 3、通知其他线程
            num = 3;
            conditionC.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    // 循环打印15次
    public void print15() {
        // 1、获取锁资源
        lock.lock();
        try {
            // 2、判断是否可以执行业务
            while (num != 3) {
                conditionC.await();
            }
            // 模拟业务执行
            for (int i = 0; i < 15; i++) {
                System.out.println(Thread.currentThread().getName() + "\t" + (i + 1));
            }
            // 3、通知其他线程
            num = 1;
            conditionA.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

执行结果

A   1
A   2
A   3
A   4
A   5
B   1
B   2
B   3
B   4
B   5
B   6
B   7
B   8
B   9
B   10
C   1
C   2
C   3
C   4
C   5
C   6
C   7
C   8
C   9
C   10
C   11
C   12
C   13
C   14
C   15
......// 后面循环10次


3.1.2 Condition 条件

Condition对象和 wait() & notify() 方法作用基本是类似的。但是 wait() 和 notiry() 是和 sync 关键字结合使用的,而 Condition 是跟重入锁关联的
通过 Lock 接口(重入锁就是实现了这个接口)的 Condition newCondition() 方法,生成一个与当前重入锁绑定的 Condition 实例。利用 Condition 独享,就可以让线程在合适的时间等待,或者通知线程继续执行。
Condition 接口提供的基本方法:

下面看一个例子:

package ConcurrencyTest;

import java.util.concurrent.locks.*;

/**
 * 测试Condition条件相关的等待和唤醒功能
 */
public class ConditionTest {
    private static ReentrantLock lock = new ReentrantLock();
    // 通过一个lock对象,生成 与 对象绑定的 Condition 对象
    private static Condition condition = lock.newCondition();

    public static class ConditionThread implements Runnable{
        @Override
        public void run() {
            try{
                lock.lock();
                condition.await();
                System.out.println("等待完成,线程继续执行");
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            finally {
                lock.unlock();
            }
        }
    }

    public static void main(String args[]) throws InterruptedException{
        ConditionThread conditionThread = new ConditionThread();
        Thread thread = new Thread(conditionThread,"t1");
        thread.start();
        Thread.sleep(2000);
        // 这里还是给 signal() 方法加锁了
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}

3.1.3 信号量Semaphore:允许多个线程同时访问

下面我们看一个例子

package ConcurrencyTest;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 用来测试信号量
 */

public class SemaphoreTest {
   // 声明了包含3个许可的信号量
    private static Semaphore semaphore = new Semaphore(3); 

    private static class SemaphoreThread implements Runnable{
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"获取一个许可");
                Thread.sleep(1);
                System.out.println(Thread.currentThread().getName()+"完成,开始释放许可");
                semaphore.release();
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String args[]){
            SemaphoreThread semaphoreThread = new SemaphoreThread();
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for(int i=0;i<12;i++){
                executorService.submit(semaphoreThread);
            }
        }
    }
}

运行结果会以3个线程为1组,来获取许可,最后释放许可。


3.1.4 ReadWriteLock 读写锁

实际上是在使用 ReentrantReadWriteLock 这个类。

书上给了一个例子,说明用读写分离锁的时候,读线程之间并行,写线程阻塞。而用重入锁,所有读写线程之间要相互等待,程序执行时间比较长。


3.1.5 倒计数器 CountDownLatch

下面看一个例子

package ConcurrencyTest;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 测试倒计数器 CountDownLatch
 */

public class CountDonwLatchTest implements Runnable {
    static final CountDownLatch end = new CountDownLatch(10);
    static final CountDonwLatchTest test = new CountDonwLatchTest();
    @Override
    public void run() {
        try {
            // 模拟火箭发射
            Thread.sleep(new Random().nextInt(10)*1000);
            System.out.println("检查完成");
            // 一个线程已经完成任务,计数器减一
            end.countDown(); 
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    public static void main(String args[]) throws InterruptedException{
        Executor exec = Executors.newFixedThreadPool(10);
        for(int i=0;i<10;i++){
            ((ExecutorService) exec).submit(test);
        }
        // 等待检查
        end.await();
        // 发射火箭
        System.out.println("发射");
        ((ExecutorService) exec).shutdown();
    }
}

这里注意,计数是针对线程数。每个线程执行完,运行 countDown() 方法,减一个计数。 await() 方法要等所有计数清零。


3.1.6 循环栅栏 CyclicBarrier

下面看一个例子:

package ConcurrencyTest;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 循环栅栏 CyclicBarrier 测试类
 */
public class CyclicBarrierTest {
    public static class Soldier implements Runnable {
        private String soldierName;
        private final CyclicBarrier cyclic;

        Soldier(CyclicBarrier cyclic, String soldierName) {
            this.cyclic = cyclic;
            this.soldierName = soldierName;
        }

        @Override
        public void run() {
            try {
                //等待其他士兵到齐
                cyclic.await();
                doWork();
                //等待所有士兵完成工作
                cyclic.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }

        void doWork() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(soldierName + " :任务完成");
        }
    }

    public static class BarrierRun implements Runnable {
        boolean flag;
        int N;

        public BarrierRun(boolean flag, int N) {
            this.flag = flag;
            this.N = N;
        }

        @Override
        public void run() {
            if (flag) {
                System.out.println("司令:[士兵" + N + "个,任务完成!]");
            } else {
                System.out.println("司令:[士兵" + N + "个,集合完毕!]");
                flag = true;
            }
        }
    }

    public static void main(String[] args) {
        final int N = 10;
        Thread[] allSoldier = new Thread[N];
        boolean flag = false;
        CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
        //设置屏障点,主要是为了执行这个方法
        System.out.println("集合队伍!");
        for (int i = 0; i < N; i++) {
            System.out.println("士兵" + i + " 报道!");
            allSoldier[i] = new Thread(new Soldier(cyclic, "士兵" + i));
            allSoldier[i].start();
        }
    }
}

输出如下。程序两次到达屏障点,每次到达都会调用 BarrierRun 的 run() 方法,由于 flag 的设置,两次输出不同。关于倒计时器和循环栅栏的区别

集合队伍!
士兵0 报道!
士兵1 报道!
士兵2 报道!
士兵3 报道!
士兵4 报道!
士兵5 报道!
士兵6 报道!
士兵7 报道!
士兵8 报道!
士兵9 报道!
司令:[士兵10个,集合完毕!]
士兵7 :任务完成
士兵9 :任务完成
士兵5 :任务完成
士兵6 :任务完成
士兵0 :任务完成
士兵4 :任务完成
士兵3 :任务完成
士兵8 :任务完成
士兵1 :任务完成
士兵2 :任务完成
司令:[士兵10个,任务完成!]

3.1.7 线程阻塞工具类:LockSupport

下面看一个例子


/**
 * LockSupport 线程阻塞工具类测试
 */

public class LockSupportTest {
    private static class LockSupportThread implements Runnable{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"线程阻塞");
            LockSupport.park();
            for(int i=0;i<5;i++){
                System.out.println(i);
            }
        }
    }

    public static void main(String args[]) throws InterruptedException{
        LockSupportThread lockSupportThread = new LockSupportThread();
        Thread thread = new Thread(lockSupportThread,"t1");
        thread.start();
        thread.sleep(1000);
        // 唤醒
        System.out.println("main唤醒阻塞线程");
        LockSupport.unpark(thread);
    }
}

t1线程阻塞
main唤醒阻塞线程
0
1
2
3
4

3.2.1 线程池

线程池的概念:

线程池产生的原因:


3.2.2 JDK 对线程池的支持

Executor 框架结构图
对 线程池 Executor 框架更详细的介绍
Executor 框架成员
  1. Executors 类是一个工厂类,用于配置线程池,主要有下面的工厂方法

注意上面的方法的返回类型。

接下来对上面提到的方法逐个解析:

public static void main(String args[]){
        TestThread testThread = new TestThread();     // testThread 是一个多线程类
        ExecutorService executorService = Executors.newFixedThreadPool(5); // 配置线程池
        for(int i=0;i<5;i++){
            executorService.submit(testThread);   // submit() 方法,用于向线程池提交任务。
        }
    }
 public static void main(String[] args) {
        CachedThreadPoolTask cachedThreadPoolTask = new CachedThreadPoolTask();
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            executorService.submit(cachedThreadPoolTask);
        }
    }
}

运行结果会创建10个线程。

/**
 * 用来测试 计划任务
 * newScheduledThreadPool 线程池
 */

public class ScheduledExecutorServiceDemo {
    public static void main(String args[]){
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(System.currentTimeMillis()/1000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        },0,2,TimeUnit.SECONDS);
    }
}

执行结果是以3s的间隔调度任务。如果是 scheduleAtFixedRate() 方法,任务时间设置为1s,执行间隔设置为2s,那么任务调度间隔最后就是2s;5s,3s,就是5s,因为 ScheduledExecutorService 不会出现任务堆叠,就是说任务执行周期太短,就会在任务结束后立即调用下一个任务。


3.2.3 核心线程池的内部实现

上一个小节中的各种创建线程池的工厂方法,实际上,其内部都使用了 ThreadPoolExecutor 类。因此,我们需要了解一下 ThreaPoolExecutor 类最重要的构造函数

public ThreadPoolExecutor(
            int corePoolSize,
            int maximumPoolSize,
            long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory,
            RejectedExecutionHandler handler
    )

参数含义如下:

上面的参数都很直观,但是对于 workQueue 和 handler 我们需要进一步了解。
workQueue 是一个 BlockingQueue 对象,用于存放 Runnable 对象。在 ThreadPoolExecutor 的构造函数中,可以使用下面几种 BlockingQueue 接口:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

3.2.4 拒绝策略

刚刚我们提到过,当二次提交超过 maxPoolSize 时候,需要执行拒绝策略。这里的拒绝策略有4种:


3.2.5 ThreadFactory 接口:自定义线程创建

线程池的产生时为了避免频繁的创建销毁线程,但是肯定还是会创建线程的,最开始的那些线程从哪里来呢,就是 ThreadFactory 接口,这个接口只有一个创建线程的方法

Thread newThread(Runnable r);

3.2.3 小节中提到过,ThreadPoolExecutor 的构造方法中,倒数第二个参数就是 ThreadFactory 对象,通常情况下,使用默认的,不用指定。
书上给出了一个重写 ThreadFactory 对象的例子,说明通过自定义 ThreadFactory,我们可以灵活地创建线程池中的线程。

public static void main(String args[]) throws InterruptedException{
       ThreadFactoryTask threadFactoryTask = new ThreadFactoryTask();
       ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                System.out.println("creat"+ thread);
                return thread;
            }
        });
        for(int i=0;i<5;i++){
            es.submit(threadFactoryTask);
        }
        Thread.sleep(2000);
    }

由于这里将线程池中的线程都设置为 后台线程,所以在 main 线程结束后,整个程序结束。


3.2.6 扩展线程池

ThreadPoolExecutor 其实也是一个可以扩展的线程池。其提供了 beforeExecute()、afterExecute() 、terminated() 三个方法来对线程池进行监控。


/**
 * 测试 ThreadPoolExecutor 的可扩展性
 */

public class ThreadPoolExecutorTest {
    public static class ThreadPoolExecutorTask implements Runnable{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+":"+System.currentTimeMillis());
        }
    }

    public static void main(String args[]) throws InterruptedException{
        ExecutorService es = new ThreadPoolExecutor(5,5,
                0L,TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>()){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行" + Thread.currentThread().getName());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行结束" + Thread.currentThread().getName());
            }

            @Override
            protected void terminated() {
                System.out.println("线程退出");
            }
        };
        ThreadPoolExecutorTask threadPoolExecutorTask = new ThreadPoolExecutorTask();
        for(int i=0;i<5;i++){
            es.submit(threadPoolExecutorTask);
            Thread.sleep(1000);
        }
        es.shutdown();
    }
}

执行结果

准备执行pool-1-thread-1
pool-1-thread-1:1551530510144
执行结束pool-1-thread-1
准备执行pool-1-thread-2
pool-1-thread-2:1551530510160
执行结束pool-1-thread-2
准备执行pool-1-thread-4
pool-1-thread-4:1551530510160
执行结束pool-1-thread-4
准备执行pool-1-thread-5
准备执行pool-1-thread-3
pool-1-thread-3:1551530510160
执行结束pool-1-thread-3
pool-1-thread-5:1551530510160
执行结束pool-1-thread-5
线程退出

3.2.9 Fork/Join 框架

最后书上给出了一个求和的例子,通过 ForkJoinPool 和 RecuresiveTask,将很大数的求和转换成为 若干个小数求和的子任务,最终获得总共的结果。


3.3.1 并发集合简介

除了上述之外, Vector 也是线程安全的。Collections 工具类也可以将集合包装成线程安全的。


3.3.2 ConcurrentHashMap

对于 HashMap 和 ConcurrentHashMap ,这是一个较为重点的内容。
ConcurrentHashMap较为详细的解释


3.3.3 有关List的线程安全

跟上面提到过的一样,我们可以用 Collections.SynchronizedList() 方法来包装 list。获得一个线程安全的 List。

List<String> l = Collections.synchronizedList(new LinkedList<String>());

3.3.4 ConcurrentLinkedQueue

高效的并发队列(只需要记住是性能最好的并发环境下的队列即可),基于链表实现,可以看成是线程安全的 LinkedList。


3.3.5 CopyOnWriteArrayList

public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

3.3.6 BlockingQueue


3.3.7 随机数据结构:跳表 SkipList

跳表结构示意

比如在上图这个结构中,查找元素 7。 顶层元素最少,在顶层找,没有;下一层;找到8,;下一层,确定6--8之间。即可以快速查找。

上一篇下一篇

猜你喜欢

热点阅读