廖雪峰Java读书笔记(六)--多线程(或称并发)

2020-09-01  本文已影响0人  拜仁的月饼

1. 多线程基础

首先要明白一些概念:

Java用一个主线程来执行main()方法,在main()方法内部我们又可以启动多个线程。

2. 创建新线程

要创建一个新线程非常容易,我们需要实例化一个Thread实例,然后调用它的start()方法即可。

package MultiThr;

public class CreateThread {
    public static void main(String[] args) {
        Thread t = new Thread();
        t.start(); // 启动新线程
    }
}

但这个线程什么都没有做。如果要创建一个做事的线程,应该这样做。

方法一:

来自于《Java核心技术》,其实是廖雪峰的方法二,但这个用到了lambda表达式。

  1. 实现Runnable接口,并实现其中的run方法。Runnable接口的源代码如下 :
public interface Runnable{
    void run();
}

这是个函数式接口,所以可以用lambda表达式来实现之:

Runnable r = () -> { /*在此处写下要执行的代码*/ };
  1. 构建一个Thread对象,并传入刚才实现的Runnable接口。
Thread t = new Thread(r);
  1. 调用start()方法:
t.start();
  1. 注:在Runnable的实现中要catch (InterruptedException e)
// 廖雪峰给出的例子
public class Main{
    public static void main(String[] args){
        Thread t = new Thread(new MyRunnable());
        t.start();
    }

class MyRunnable implements Runnable{
        @Override
        public void run(){
            System.out.println("start new thread");
        }
    }
}

如果用《Java核心技术》的方法改写上述代码,是:

public class Main{
    public static void main(String[] args){
        Thread t = new Thread(new Runnable() ->{ // 这里用到了lambda表达式方法
            try{
                    System.out.println("start new thread");
                } catch (InterruptedException e){  }
            }
        );
        t.start();
    }
}

方法二:

廖雪峰的方法一,《Java核心技术》中的方法二。

操作:从Thread继承一个类出来,并@Override其中的run()方法。

示例:

// 将上面的例子用此方法改写如下:
public class Main{
    public static void main(String[] args){
        Thread t = new MyThread();
        t.start();
    }
}

class MyThread extends Thread{
    @Override
    public void run(){
        /*task codes*/
    }
}

小结:

3. 线程状态

线程有六态,分别是:

当线程启动后,它可以在RunnableBlockedWaitingTimed Waiting这几个状态之间切换,直到最后变成Terminated状态,线程终止。

注:NewTerminated是两头,其他的是中间。

线程终止的原因有:

  • 线程正常终止:run()方法执行到return语句返回;
  • 线程意外终止:run()方法因为未捕获的异常导致线程终止;
  • 对某个线程的Thread实例调用stop()方法强制终止(强烈不推荐使用)。

注:

4. 线程属性

廖雪峰部分讲到了两个:

在《Java核心技术》中还有一个,暂时不知如何翻译,因为我读的是英文版:

4.1 中断线程

有两种情况会让线程中断:

以下方法可以检查线程是否设置了中断状态,首先我们可以调用Thread.currentThread()方法来获取当前线程,然后调用isInterrupted()检查是否设置了interrupted()状态。但如果线程被锁定便不能检查中断状态了。这就是InterruptedException的来源。

在catch到InterruptedException之后,可以检查一下Thread.currentThread().interrupt();也可以在方法之前就预先抛出Exception,像这样:

public void run() throws InterruptedException

4.2 守护线程

Java程序入口就是由JVM启动main线程,main线程又可以启动其他线程。当所有线程都运行结束时,JVM退出,进程结束。如果有一个线程没有退出,JVM进程就不会退出。所以,必须保证所有线程都能及时结束。

注:也就是说,一切方法都要在main()方法中执行。

守护线程是指为其他线程服务的线程。在JVM中,所有非守护线程都执行完毕后,无论有没有守护线程,虚拟机都会自动退出。

注:守护线程除了服务其他线程以外没有其他的作用。(《Java核心技术》)原文:A daemon is simply a thread that has no other role in life than to serve others.

设置守护线程的方法:

t.setDaemon(true)

4.3 为线程取名

可以给线程命名:

Thread t = new Thread(runnable);
t.setName("abc"); # 使用setName("name")方法

4.4 解决未捕获的异常问题

线程也可以被未捕获的异常终止,然后线程死亡。这时候就需要处理未捕获的异常。可以实现Thread.UncaughtExceptionHandler接口来处理。如下:

void unchangedException(Thread t, Throwable e)

可以以线程中设置一个setUncaughtExceptionHandler方法,也可以设置静态方法setDefaultUncaughtExceptionHandler

ThreadGroup对象暂时不是很懂,在《Core Java》P747-748中。就暂时先翻译一下:

ThreadGroup类实现了Thread.UncaughtExceptionHandler接口。其中的uncaughtException方法进行以下操作:

  1. 如果线程组有父线程,那么会调用uncaughtException()方法;
  2. 否则,如果Thread.getDefaultUncaughtExceptionHandler方法返回一个非null的解决方案,便会调用uncaughtException方法;
  3. 如果ThrowableThreadDeath的实例,那么什么也不会发生。
  4. 线程的名字及Throwable堆栈追踪会被输出 。

5. 线程同步(Synchronization)

前面说过,同一进程内的多个线程共享数据,这样会导致竞争情况(race condition)。也就是,当多个线程同时运行时, 线程并不是非常有礼貌地排队运行,如果不加干预,就是你运行一会它运行一会。如果是两个线程同时运行,并不是两个各运行相等的时间。要注意。

廖老师的例子:

// https://www.liaoxuefeng.com/wiki/1252599548343744/1306580844806178
public class Main {
    public static void main(String[] args) throws Exception {
        var add = new AddThread(); // 线程一
        var dec = new DecThread(); // 线程二
        // 两个线程并不是“先来后到”式运行的
        add.start();
        dec.start();
        add.join();
        dec.join();
        System.out.println(Counter.count); // 最后的结果不一定是0
    }
}

class Counter {
    public static int count = 0;
}

class AddThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) { Counter.count += 1; }
    }
}

class DecThread extends Thread {
    public void run() {
        for (int i=0; i<10000; i++) { Counter.count -= 1; }
    }
}

很可能的运行模式如下图:


从廖老师网站上截图下来

那么,如果要让线程之间可以礼让地运行,遵循“先来后到”的顺序,怎么办?就像这样:


从廖老师网站截图下来

从图中可以看出,为保证代码可以“先来后到”地运行,需要通过lock(加锁)unlock(解锁)操作实现。通过加锁与解锁的操作就可以保证一个线程执行期间不会有其他的线程进入此指令区间。即使在执行期线程被操作系统中断执行,其他线程也会因为无法获得锁导致无法进入此指令区间。只有执行线程将锁释放后,其他线程才有机会获得锁并执行。在专业术语中,此操作叫做代码的原子性(atomic)

有锁与无锁的区别(图片来自《Java核心技术》)

有两种方法可以实现:ReetrantLock类型与synchronized关键字。《Java核心技术》先讲的是前者,我看书有点懵逼,但廖老师先讲的是后者,相对比较明白一些。

由上面可以得知,保证一段代码的原子性,可以通过加锁与解锁的操作来实现。不过,在《Java核心技术》中作者也承认:

The Lock and Condition interfaces give programmers a high degree of control over locking. However, in most situations, you don't need that control -- you can use a mechanism that is built into the Java language.

其实,“a mechanism that is built into the Java language”就是我们要说的synchronized关键字,在术语中称为intrinsicLock

两种方法使用:

第一种使用方式表示用lock实例作为锁,两个线程在执行各自的synchronized(Counter.lock) { ... }代码块时,必须先获得锁,才能进入代码块进行。执行结束后,在synchronized语句块结束会自动释放锁。第二种方法也不用写unlock。 但是,它的缺点是带来了性能下降。因为synchronized代码块无法并发执行。此外,加锁和解锁需要消耗一定的时间,所以,synchronized会降低程序的执行效率。(廖雪峰语)

如何使用synchronized关键字锁定对象呢?

注意:

JVM规范定义了几种原子操作。不需要同步:

如果一个类被设计为允许多线程正确访问,我们就说这个类就是“线程安全”的(thread-safe)。Java标准库的java.lang.StringBuffer也是线程安全的。

如果是第二种方法,可以把整个方法变为同步代码块,锁住的对象是this

如果锁住的是static方法,那么锁住的是class.Class对象本身。

5.1 死锁

首先我们要明白,Java线程锁是可重入的锁(廖雪峰语)。对同一个线程,锁可以重复获得,即JVM允许线程重复地获取同一个锁,这就是可重入锁。例如:

public class Counter {
    private int count = 0;

    public synchronized void add(int n) { // add方法会获取一个锁
        if (n < 0) {
            dec(-n);  // 同时dec方法也会获得锁
        // JVM同时允许add()与dec()获得锁
        } else {
            count += n;
        }
    }

    public synchronized void dec(int n) {
        count += n;
    }
}

那么死锁是怎么发生的呢?通俗地说,一个已经获取锁的对象还要再获取另一个锁,但另一个锁已经被其他对象把持,死锁就发生了。(我自己说的)例如:(廖雪峰老师的例子)

public void add(int m) {
    synchronized(lockA) { // 获得lockA的锁
        this.value += m;
        synchronized(lockB) { // 获得lockB的锁
            this.another += m;
        } // 释放lockB的锁
    } // 释放lockA的锁
}

public void dec(int m) {
    synchronized(lockB) { // 获得lockB的锁
        this.another -= m;
        synchronized(lockA) { // 获得lockA的锁
            this.value -= m;
        } // 释放lockA的锁
    } // 释放lockB的锁
}

(用廖老师的方法)
分析如上例子:线程1和线程2分别执行add()dec()时:

然后顺序执行:

于是死锁(Deadlocks)就发生了。死锁一旦形成就只能强制结束进程。避免死锁的方法是严格按照线程获取锁的顺序来写!

一旦死锁发生,那么可以按Ctrl + \来查看所有线程。每个线程都有追踪,告诉你在哪里锁住了。

5.2 使用wait与notify

Java中synchronized解决了多线程竞争的问题,但并不解决多线程协调的问题。先看一个例子:

package MultiThr;
import java.util.*;

public class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s){
        this.queue.add(s);
    }

    public synchronized String getTask(){
        while (queue.isEmpty()){} // 实际上while循环不会停下来
        return queue.remove();
    }
}

理论上,如果任务队列为空,就等待,直到线程里有一个任务就退出。但事实上不是这样的:实际运行中,因为线程在执行while()循环时,已经在getTask()入口处获得了this锁,导致其他线程无法调用getTask()方法。最后的结果是getTask()陷入死循环。如何修改呢?

package MultiThr;
import java.util.*;

public class TaskQueue {
    Queue<String> queue = new LinkedList<>();

    public synchronized void addTask(String s){
        this.queue.add(s);
    }

    public synchronized String getTask() throws InterruptedException { // 抛出异常必须
        while (queue.isEmpty()){
            // 释放this锁
            this.wait();
        }
        return queue.remove();
    }
}

当一个线程在this.wait()等待时,它就会释放this锁,从而使得其他线程能够在addTask()方法上获得this锁。当wait()方法调用时会释放线程锁,返回后又重新获得锁。

当我们用wait()方法让线程进入等待状态后,有什么方法可以重新唤起线程吗?notify()方法,可以唤醒一个正在this锁等待的线程。方法如下:

public synchronized void addTask(String s) {
    this.queue.add(s);
    this.notify(); // 唤醒在this锁等待的线程
}

5.3 再谈ReentrantLock

由于《Java核心技术》一上来就讲到了ReentrantLock,我有点不懂。现在学完之后再回来看ReentrantLock,似乎可以懂了。

首先,ReentrantLockjava.uitl.concurrent.locks,属于并发编程。

上面的Counter类例子:

public class Counter {
    private int count;

    public void add(int n) {
        synchronized(this) {
            count += n;
        }
    }
}

用ReentrantLock改造一下是(廖老师例子):

import java.util.concurrent.locks.*;

public class Counter{
    private int count;
    private final Lock lock = new ReentrantLock(); // 在方法外围定义一个锁

    public void add(int n){
        lock.lock(); // 方法最一开始就加锁
        try{ // 要用try语句
                count += n;
            } finally{  lock.unlock();  } // 一定要释放锁
    }
}

注1:(《Java核心技术》语) It is critically important that the unlock operation is enclosed in a finally clause. (译:在finally语句中使用unlock语句释放锁)
注2:(《Java核心技术》语) You cannot use the try-with-resources statement. (译:在ReentrantLock中,不能使用try带括号)

ReentrantLock也是可重入锁,也就是说一个线程可以多次获取同一个锁,得锁者运行。

前面已经说到用wait()方法和notifyAll()方法实现多线程协调。那么在ReentrantLock中有何方法?Condition类中提供的await()、signal()、signalAll()可以实现同样的功能。

手撸一遍廖老师的代码:

import java.util.concurrent.locks.*;

public class TaskQueue{
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private Queue<String> queue = new LinkedList<>();

    public void addTask(String S){
        lock.lock();
        try{
            queue.add(s);
            condition.signalAll(); // notifyAll()
        }finally{
            lock.unlock();
        }
    }

    public void getTask(){
        lock.lock();
        try{
            while (queue.isEmpty()){
                condition.await();
            }finally{  lock.unlock();  }
        return queue.remove();
        }
    }
}

5.4 读写锁

不论是synchronized()方法也好,还是ReentrantLock也罢,都只能允许一个线程进行读写,如果不写入的话,其他线程读取都困难,因为没有获取锁。但我们想要的效果是允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待。换句话说,如果没人写,其他的都可以读;如果写了,其他的就不可读了。这个问题可以用ReadWriteLock解决。

使用ReadWriteLock时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。比如论坛的帖子。

但它有一个问题:既某个线程要写的时候,需要释放读锁才能写。于是悲观锁发生了。

示例:

package MultiThr;
import java.util.concurrent.locks.*;
import java.util.*;

public class CounterRW {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock rLock = rwLock.readLock();
    private final Lock wLock = rwLock.writeLock();
    private int[] counts = new int[10];

    // 把读写操作分别用读锁和写锁来加锁,在读取时,多个线程可以同时获得读锁,这样就大大提高了并发读的执行效率。
    public void inc(int index){
        wLock.lock(); // 写锁
        try{
            counts[index] += 1;
        }finally{
            wLock.unlock();
        }
    }

    public int[] get(){
        rLock.lock(); // 读锁
        try{
            return Arrays.copyOf(counts, counts.length);
        }finally{ rLock.unlock(); }
    }
}

5.5 悲观锁与乐观锁

引用一篇技术博客: Java并发问题--乐观锁与悲观锁以及乐观锁的一种实现方式-CAS

先来看廖雪峰老师的简版定义:

再来看看技术博客中如何定义:

5.6 StampedLock

这是Java 8开始引进的一种读写锁,读的过程也允许获取写锁后写入。但需要一些代码判断是否有写入。例:

package MultiThr;
import java.util.*;
import java.util.concurrent.locks.*;

public class Point {
    public final StampedLock stampedLock = new StampedLock();

    private double x;
    private double y;

    public void move(double deltaX, double deltaY){
        long stamp = stampedLock.writeLock(); // 获取写锁
        try{
            x += deltaX;
            y += deltaY;
        }finally{
            stampedLock.unlockWrite(stamp); // 释放写锁
        }
    }

    public double distanceFromOrigin(){
        long stamp = stampedLock.tryOptimisticRead();
        double currentX = x;
        double currentY = y;

        if (!stampedLock.validate(stamp)){ // 检查是否有其他写锁发生
            stampedLock.readLock(); // 这是个悲观锁
            try{
                currentX = x;
                currentY = y;
            } finally{
                stampedLock.unlockRead(stamp);
            }
        }

        return Math.sqrt(currentX * currentX + currentY + currentY);
    }
}

5.7 Concurrent集合与Atomic操作以及volatile关键字

首先应明白何为“线程安全”。

线程安全就是多线程访问时,采用了加锁机制,当一个线程访问该类的某个数据时,进行保护,其他线程不能进行访问直到该线程读取完,其他线程才可使用。
引用地址

Concurrent集合:由于默认类并非线程安全,所以调用时为了实现线程安全可以加锁。但Java的并发机制已经为我们写好了,即Concurrent集合,线程安全类。对照表如下:

interface non-thread-safe thread-safe
List ArrayList CopyOnWriteArrayList
Map HashMap ConcurrentHashMap
Set HashSet / TreeSet CopyOnWriteArraySet
Queue ArrayDeque / LinkedList ArrayBlockingQueue / LinkedBlockingQueue
Deque ArrayDeque / LinkedList LinkedBlockingQueue

volatile关键字:不加锁也可以实现同步的一种机制,使得field可以被同其他线程同步。

例:

private volatile boolean done;
public boolean isDone(){ return done; }
public void setDone(){ done = true; }

6. 线程池

定义:能接受大量小任务并进行分发处理,使用ExecutorSerivce接口来表示。

ExecutorService有三个常用实现:

示例:

package MultiThr;
import java.util.concurrent.*;

public class ThrPool {
    // 创建一个固定大小的线程池
    ExecutorService es = Executors.newFixedThreadPool(4);

    for(int i = 0; i <= 5; i++){
        es.submit(new Task("" + i));
    }
    // 关闭线程池
    es.shutdown(); // 使用shutdown关闭线程池的时候,线程池会等待当前任务完成
    // shutdownNow(): 立即停止当前正在执行的任务
    // awaitTermination()则会等待指定的时间让线程池关闭
}

class Task implements Runnable{
    private final String name;

    public Task(String name){
        this.name = name;
    }

    @Override
    public void run(){
        System.out.println("start task" + name);
        try{
            Thread.sleep(1000);
        }catch(InterruptedException e){}
        System.out.println("end task" + name);
    }
}

6.1 动态调整的线程池

在简介中说过CachedThreadPool可以实现这一功能。扒一下CachedThreadPool的源码:

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

所以可以这样写:

int min = 4 ;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

6.2 定期反复执行的线程池

例如:每秒刷新证券价格的任务就可以通过这种线程池来执行。依然要通过Executors类来创建。

ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);

一次性任务:

ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);

以固定每3秒执行一次。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:

// 2秒后开始执行定时任务,每3秒执行一次
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);

每次任务执行间隔3秒。FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:

// 2秒后开始执行定时任务,每个任务之间间隔3秒
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);

7. Runnable接口

在前作说过创建新任务可以实现Runnable接口。但这个接口有一个问题:是一个void方法,并无返回值,所以执行一些有返回值的任务时候就有所不便。

解决这个问题的方法是使用Callable接口。与Runnable相比多一个返回值。示例:

class Task implements Callable<String>{
    public String call() throws Exception{
        return longTimeCalculation();
    }
}

从以上代码可以看出,Callable是一个泛型接口,<>当中标注要返回的类型,实现call方法可以返回指定的结果。

8. Future类型

一个Future类型的实例代表一个未来能获取结果的对象,可以获取异步执行的结果。ExecutorService.submit()方法可以返回一个Future()类型。

ExecutorService es = Executors.newFixedThreadPool(4);
// 定义任务
Callable<String> task = new Task();
// 提交任务并获得Future
Future<String> f = es.submit(task);
// 从Future返回异步执行的结果
String s = f.get();

Future的方法有:

9. Fork/Join

其思想为x分法:如果一个任务比较大,那就把它分为x部分执行。

示例代码(先粘贴,回去打):

public class Main {
    public static void main(String[] args) throws Exception {
        // 创建2000个随机数组成的数组:
        long[] array = new long[2000];
        long expectedSum = 0;
        for (int i = 0; i < array.length; i++) {
            array[i] = random();
            expectedSum += array[i];
        }
        System.out.println("Expected sum: " + expectedSum);
        // fork/join:
        ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
        long startTime = System.currentTimeMillis();
        Long result = ForkJoinPool.commonPool().invoke(task);
        long endTime = System.currentTimeMillis();
        System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
    }

    static Random random = new Random(0);

    static long random() {
        return random.nextInt(10000);
    }
}

class SumTask extends RecursiveTask<Long> {
    static final int THRESHOLD = 500;
    long[] array;
    int start;
    int end;

    SumTask(long[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            // 如果任务足够小,直接计算:
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += this.array[i];
                // 故意放慢计算速度:
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                }
            }
            return sum;
        }
        // 任务太大,一分为二:
        int middle = (end + start) / 2;
        System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
        SumTask subtask1 = new SumTask(this.array, start, middle);
        SumTask subtask2 = new SumTask(this.array, middle, end);
        invokeAll(subtask1, subtask2);
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        Long result = subresult1 + subresult2;
        System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
        return result;
    }
}

10. ThreadLocal

在同一线程中传递同一对象。回去慢慢更。

上一篇 下一篇

猜你喜欢

热点阅读