中北软院创新实验室

Java编程的逻辑 -- 并发章 -- 线程的基本协作机制

2018-06-22  本文已影响67人  HikariCP

线程的基本协作

多线程间除了竞争访问同一资源外,也经常需要相互协作的去执行一些任务。而对于协作的基本机制用的最多的无疑是wait/notify。

协作的场景

wait/notify

wait和notify方法都是Object类提供给所有子类进行线程协作的一种实现机制。

wait:

public final void wait() throws InterruptedException
public final native void wait(long timeout) throws InterruptedException;

一个带时间参数,表示最多等待这么长时间。
一个不带,默认为0,表示无限期等待。

如果在wait的过程中线程被中断,则会抛出InterruptedException。我们在之前关于Thread类的的博文中也提到过这个。

wait在等什么?

我们之前说过的每个对象都有一把锁和一个等待队列,一个线程在进入synchronized代码块时,会尝试获取锁,如果获取不到则会把当前线程加入等待队列中,其实,除了对于锁的等待队列,每个对象还有另一个等待队列,即条件队列,该队列用于线程间的协作。调用
wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。当其他线程改变了条件后,应该调用Object(等待哪个对象就用哪个对象)的notify方法:

public final native void notify();
public final native void notifyAll();

notify做的事情就是从条件队列中选一个线程,将其从队列中移除并唤醒,notifyAll和notify的区別是,它会移除条件队列中所有的线程并全部唤醒。

public class WaitThread extends Thread {
    private volatile boolean fire = false;
    @Override
    public void run() {
        try {
            synchronized (this) {
                while(!fire) {
                    wait();
                }
            }
            System.out.println("fired");
        } catch(InterruptedException e) {
        }
    }
    public synchronized void fire() {
        this.fire = true;
        notify();
    }
    public static void main(String[] args) throws InterruptedException {
        WaitThread waitThread = new WaitThread();
        waitThread.start();
        Thread.sleep(1000);
        System.out.println("fire");
        waitThread.fire();
    }
}

代码中的协作的条件变量式fire,两个线程都要访问该fire变量,容易出现竞态条件所以相关代码都被synchronized保护了。

需要特别注意的是: wait和notify方法的调用只能再synchronized代码块中。如果在调用wait/notify方法时,当前线程没有对象锁的话,那么会抛出java.lang.IllegalMonitor-StateException

wait的具体过程:

  1. 把当前线程放入条件等待队列++释放对象锁++,阻塞等待,线程状态变为WAITING
    TIMED_WAITING
  2. 等待时间到或被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁
    • 如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回。
    • 否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从wait调用返回。

这里我们一定要区别好两个等待队列,一个是线程没有分配到cpu时间片进入到的对象锁等待队列。另一个则是线程执行遇到条件不满足的情况进入条件等待队列

当线程从条件队列中返回不代表其等待的条件就满足了,也有可能是wait方法限定的时间到达了。我们在使用wait方法的时候当其跳出后还应该再判断一次。一般我们通过while循环的方式来做到。

synchronized (obj) {
    while(条件不成立)
        obj.wait();
        
    …//执行条件满足后的操作
}

调用notify会把在条件队列中等侍的线程唤醍并从队列中移除,但它不会释放对象锁,也就是说,只有在包含notify的synchronized代码块(被synchronized修饰过了)执行完后,等待的线程才会从wait调用中返回。这一点需要铭记

我们在使用wait时最难的是搞清楚wait到底等的是什么?,而notify通知的又是什么? 我们需要知道,它们被不同的线程调用,并共亨相同的锁和条件等待队列(相同对象的
synchronized代码块内),它们围绕一个共享的条件变量进行协作,这个条件变量是程序自己维护的,当条件不成立时,线程调用wait进入条件等待队列,另一个线程修改条件后调用notify,调用wait的线程被唤醒后需要重新检查条件变量。从多线程的角度看,它们围绕共享变量进行协作,从调用wait的线程角度看,它阻塞等待一个条件的成立。我们在设立多线程协作时,需要想清楚协作的++共享变量++++条件++是什么,这是协作的核心。**

线程的基本协作示例

我们前面说了线程基本协作的场景,这里书上给出了对于这几种场景的代码示例:

生产者/消费者模式

队列:

static class MyBlockingQueue<E> {
    private Queue<E> queue = null;
    private int limit;
    public MyBlockingQueue(int limit) {
        this.limit = limit;
        queue = new ArrayDeque<>(limit);
    }
    public synchronized void put(E e) throws InterruptedException {
        while(queue.size() == limit) {
            wait();
        }
        queue.add(e);
        notifyAll();
    }
    public synchronized E take() throws InterruptedException {
        while(queue.isEmpty()) {
            wait();
        }
        E e = queue.poll();
        notifyAll();
        return e;
    }
}

生产者:

static class Producer extends Thread {
    MyBlockingQueue<String> queue;
    public Producer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        int num = 0;
        try {
            while(true) {
                String task = String.valueOf(num);
                queue.put(task);
                System.out.println("produce task " + task);
                num++;
                Thread.sleep((int) (Math.random() * 100));
            }
        } catch (InterruptedException e) {
        }
    }
}

消费者:

static class Consumer extends Thread {
    MyBlockingQueue<String> queue;
    public Consumer(MyBlockingQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while(true) {
                String task = queue.take();
                System.out.println("handle task " + task);
                Thread.sleep((int)(Math.random()*100));
            }
        } catch(InterruptedException e) {
        }
    }
}

主程序:

public static void main(String[] args) {
    MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);
    new Producer(queue).start();
    new Consumer(queue).start();
}

在生产者消费者模式中,put等待的是队满而take等待的却是队空。但他们都会进入相同的对象的条件等待队列中。由于等待的条件不同,但两者的共享变量却都是该队列,所以此处不能使用notify,因为notify只能唤醒一个线程,而由于线程调度的机制。唤醒的如果是同类线程的话则起不到协调的作用,所以在共用同一个条件队列而等待的条件却相反时应该使用notifyAll。

Java提供了专门的阻塞队列,包括:

在实际场景中,应该优先使用这些实现类。

同时开始

同时开始的按例好比运动员比赛,一个主线程(裁判)决定着各个子线程(运动员)何时开始。

协作对象fired:

static class FireFlag {
    private volatile boolean fired = false;
    public synchronized void waitForFire() throws InterruptedException {
        while(!fired) {
            wait();
        }
    }
    public synchronized void fire() {
        this.fired = true;
        notifyAll();
    }
}

运动员:

static class Racer extends Thread {
    FireFlag fireFlag;
    public Racer(FireFlag fireFlag) {
        this.fireFlag = fireFlag;
    }
    @Override
    public void run() {
        try {
            this.fireFlag.waitForFire();
            System.out.println("start run "
                    + Thread.currentThread().getName());
        } catch (InterruptedException e) {
        }
    }
}

裁判:

public static void main(String[] args) throws InterruptedException {
    int num = 10;
    FireFlag fireFlag = new FireFlag();
    Thread[] racers = new Thread[num];
    for(int i = 0; i < num; i++) {
        racers[i] = new Racer(fireFlag);
        racers[i].start();
    }
    Thread.sleep(1000);
    fireFlag.fire();
}

等待结束

Thread类的join方法的实现其实就是借助于wait方法。其主要代码:

while (isAlive()) {
    wait(0);
}

该代码意义是:只要该等待线程活着就会一直等待,join的结束依赖与线程运行结束的时候Java系统调用notifyAll来通知该等待线程。

使用join有时比较麻烦需要等待各个子线程结束。这里书上给出的例子采用了另一种写法,主线程与子线程协作的是一个数,这个数表示未完成的线程的个数,初始值为子线程个数,主线程需要等待该值变为0,每个子线程结束后需要将该值减一,当为0时调用notifyAll。

协作对象MyLatch:

public class MyLatch {
    private int count;
    public MyLatch(int count) {
        this.count = count;
    }
    public synchronized void await() throws InterruptedException {
        while(count > 0) {
            wait();
        }
    }
    public synchronized void countDown() {
        count--;
        if(count <= 0) {
            notifyAll();
        }
    }
}

子线程:

static class Worker extends Thread {
    MyLatch latch;
    public Worker(MyLatch latch) {
        this.latch = latch;
    }
    @Override
    public void run() {
        try {
            //simulate working on task
            Thread.sleep((int) (Math.random() * 1000));
            this.latch.countDown();
        } catch (InterruptedException e) {
    }
}

主线程:

public static void main(String[] args) throws InterruptedException {
    int workerNum = 100;
    MyLatch latch = new MyLatch(workerNum);
    Worker[] workers = new Worker[workerNum];
    for(int i = 0; i < workerNum; i++) {
        workers[i] = new Worker(latch);
        workers[i].start();
    }
    latch.await();
    System.out.println("collect worker results");
}

MyLatch也可以应用在“同时开始”的场景,初始值设为1。

public class RacerWithLatchDemo {
    static class Racer extends Thread {
        MyLatch latch;
        public Racer(MyLatch latch) {
            this.latch = latch;
        }
        @Override
        public void run() {
            try {
                this.latch.await();
                System.out.println("start run "
                        + Thread.currentThread().getName());
            } catch (InterruptedException e) {
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        MyLatch latch = new MyLatch(1);
        Thread[] racers = new Thread[num];
        for(int i = 0; i < num; i++) {
            racers[i] = new Racer(latch);
            racers[i].start();
        }
        Thread.sleep(1000);
        latch.countDown();
    }
}

Java中提供了一个专门的同步类CountDownLatch,在实际开发中应该使用它。

异步结果

在主从模式中,手工创建线程比较麻烦。一种常见的模式是异步调用,异步调用一般返回一个名为Future的对象,通过它可以获得最终的结果。在Java中表示子任务的接口是Callable。如下是书上例子:

子任务:Callable

public interface Callable<V> {
    V call() throws Exception;
}

异步调用的结果:Future

public interface MyFuture <V> {
    V get() throws Exception ;
}

通过该接口的get方法返回真正的结果。如果结果还没有计算完成,get方法会阻寒直到计算完成,如果调用过程发生异常,则get方法抛出调用过程中的异常。

方便主线程调用子任务的类 MyExecutor

public <V> MyFuture<V> execute(final Callable<V> task)

通过该方法主线程就不需要在创建并管理子线程了。可以方便的获取到异步调用的结果。如下:

public static void main(String[] args) {
    MyExecutor executor = new MyExecutor();
    // 子任务
    Callable<Integer> subTask = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            //…执行异步任务
            int millis = (int) (Math.random() * 1000);
            Thread.sleep(millis);
            return millis;
        }
    };
    //异步调用子任务,返回一个MyFuture对象
    MyFuture<Integer> future = executor.execute(subTask);//内部创了个子线程去执行
    //…执行其他操作
    try {
        //获取异步调用的结果
        Integer result = future.get();
        System.out.println(result);
    } catch(Exception e) {
        e.printStackTrace();
    }
}

所以重点是,MyExecutor类的execute方法是怎么实现的呢 ?它封装了创建子线程,同步获取结果的过程,它会创建一个执行子线程。

MyFuture类的execute具体实现:

public <V> MyFuture<V> execute(final Callable<V> task) {
    final Object lock = new Object();
    final ExecuteThread<V> thread = new ExecuteThread<>(task, lock);
    thread.start();
    MyFuture<V> future = new MyFuture<V>() {
        @Override
        public V get() throws Exception {
            synchronized (lock) {
                while(!thread.isDone()) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
                if(thread.getException() != null) {
                    throw thread.getException();
                }
                return thread.getResult();
            }
        }
    };
    return future;
}

execute方法启动一个执行子线程,并返回携带执行结果的MyFuture对象。MyFuture的方法会阻塞等待知道子线程运行结束返回结果。

执行子线程 ExecuteThread

static class ExecuteThread<V> extends Thread {
    private V result = null;
    private Exception exception = null;
    private boolean done = false;
    private Callable<V> task;
    private Object lock;
    public ExecuteThread(Callable<V> task, Object lock) {
        this.task = task;
        this.lock = lock;
    }
    @Override
    public void run() {
        try {
            result = task.call();
        } catch (Exception e) {
            exception = e;
        } finally {
            synchronized (lock) {
                done = true;
                lock.notifyAll();
            }
        }
    }
    public V getResult() {
        return result;
    }
    public boolean isDone() {
        return done;
    }
    public Exception getException() {
        return exception;
    }
}

Java中也已经包含了一套完善的方案,有:

集合点

和之前等待结束和同时开始案例相似。各线程分开行动,各自到大一个集合点,在集合点需要集齐所有线程,交换数据然后再进行下一步动作。协作的共享变量n,初始值为线程总数,当有一个线程到达集合点n减一。直到变为0即最后一个也到达,通过notifyAll来唤醒所有条件等待线程。

协作对象:AssemblePoint


协作对象public class AssemblePoint {
    private int n;
    public AssemblePoint(int n) {
        this.n = n;
    }
    public synchronized void await() throws InterruptedException {
        if(n > 0) {
            n--;
            if(n == 0) {
                notifyAll();
            } else {
                while(n != 0) {
                    wait();
                }
            }
        }
    }
}

主线程:AssemblePointDemo

public class AssemblePointDemo {
    static class Tourist extends Thread {
        AssemblePoint ap;
        public Tourist(AssemblePoint ap) {
            this.ap = ap;
        }
        @Override
        public void run() {
            try {
                //模拟先各自独立运行
                Thread.sleep((int) (Math.random() * 1000));
                //㼿㼿
                ap.await();
                System.out.println("arrived");
                //…㼿㼿㼿㼿㼿㼿㼿㼿㼿
            } catch (InterruptedException e) {
            }
        }
    }
    public static void main(String[] args) {
        final int num = 10;
        Tourist[] threads = new Tourist[num];
        AssemblePoint ap = new AssemblePoint(num);
        for(int i = 0; i < num; i++) {
            threads[i] = new Tourist(ap);
            threads[i].start();
        }
    }
}

Java中有一个专门的同步工具类CyclicBarrier可以替代该AssemblePoint类。

总结

该节主要介绍了Java中线程间协作的基本机制wait/notify,协作关键要想淸楚协作的共享变贵和条件是什么。Java中有专门为协作而建的阻塞队列、同步工具类,以及Executors框架。

上一篇下一篇

猜你喜欢

热点阅读