第21章 多线程
1. 一个任务可以多次获得对象上的锁。
如果一个方法在同一个对象上调用了第二个方法,后者又调用了同一个对象上的另一个方法,就会发生这种情况。JVM负责跟踪对象被加锁的次数,如果一个对象被解锁(即锁完全释放),其计数变为0。在任务给对象第一次加锁的时候,计数变为1.每当这个相同的任务在这个对象上获得锁时,计数都会递增。显然,只有首先获得了锁的任务才能允许继续获得更多锁。每当任务离开一个synchronized方法,计数递减。当计数为0的时候,锁被完全释放,此时别的任务就可以使用此资源。
2. 你应该什么时候使用同步呢?可以运用Brian的同步规则。
如果你正在写一个变量,它可能接下来被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么你必须使用同步,并且,读写线程都必须用相同的监视器锁同步。
3. 显示Lock的优点:
3.1 如果在使用synchronized关键字时,某些事物失败了,那么会抛出一个异常。但是你没有机会去做清理工作,以维护系统使其处于良好的状态。有了显示的Lock对象,你就可以使用finally子句将系统维护在正确的状态了。
4. Java线程中的Thread.yield( )方法
译为线程让步。顾名思义,就是说当一个线程使用了这个方法之后,它就会把自己CPU执行的时间让掉,
让自己或者其它的线程运行,注意是让自己或者其他线程运行,并不是单纯的让给其他线程。
yield()的作用是让步。它能让当前线程由“运行状态”进入到“就绪状态”,从而让其它具有相同优先级的等待线程获取执行权;但是,并不能保
证在当前线程调用yield()之后,其它具有相同优先级的线程就一定能获得执行权;也有可能是当前线程又进入到“运行状态”继续运行!
21.3.3 原子性与易变性
1. volatile的作用
如果你将一个域定义为volatile,那么它就会告诉编译器不要执行任何移除读取和写入操作的优化,这些操作的目的是用线程中的局部变量维护对这个域的精确同步。
21.3.4 原子类
Java SE5引入了注入AtomicInteger、AtomicLong、AtomicReference等特殊的原子性变量类,这些类被调整为可以使用在某些现代处理器上的可获得的,并且是在机器级别上的原子性,因此在使用他们时,通常不需要担心。
应该强调的是,Atomic类被设计用来构建java.util.concurrent中的类,因此只有在特殊情况下才在自己的代码中使用它们,即便使用了也需要确保不存在其他可能出现的问题。通常依赖于锁要更安全一些(要么是synchronized关键字,要么是显示的Lock对象)。
21.3.5 临界区
有时,你只是希望放置多个线程同时访问方法内部的部分代码而不是防止访问整个方法。通过这种方式分离出来的代码段被称为临界区。
通过使用同步代码块,而不是对整个方法进行同步控制,可以使多个任务访问对象的时间性能得到显著提高。
模板方法
在派生类中对抽象类进行实现,这样的结构称为模板方法。
/**
* 输出结果 pm1: Pair: x: 12, y: 12 checkCounter = 79275
* pm2: Pair: x: 13, y: 13 checkCounter = 31612241
*
/
package twentyone.three.five;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class CriticalSection {
// Test the two different approaches:
static void testApproaches(PairManager pman1, PairManager pman2) {
ExecutorService exec = Executors.newCachedThreadPool();
PairManipulator
pm1 = new PairManipulator(pman1),
pm2 = new PairManipulator(pman2);
PairChecker
pcheck1 = new PairChecker(pman1),
pcheck2 = new PairChecker(pman2);
exec.execute(pm1);
exec.execute(pm2);
exec.execute(pcheck1);
exec.execute(pcheck2);
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
System.out.println("Sleep interrupted");
}
System.out.println("pm1: " + pm1 + "\npm2: " + pm2);
System.exit(0);
}
public static void main(String[] args) {
PairManager
pman1 = new PairManager1(),
pman2 = new PairManager2();
testApproaches(pman1, pman2);
}
}
class Pair { // Not thread safe
private int x, y;
public Pair(int x, int y) {
this.x = x;
this.y = y;
}
public Pair () {
this(0, 0);
}
public int getX() { return x; }
public int getY() { return y; }
public void incrementX() { x++; }
public void incrementY() { y++; }
public String toString () {
return "x: " + x + ", y: " + y;
}
public class PairValuesNotEqualException extends RuntimeException {
public PairValuesNotEqualException() {
super("Pair values not equal: " + Pair.this);
}
}
// Arbitrary invariant -- both variables must be equal:
public void checkState() {
if (x != y) {
throw new PairValuesNotEqualException();
}
}
}
/**
* PairManager
* 持有一个Pair
* 一个线程安全的集合,存储Pair
* 一个线程安全的方法,拿到一个持有的Pair
* 一个存储接收到的Pair的方法
* 一个抽象方法 自增成员变量
*/
// Protect a Pair inside a thread-safe class:
abstract class PairManager {
AtomicInteger checkCount = new AtomicInteger(0);
protected Pair p = new Pair();
private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>());
public synchronized Pair getPair () {
// Make a copy to keep the original safe:
return new Pair(p.getX(), p.getY());
}
// Assume this is a time consuming operation
protected void store(Pair p) {
storage.add(p);
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException ignore) {}
}
public abstract void increment();
}
// Synchronized the entire method:
class PairManager1 extends PairManager {
public synchronized void increment () {
p.incrementX();
p.incrementY();
store(getPair());
}
}
// User a critical section
class PairManager2 extends PairManager {
public void increment () {
Pair temp;
synchronized (this) {
p.incrementX();
p.incrementY();
temp = getPair();
}
store(temp);
}
}
/**
* Pair操作器 线程类
* 构造参数接收一个PairManager
* 任务中对Pair成员变量无限自增长
*/
class PairManipulator implements Runnable {
private PairManager pm;
public PairManipulator(PairManager pm) {
this.pm = pm;
}
public void run () {
while(true) {
pm.increment();
}
}
public String toString () {
return "Pair: " + pm.getPair() + " checkCounter = " + pm.checkCount.get();
}
}
/**
* Pair检查器 线程类
* 构造参数接收一个PairManager
* 任务中无限自增计数器,并检查成员变量的状态
*/
class PairChecker implements Runnable {
private PairManager pm;
public PairChecker(PairManager pm) {
this.pm = pm;
}
public void run () {
while(true) {
pm.checkCount.incrementAndGet();
pm.getPair().checkState();
}
}
}
21.3.6 在其他对象上同步
在另一个对象上进行同步,要确保所有相关的任务都是在同一个对象上进行同步的。
package twentyone.three.six;
/**
* output:
* g()
* g()
* g()
* f()
* g()
* f()
* g()
* f()
* f()
* f()
*/
public class SyncObject {
public static void main(String[] args) {
final DualSynch ds = new DualSynch();
new Thread() {
public void run () {
ds.f();
}
}.start();
ds.g();
}
}
class DualSynch {
private Object syncObject = new Object();
public synchronized void f() {
for (int i = 0; i < 5; i++) {
System.out.println("f()");
Thread.yield();
}
}
public void g() {
synchronized (syncObject) {
for (int i = 0; i < 5; i++) {
System.out.println("g()");
Thread.yield();
}
}
}
}
21.3.7 线程本地存储
防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。线程本地存储是一种自动化机制,可以为使用相同变量的每个不同的线程都创建不同的存储。
package twentyone.three.seven;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* output: 每个线程的增长都没有受到其他线程的影响。
* #0 : 9259
* #1 : 556
* #2 : 6694
* #0 : 9260
* #1 : 557
* #3 : 1862
* #4 : 962
* #0 : 9261
* #2 : 6695
* #1 : 558
* #3 : 1863
* #4 : 963
* #0 : 9262
* #2 : 6696
* #2 : 6697
* #1 : 559
* #3 : 1864
* #4 : 964
*/
public class ThreadLocalVariableHolder {
private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
private Random rand = new Random(47);
protected synchronized Integer initialValue () {
return rand.nextInt(10000);
}
};
public static void increment () {
value.set(value.get() + 1); // 使用本地线程对变量进行操作
}
public static int get() { return value.get(); }
public static void main(String[] args) throws Exception{
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Accessor(i));
}
TimeUnit.SECONDS.sleep(3); // Run for a while
exec.shutdown(); // All Accessors will quit
}
}
class Accessor implements Runnable {
private final int id;
public Accessor(int idn) {
id = idn;
}
public void run () {
while (!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}
public String toString () {
return "#" + id + " : " + ThreadLocalVariableHolder.get();
}
}
21.4 终结任务
21.4.1 装饰性花园
在这个仿真程序中,花园委员会希望了解每天通过多个大门进入公园的总人数。每个大门都有一个十字转门或某种其他形式的计数器,并且任何一个十字转门的计数值递增时,就表示公园中的总人数的共享记数值也会递增。
package twentyone.four;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* part of output:
* Total: 150
* Sum of Entrances: 150
*/
public class OrnamentalGarden {
public static void main(String[] args) throws Exception{
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Entrance(i));
}
// Run for a while, then stop and collect the data:
TimeUnit.SECONDS.sleep(3);
Entrance.cancel();
exec.shutdown();
if (!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) {
System.out.println("Some tasks were not terminated");
}
System.out.println("Total: " + Entrance.getTotalCount());
System.out.println("Sum of Entrances: " + Entrance.sumEntrances());
}
}
/**
* 一个计数器
*/
class Count {
private int count = 0;
private Random rand = new Random(47);
// Remove the synchronized keyword to see the counting fail:
// temp和yield()方法增加了多个线程抢占共享资源的可能性
public synchronized int increment () {
int temp = count;
// 随机产生布尔值 为真时 所有任务重新抢占CPU
if (rand.nextBoolean()) // Yield the half time
Thread.yield();
return (count = ++ temp);
}
// 同步方法 获取当前计数
public synchronized int value() { return count; }
}
/**
*
*/
class Entrance implements Runnable {
private static Count count = new Count(); // 所有的Entrance对象共享一个计数器
//一个静态的集合 每次创建Entrance对象都会向集合中添加元素
private static List<Entrance> entrances = new ArrayList<Entrance>();
private int number = 0;
// Doesn't need synchronized to read:
private final int id;
private static volatile boolean canceled = false;
// Atomic operation on a volatile field:
public static void cancel() { canceled = true; }
// 构造器 初始化时传入一个身份id 在集合中增加一个自己的实例
public Entrance(int id) {
this.id = id;
// Keep this task in a list. Also prevents
// garden collection of dead tasks:
entrances.add(this);
}
public void run() {
while (!canceled) {
synchronized (this) {
++number;
}
System.out.println(this + " Total: " + count.increment());
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println("sleep interrupted");
}
}
System.out.println("Stopping " + this);
}
public synchronized int getValue() {
return number;
}
public String toString() {
return "Entrance " + id + ": " +getValue();
}
public static int getTotalCount() {
return count.value();
}
public static int sumEntrances() {
int sum = 0;
for (Entrance entrance : entrances) {
sum += entrance.getValue();
}
return sum;
}
}
21.4.2阻塞时终结
21.4.2.1 线程状态:
1)新建(new):当线程被创建时,它只会短暂的处于这种状态。此时它已经分配了必须得系统资源,并执行了初始化。此刻线程已经有资格获得CPU时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。
2)就绪(Runnable):在这种状态下,只要调度器把时间片分配给线程,线程就可以运行。也就是说,在任意时刻,线程可以运行也可以不运行。只要调度器能分配时间片给线程,它就可以运行;这不同于希望和阻塞状态。
3)阻塞(Blocked):线程能够运行,但有某个条件组织它运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间。直到线程重新进入就绪状态,它才有可能执行操作。
4)死亡(Dead):处于死亡或者终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已结束,或不再是可运行的。任务死亡的通常方式是从run()方法返回,但是任务的线程还可以被终端。
21.4.2.2 进入阻塞状态
一个任务阻塞状态,可能有如下原因:
1)通过调用sleep(milliseconds)使任务进入休眠状态,在这种情况下,任务在指定时间内不会运行。
2)你通过使用wait()使线程挂起。直到线程得到了notify()或notifyAll()消息(或者在Java SE5的java.util.concurrent类库中等价的signal()或signalAll()消息),线程才会进入就绪状态。
3)任务在等待某个输入/输出完成。
4)任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。
21.4.3 中断
package twentyone.four.three;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* output:
* Interruping twentyone.four.three.SleepBlocked
* InterruptedException
* Exiting SleepBlocked.run()
* Interrupt send to twentyone.four.three.SleepBlocked
* Waiting for read()
* Interruping twentyone.four.three.IOBlocked
* Interrupt send to twentyone.four.three.IOBlocked
* Trying to call f()
* Interruping twentyone.four.three.SynchronizedBlocked
* Interrupt send to twentyone.four.three.SynchronizedBlocked
* Aborting with System.exit(0)
*/
public class Interrupting {
private static ExecutorService exec = Executors.newCachedThreadPool();;
static void test(Runnable r) throws InterruptedException {
Future<?> f = exec.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Interruping " + r.getClass().getName());
// 用参数true调用这个方法会interrupt一个正在执行的线程,如果这个线程还没启动
// 那么这个任务将永远不会执行。在这个方法返回以后,调用isDone() 和 isCanceled()
// 方法都会返回true。
f.cancel(true);
System.out.println("Interrupt send to " + r.getClass().getName());
}
public static void main(String[] args) throws Exception{
// 正在sleep的线程是可以中断的
test(new SleepBlocked());
// 试图执行IO操作的线程是不可用这个方式中断的,输出中可以看出,因为没有抛出InterruptedException
// 这个任务的打印语句没有执行,是因为执行了System.exitt(0)
test(new IOBlocked(System.in));
// 正在试图获取锁的线程也是不能通过这种方式Interrupt的。
test(new SynchronizedBlocked());
TimeUnit.SECONDS.sleep(3);
System.out.println("Aborting with System.exit(0)");
System.exit(0); // ... since last 2 interrupts failed
}
}
class SleepBlocked implements Runnable {
public void run() {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println("InterruptedException");
}
System.out.println("Exiting SleepBlocked.run()");
}
}
class IOBlocked implements Runnable {
private InputStream in;
public IOBlocked(InputStream is) {
in = is;
}
public void run() {
try {
System.out.println("Waiting for read()");
in.read();
} catch (IOException e) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Interrupted from blocked I/O");
} else {
throw new RuntimeException(e);
}
}
System.out.println("Exiting IOBlocked.run()");
}
}
class SynchronizedBlocked implements Runnable {
public synchronized void f() {
while (true) {// Never release lock
Thread.yield();
}
}
public SynchronizedBlocked() {
new Thread(()->{
f(); // Lock acquired by this thread
}).start();
}
public void run() {
System.out.println("Trying to call f()");
f();
System.out.println("Exiting SynchronizedBlocked.run()");
}
}
如何Interrupt一个正在试图执行I/O操作的任务。
有一个略显笨拙但有时确实行之有效的办法,即关闭任务在其上发生阻塞的底层资源。
package twentyone.four.three;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* output:
* Waiting for read()
* Waiting for read()
* Shuting down all threads
* Closing java.net.SocketInputStream
* Interrupted from blocked I/O
* Exiting IOBlocked.run()
* Closing java.io.BufferedInputStream
*/
public class CloseResource {
public static void main(String[] args) throws Exception{
ExecutorService exec = Executors.newCachedThreadPool();
ServerSocket server = new ServerSocket(8080);
InputStream socketInput= new Socket("localhost", 8080).getInputStream();
exec.execute(new IOBlocked(socketInput));
exec.execute(new IOBlocked(System.in));
TimeUnit.SECONDS.sleep(10);
System.out.println("Shuting down all threads");
exec.shutdownNow();
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + socketInput.getClass().getName());
socketInput.close(); // Release blocked thread
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + System.in.getClass().getName());
System.in.close();
}
}
被互斥所阻塞
一个任务能够调用在同一个对象中的其他同步方法:
package twentyone.four.three;
/**
* output:
* f1() calling f2() with count 9
* f2() calling f1() with count 8
* f1() calling f2() with count 7
* f2() calling f1() with count 6
* f1() calling f2() with count 5
* f2() calling f1() with count 4
* f1() calling f2() with count 3
* f2() calling f1() with count 2
* f1() calling f2() with count 1
* f2() calling f1() with count 0
*
* Process finished with exit code 0
*/
public class MultiLock {
public synchronized void f1(int count) {
if (count -- > 0) {
System.out.println("f1() calling f2() with count " + count);
f2(count);
}
}
public synchronized void f2(int count) {
if (count-- > 0) {
System.out.println("f2() calling f1() with count " + count);
f1(count);
}
}
public static void main(String[] args) throws Exception {
final MultiLock multiLock = new MultiLock();
new Thread() {
public void run() {
multiLock.f1(10);
}
}.start();
}
}
I/O和synchronized方法会产生不可中断的阻塞,这样就可能会锁住程序,所以在Java SE5中添加了一个特性,即在ReentrantLock上阻塞的任务具备可以被中断的能力。
package twentyone.four.three;
import com.sun.crypto.provider.BlowfishKeyGenerator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* output:
* Waiting for f() in BlockMutex
* Issuing t.interrupt()
* Interrupt from lock acquisition in f()
* Broken out of blocked call
*/
public class Interrupting2 {
public static void main(String[] args) throws Exception{
Thread t = new Thread(new Blocked2());
t.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("Issuing t.interrupt()");
t.interrupt();
}
}
class BlockMutex {
private Lock lock = new ReentrantLock();
public BlockMutex() {
// Acquire it right away, to demonstrate a second task
// of a task blocked on a reentrantLock:
lock.lock();
}
public void f() {
try {
// This will never be available to s second task
lock.lockInterruptibly(); // Special call
System.out.println("lock aquire in f()");
} catch (InterruptedException e) {
System.out.println("Interrupt from lock acquisition in f()");
}
}
}
class Blocked2 implements Runnable {
BlockMutex blocked = new BlockMutex();
public void run () {
System.out.println("Waiting for f() in BlockMutex");
blocked.f();
System.out.println("Broken out of blocked call");
}
}
21.4.4 检查中断
package twentyone.four.three;
import java.util.concurrent.TimeUnit;
/**
* 这个方法要接受一个命令行参数,主线程将会根据参数休眠
* 这个方法主要表达,如果通过Thread.interrupt() 方法退出任务
* 那么一定要确保使用try-finally子句关闭资源
*/
public class InterruptingIdiom {
public static void main(String[] args) throws Exception{
if (args.length != 1) {
System.out.println("usage: java InterruptingIdiom delay-in-mS");
System.exit(1);
}
Thread t = new Thread(new Blocked3());
t.start();
TimeUnit.MILLISECONDS.sleep(5);
t.interrupt();
}
}
class NeedsCleanup {
private final int id;
public NeedsCleanup(int ident) {
id = ident;
System.out.println("Cleaning up " + id);
}
public void cleanup() {
System.out.println("Clean up " + id);
}
}
class Blocked3 implements Runnable {
private volatile double d = 0.0;
public void run () {
try {
while (!Thread.interrupted()) {
// point1
NeedsCleanup n1 = new NeedsCleanup(1);
// Start try-finally immediately after definition
// of it, to guarentee proper cleanup of n1:
try {
System.out.println("Sleeping");
TimeUnit.SECONDS.sleep(1);
// point2
NeedsCleanup n2 = new NeedsCleanup(2);
// Guarantee proper cleanup of n2:
try {
System.out.println("Calculating");
// A time-consuming, non-blocking operation:
for (int i = 0; i < 250000; i++) {
d = d + (Math.PI + Math.E) / d;
}
System.out.println("Finished time-consuming operation");
}finally {
n2.cleanup();
}
} finally {
n1.cleanup();
}
}
System.out.println("Exiting via while() test");
}catch (InterruptedException e) {
System.out.println("Exiting via InterruptedException");
}
}
}
21.5线程之间的协作
下面的任务不是解决任务之间的资源的抢夺问题,而是解决任务之间的协作的问题。
21.5.1wait()与notifyAll()
sleep() 和 yield()不会释放锁,wait()释放锁。
只能在同步控制方法或者同步控制块里调用wait()、notify()和notifyAll().如果在非同步控制方法里调用wait的相关方法,会得到一个异常:IllegalMonitorStateException。
如果要向对象x发送notifyAll(),那么就必须在能够取得x的锁的同步控制块中这么做:
synchronized(x) {
x.notifyAll();
}
下面是本节示例代码:
package twentyone.five;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* output:
* Wax On
* Wax Off!
* Wax On
* Wax Off!
* Wax On
* Wax Off!
* Wax On
* Wax Off!
* ...
*/
public class WaxOMatic {
public static void main(String[] args) throws Exception{
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5); // Run for a while...
// 当调用这个方法时,它会调用所有由它控制的线程的interrupt()方法。
exec.shutdownNow(); // Interrupt all tasks
}
}
class Car {
private boolean waxOn = false;
public synchronized void waxed(){
waxOn = true; // Ready to buff
notifyAll();
}
public synchronized void buffed () {
waxOn = false; // Ready for another coat of wax
notifyAll();
}
public synchronized void waitForWaxing() throws InterruptedException {
while (waxOn == false) {
wait();
}
}
public synchronized void waitForBuffing() throws InterruptedException {
while (waxOn == true) {
wait();
}
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) {car = c;}
public void run() {
try{
while (!Thread.interrupted()){
System.out.println("Wax On");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax On task");
}
}
class WaxOff implements Runnable{
private Car car;
public WaxOff(Car c) { car = c; }
public void run() {
try {
while (!Thread.interrupted()){
car.waitForWaxing();
System.out.println("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
}catch (InterruptedException e) {
System.out.println("Exiting via interrupt");
}
System.out.println("Ending Wax Off task");
}
}
21.5.2 notify()与notifyAll()
package twentyone.five;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* output:
* notify() Thread[pool-1-thread-1,5,main]
* notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
* notify() Thread[pool-1-thread-1,5,main]
* notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
* notify() Thread[pool-1-thread-1,5,main]
* notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
* notify() Thread[pool-1-thread-1,5,main]
* notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
* notify() Thread[pool-1-thread-1,5,main]
* notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
* notify() Thread[pool-1-thread-1,5,main]
* notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
* Timer canceled
* Task2.blocker.prodAll() Thread[pool-1-thread-6,5,main]
* Shutting down
*/
public class NotifyVsNotifyAll {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Task());
}
exec.execute(new Task2());
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true;
@Override
public void run() {
if (prod) {
System.out.print("\nnotify() ");
// 因为使用的是Task的Blocker,所以不会唤醒Task2
Task.blocker.prod();
prod = false;
} else {
System.out.print("\nnotifyAll() ");
Task.blocker.prodAll();
prod = true;
}
}
}, 400, 400); // Run every .4 second
TimeUnit.SECONDS.sleep(5); // Run for a while
timer.cancel();
System.out.println("\nTimer canceled");
TimeUnit.MILLISECONDS.sleep(500);
System.out.print("Task2.blocker.prodAll() ");
Task2.blocker.prodAll();
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("\nShutting down");
exec.shutdownNow(); // Interrupt all tasks
}
}
class Blocker {
synchronized void waitingCall() {
try {
while (!Thread.interrupted()) {
wait();
System.out.print(Thread.currentThread() + " ");
}
} catch (InterruptedException e) {
// Ok to exit this way.
}
}
synchronized void prod() { notify(); }
synchronized void prodAll() { notifyAll(); }
}
class Task implements Runnable {
static Blocker blocker = new Blocker();
public void run() { blocker.waitingCall(); }
}
class Task2 implements Runnable {
// A separate Blocker object:
static Blocker blocker = new Blocker();
public void run() { blocker.waitingCall(); }
}
21.5.3 生产者与消费者
package twentyone.five;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* output:
* Order up!
* WaitPerson got Meal 1
* Order up!
* WaitPerson got Meal 2
* Order up!
* WaitPerson got Meal 3
* Order up!
* WaitPerson got Meal 4
* Order up!
* WaitPerson got Meal 5
* Order up!
* WaitPerson got Meal 6
* Order up!
* WaitPerson got Meal 7
* Order up!
* WaitPerson got Meal 8
* Order up!
* WaitPerson got Meal 9
* Out of food, closing
* WaitPerson interrupted
* Order up!
* Chef interrupted
*/
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
}
class Meal{
private final int orderNum;
public Meal(int orderNum) {
this.orderNum = orderNum;
}
public String toString() { return "Meal " + orderNum; }
}
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant r) { restaurant = r; }
public void run() {
try{
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal == null) {
wait(); // ... for the chef to produce a meal
}
}
System.out.println("WaitPerson got " + restaurant.meal);
synchronized (restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll(); // Ready for another
}
}
}catch (InterruptedException e) {
System.out.println("WaitPerson interrupted");
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant r) { restaurant = r; }
public void run() {
try{
while (!Thread.interrupted()){
synchronized (this){
while (restaurant.meal != null) {
wait(); // .. for the meal to be taken
}
}
if (++count == 10) {
System.out.println("Out of food, closing");
restaurant.exec.shutdownNow();
}
System.out.println("Order up!");
synchronized (restaurant.waitPerson){
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
}catch (InterruptedException e) {
System.out.println("Chef interrupted");
}
}
}
使用显示的Lock和Condition对象
使用互斥并允许任务挂起的基本类是Condition,可以通过在Condition上调用await()来挂起一个任务,可以通过调用signal()来唤醒一个任务,或者使用signalAll()唤醒所有任务。同样在,在使用这些方法之前,必须拥有锁。
package twentyone.five;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* output:
* Order up!
* WaitPerson got Meal 1
* Order up!
* WaitPerson got Meal 2
* Order up!
* WaitPerson got Meal 3
* Order up!
* WaitPerson got Meal 4
* Order up!
* WaitPerson got Meal 5
* Order up!
* WaitPerson got Meal 6
* Order up!
* WaitPerson got Meal 7
* Order up!
* WaitPerson got Meal 8
* Order up!
* WaitPerson got Meal 9
* Out of food, closing
* WaitPerson interrupted
* Order up!
* Chef interrupted
*/
public class Restaurant {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
}
class Meal{
private final int orderNum;
public Meal(int orderNum) {
this.orderNum = orderNum;
}
public String toString() { return "Meal " + orderNum; }
}
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant r) { restaurant = r; }
public void run() {
try{
while (!Thread.interrupted()) {
try{
restaurant.lock.lock();
while (restaurant.meal == null) {
restaurant.condition.await(); // ... for the chef to produce a meal
}
}finally {
restaurant.lock.unlock();
}
System.out.println("WaitPerson got " + restaurant.meal);
try{
restaurant.lock.lock();
restaurant.meal = null;
restaurant.condition.signalAll();
}finally {
restaurant.lock.unlock();
}
}
}catch (InterruptedException e) {
System.out.println("WaitPerson interrupted");
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant r) { restaurant = r; }
public void run() {
try{
while (!Thread.interrupted()){
try{
restaurant.lock.lock();
while (restaurant.meal != null) {
restaurant.condition.await(); // .. for the meal to be taken
}
}finally {
restaurant.lock.unlock();
}
if (++count == 10) {
System.out.println("Out of food, closing");
restaurant.exec.shutdownNow();
}
System.out.println("Order up!");
try{
restaurant.lock.lock();
restaurant.meal = new Meal(count);
restaurant.condition.signalAll();
}finally {
restaurant.lock.unlock();
}
TimeUnit.MILLISECONDS.sleep(100);
}
}catch (InterruptedException e) {
System.out.println("Chef interrupted");
}
}
}
21.5.4 生产者-消费者与队列
wait()和notifyAll()方法是一种解决任务协作的非常低级的方法,我们可以使用同步队列来解决任务协作问题。在java.util.concurrent.BlockingQueue接口中提供了这个队列。通常可以使用LinkedBlockingQueue,这是无界队列,还可以使用ArrayBlockingQueue,它是有界队列。
package twentyone.five;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.*;
public class TestBlockingQueues {
// 从控制台读取一行输入
static void getkey() {
try {
// Compensate for Windows/Linux difference in the
// length of the result produced by the Enter key:
new BufferedReader(
new InputStreamReader(System.in)
).readLine();
}catch (IOException e) {
throw new RuntimeException(e);
}
}
// 接受一个字符串,打印到控制台,接下来读取一个控制台输入
static void getkey(String message){
System.out.println(message);
getkey();
}
static void test(String msg, BlockingQueue<LiftOff> queue) {
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for (int i = 0; i < 5; i++) {
runner.add(new LiftOff(5));
}
getkey("Press 'Enter' (" + msg + ")");
t.interrupt();
System.out.println("Finished " + msg + " test");
}
public static void main(String[] args) throws Exception{
// test("LinkedBlockingQueue", // Unlimited size
// new LinkedBlockingDeque<LiftOff>() );
// test("ArrayBlockingQueue", // Fixed size
// new ArrayBlockingQueue<LiftOff>(3));
// test("SynchronousQueue", // Size of 1
// new SynchronousQueue<LiftOff>());
LiftOffRunner liftOffRunner = new LiftOffRunner(new LinkedBlockingQueue<LiftOff>());
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(liftOffRunner);
exec.execute(new LiftOffPuter(liftOffRunner));
TimeUnit.SECONDS.sleep(1);
exec.shutdownNow();
System.exit(1);
}
}
class LiftOffRunner implements Runnable{
private BlockingQueue<LiftOff> rockets;
// 构造函数 初始化一个BlockingQueue
public LiftOffRunner(BlockingQueue<LiftOff> queue) { rockets = queue; }
public void add(LiftOff lo) {
try {
// 将指定元素加入队列,必要的情况下进行等待,直到可用
rockets.put(lo);
} catch (InterruptedException e) {
System.out.println("Interrupted during put()");
}
}
public void run() {
try {
while (!Thread.interrupted()) {
// 取出并移除队列的头,在元素可用之前一直等待
LiftOff rocket = rockets.take();
rocket.run();
}
}catch (InterruptedException e) {
System.out.println("Waking from take()");
}
System.out.println("Exiting LiftOffRunner");
}
}
class LiftOffPuter implements Runnable {
LiftOffRunner runner;
public LiftOffPuter(LiftOffRunner runner) { this.runner = runner; }
@Override
public void run() {
for (int i = 0; i < 5; i++) {
while (!Thread.interrupted()){
runner.add(new LiftOff(5));
}
System.out.println("Exiting LiftOffPuter via interrupt");
}
}
}
21.5.5 任务间使用管道进行输入/输出
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* output:
* Read: A,
* Read: B,
* Read: C,
* Read: D,
* Read: E,
* Read: F,
* Read: G,
* Read: H,
* Read: I,
* Read: J,
* Read: K,
* Read: L,
* Read: M,
* java.lang.InterruptedException: sleep interruptedSender sleep interrupted
* java.io.InterruptedIOException Receiver read exception
*/
public class PipedIO {
public static void main(String[] args) throws Exception{
Sender sender = new Sender();
Receiver reveiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(reveiver);
TimeUnit.SECONDS.sleep(4);
exec.shutdownNow();
}
}
class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() { return out; }
public void run() {
try {
while (true) {
for (char c = 'A'; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
}
} catch (IOException e) {
System.out.println(e + " Sender write exception");
} catch (InterruptedException e) {
System.out.println(e + "Sender sleep interrupted");
}
}
}
class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while (true) {
// Blocks until characters are there:
System.out.println("Read: " + (char) in.read() + ", ");
}
} catch (IOException e) {
System.out.println(e + " Receiver read exception");
}
}
}
21.6 死锁
当以下4个条件同时满足时,就会发生死锁:
1.互斥条件。任务使用的资源中,至少有一个是不能共享的。
2.至少有一个任务它必须持有一个资源且正在等待获取一个当前被别的任务持有的资源。
3.资源不能被任务抢占,任务必须把资源释放当做普通事件。
4.必须有循环等待,这时,一个任务等待其他任务所持有的资源,后者又在等待另一个任务所持有的资源,这样一直下去,直到有一个任务在等待第一个任务所持有的资源。
因为要发生死锁的话,所有这些条件必须全部满足;所以要防止死锁的话,只需破坏其中一个即可。
21.7新类库中的构件
21.7.1 CountDownLatch
Random类的nextInt方法是线程安全的。
package twentyone.seven;
import javafx.concurrent.Task;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 在CountDownLatch计数结束之前WaitingTask会一直被阻塞,直到程序结束仍然没有计数到0,
* 所以WaitingTask在线程池关闭时被interrupt
* OUTPUT:
* Launched all tasks
* 99 completed
* 41 completed
* 36 completed
* 95 completed
* 94 completed
* 10 completed
* 21 completed
* 77 completed
* 7 completed
* WaitingTask 9 interrupted
* WaitingTask 7 interrupted
* WaitingTask 8 interrupted
* WaitingTask 6 interrupted
* WaitingTask 1 interrupted
* WaitingTask 3 interrupted
* WaitingTask 0 interrupted
* WaitingTask 4 interrupted
* WaitingTask 5 interrupted
* WaitingTask 2 interrupted
*/
public class CountDownLatchDemo {
static final int SIZE = 100;
public static void main(String[] args) throws Exception{
ExecutorService exec = Executors.newCachedThreadPool();
// All must share a single CountDownLatch object:
CountDownLatch latch = new CountDownLatch(SIZE);
for (int i = 0; i < 10; i++) {
exec.execute(new WaitingTask(latch));
}
for (int i = 0; i < SIZE; i++) {
exec.execute(new TaskPortion(latch) {
});
}
System.out.println("Launched all tasks");
TimeUnit.MILLISECONDS.sleep(200);
exec.shutdownNow();// Quit when all tasks complete
}
}
class TaskPortion implements Runnable {
private static int counter = 0;
private final int id = counter++;
private static Random rand = new Random(47);
private final CountDownLatch latch;
TaskPortion(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
doWork();
latch.countDown();
} catch (InterruptedException ex) {
// Acceptable way to exit
}
}
public void doWork() throws InterruptedException{
TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));
System.out.println(this + "completed");
}
public String toString() {
return String.format("%1$-3d " , id);
}
}
class WaitingTask implements Runnable {
private static int counter = 0;
private final int id = counter++;
private final CountDownLatch latch;
WaitingTask(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
try {
latch.await();// 调用这个方法的任务会被阻塞 直至countDown计数为0
System.out.println("Latch barrier passed for " + this);
} catch (InterruptedException ex) {
System.out.println(this + "interrupted");
}
}
public String toString() {
return String.format("WaitingTask %1$-3d", id);
}
}
21.7.2 CyclicBarrier
package twentyone.seven;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
public class HorseRace {
static final int FINISH_LINE = 75;
private List<Horse> horses = new ArrayList<Horse>();
private ExecutorService exec = Executors.newCachedThreadPool();
private CyclicBarrier barrier;
public HorseRace(int nHorses, final int pause){
barrier = new CyclicBarrier(nHorses, () -> {
/**
* 里面的任务在nHorses都执行完毕且都在等待时触发,此处的任务结束之后再继续执行nHorses的任务
*/
StringBuilder s = new StringBuilder();
for (int i = 0; i < FINISH_LINE; i++) {
s.append("="); // The fence on the racetrack
}
System.out.println(s);
for (Horse hors : horses) {
System.out.println(hors.tracks());
}
for (Horse hors : horses) {
if (hors.getStrides() >= FINISH_LINE){
System.out.println(hors + "won!");
exec.shutdownNow();
return;
}
}
try{
TimeUnit.MILLISECONDS.sleep(pause);
}catch (InterruptedException e) {
System.out.println("barrier-action sleep interrupted");
}
});
// 先执行此处,创建线程任务
for (int i = 0; i < nHorses; i++) {
Horse horse = new Horse(barrier);
horses.add(horse);
exec.execute(horse);
}
}
public static void main(String[] args) {
int nHorses = 7;
int pause = 200;
if (args.length > 0) {// Optional argument
int n = new Integer(args[0]);
nHorses = n > 0 ? n : nHorses;
}
if (args.length > 1) { // Optional argument
int p = new Integer(args[1]);
pause = p > -1 ? p : pause;
}
new HorseRace(nHorses, pause);
}
}
class Horse implements Runnable {
private static int counter = 0;
private final int id = counter++;
private int strides = 0;
private static Random rand = new Random(47);
private static CyclicBarrier barrier;
public Horse(CyclicBarrier b) {
barrier = b;
}
public synchronized int getStrides() { return strides; }
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
strides += rand.nextInt(3); // Produces 0, 1 or 2
}
barrier.await();
}
}catch (InterruptedException e) {
// A legitimate way to exit
}catch (BrokenBarrierException e) {
// This one we want to know about
throw new RuntimeException(e);
}
}
public String toString() { return "Horse " + id + " "; }
public String tracks() {
StringBuilder s = new StringBuilder();
for (int i = 0; i < getStrides(); i++) {
s.append("*");
}
s.append(id);
return s.toString();
}
}
21.7.3 DelayQueue
demo中有目前不存在的类,略过。
21.7.4 PriorityBlockingQueue
这个示例十分难读懂。输出结果很迷人。
package twentyone.seven;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 输出中可见,先new的对象在PriorityBlockingQueue中并不会被优先take()
* 从队列中take() 时,优先级高的会被优先取出(但并没有严格的按照优先级取出)
* 后半部分输出是从队列中迭代取得的结果,可以看出队列中元素的顺序与添加的顺序是一致的。
*
* 得出结论: PriorityBlockingQueue会保持元素的添加顺序,但是从队列中take()时,
* 优先级高的会被优先取出,优先级可以实现Comparable接口自定义优先级规则
*
* OUTPUT:
* [8 ] Task 0
* [9 ] Task 5
* [9 ] Task 13
* [9 ] Task 14
* [8 ] Task 10
* [8 ] Task 16
* [8 ] Task 19
* [8 ] Task 11
* [8 ] Task 6
* [8 ] Task 15
* [7 ] Task 9
* [5 ] Task 1
* [3 ] Task 2
* [2 ] Task 8
* [1 ] Task 3
* [1 ] Task 4
* [1 ] Task 17
* [1 ] Task 12
* [0 ] Task 18
* [0 ] Task 7
* [10 ] Task 20
* [10 ] Task 21
* [10 ] Task 22
* [10 ] Task 23
* [10 ] Task 24
* [10 ] Task 25
* [10 ] Task 26
* [10 ] Task 27
* [10 ] Task 28
* Finished PrioritizedTaskProducer
* [10 ] Task 29
* [9 ] Task 39
* [8 ] Task 38
* [7 ] Task 37
* [6 ] Task 36
* [5 ] Task 35
* [4 ] Task 34
* [3 ] Task 33
* [2 ] Task 32
* [1 ] Task 31
* [0 ] Task 30
* (0:8)
* (1:5)
* (2:3)
* (3:1)
* (4:1)
*
* (5:9)
* (6:8)
* (7:0)
* (8:2)
* (9:7)
* (10:8)
* (11:8)
* (12:1)
* (13:9)
* (14:9)
* (15:8)
* (16:8)
* (17:1)
* (18:0)
* (19:8)
* (20:10)
* (21:10)
* (22:10)
* (23:10)
* (24:10)
* (25:10)
* (26:10)
* (27:10)
* (28:10)
* (29:10)
* (30:0)
* (31:1)
* (32:2)
* (33:3)
* (34:4)
* (35:5)
* (36:6)
* (37:7)
* (38:8)
* (39:9)
* (40:-1)
*
* [-1 ] Task 40 Calling shutdownNow()
* Finished PrioritizedTaskConsumer
*
* Process finished with exit code 0
*/
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws Exception{
Random rand = new Random(47);
ExecutorService exec = Executors.newCachedThreadPool();
// PriorityBlockingQueue默认初始容量是11,这个队列会根据自然排序对其内的元素调整顺序
// 实现Comparable接口可以自定义顺序
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
exec.execute(new PrioritizedTaskProducer(queue, exec));
exec.execute(new PrioritizedTaskConsumer(queue));
}
}
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>{
private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;
protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>();
public PrioritizedTask(int priority) {
this.priority = priority;
sequence.add(this);
}
@Override
public int compareTo(PrioritizedTask arg) {
return priority < arg.priority ? 1 : (priority > arg.priority ? -1 : 0);
}
@Override
public void run() {
try{
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println(this);
}
public String toString() {
return String.format("[%1$-3d]", priority) + " Task " + id;
}
public String summary() {
return "(" + id + ":" + priority + ")";
}
public static class EndSentinel extends PrioritizedTask {
private ExecutorService exec;
public EndSentinel(ExecutorService e) {
super(-1); // Lowest priority in this program
exec = e;
}
public void run() {
int count = 0;
for (PrioritizedTask prioritizedTask : sequence) {
System.out.println(prioritizedTask.summary());
if (++count == 5) {
System.out.println();
}
}
System.out.println();
System.out.println(this + " Calling shutdownNow()");
exec.shutdownNow();
}
}
}
class PrioritizedTaskProducer implements Runnable{
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec;
public PrioritizedTaskProducer(Queue<Runnable> q, ExecutorService e) {
queue = q;
exec = e;
}
public void run() {
// Unbounded queue; never blocks.
// Fill it up fast with random priorities:
for (int i = 0; i < 20; i++) {
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
// Trickle in highest-priority jobs:
try {
for (int i = 0; i < 10; i++) {
TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
// Add jobs, lowest priority first:
for (int i = 0; i < 10; i++) {
queue.add(new PrioritizedTask(i));
}
// A sentinel to stop all the tasks:
queue.add(new PrioritizedTask.EndSentinel(exec));
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskProducer");
}
}
class PrioritizedTaskConsumer implements Runnable {
private PriorityBlockingQueue<Runnable> q;
public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q){ this.q = q; }
public void run() {
try {
while (!Thread.interrupted()){
// Use current thread to run the task:
q.take().run();
}
} catch (InterruptedException e) {
// Acceptable way to exit
}
System.out.println("Finished PrioritizedTaskConsumer");
}
}
21.7.5 使用ScheduledExecutor的温室控制器
package twentyone.seven;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Bing!
* Thermostat to night setting
* Turning on lights
* Turning off lights
* Turning greenhouse water on
* Turning greenhouse water off
* Thermostat to day setting
* Turning on lights
* Turning off lights
* Turning on lights
* Collecting data
* Turning on lights
* Turning greenhouse water on
* Turning on lights
* Turning off lights
* Turning greenhouse water off
* Bing!
* Turning on lights
* Collecting data
* Turning off lights
* Turning greenhouse water on
* Turning on lights
* Turning on lights
* Thermostat to day setting
* Collecting data
* Turning on lights
* Turning off lights
* Turning greenhouse water off
* Turning on lights
* Turning greenhouse water on
* Bing!
* Thermostat to night setting
* Turning on lights
* Turning off lights
* Collecting data
* Turning on lights
* Turning on lights
* Turning off lights
* Turning greenhouse water on
* Turning greenhouse water off
* Collecting data
* Turning on lights
* Turning on lights
* Turning off lights
* Thermostat to day setting
* Bing!
* Turning on lights
* Turning greenhouse water on
* Collecting data
* Turning off lights
* Turning greenhouse water off
* Turning on lights
* Turning on lights
* Collecting data
* Turning on lights
* Turning off lights
* Turning greenhouse water on
* Turning on lights
* Bing!
* Thermostat to night setting
* Turning on lights
* Turning off lights
* Turning greenhouse water off
* Collecting data
* Turning on lights
* Turning greenhouse water on
* Thermostat to day setting
* Turning on lights
* Turning off lights
* Collecting data
* Turning on lights
* Turning on lights
* Turning off lights
* Turning greenhouse water on
* Turning greenhouse water off
* Terminating
* Wed Dec 25 21:00:00 GMT+08:00 2019 temperature: 66.4 humidity: 50.05
* Wed Dec 25 21:30:00 GMT+08:00 2019 temperature: 68.0 humidity: 50.47
* Wed Dec 25 22:00:00 GMT+08:00 2019 temperature: 69.7 humidity: 51.42
* Wed Dec 25 22:30:00 GMT+08:00 2019 temperature: 70.8 humidity: 50.87
* Wed Dec 25 23:00:00 GMT+08:00 2019 temperature: 72.0 humidity: 50.32
* Wed Dec 25 23:30:00 GMT+08:00 2019 temperature: 73.2 humidity: 49.92
* Thu Dec 26 00:00:00 GMT+08:00 2019 temperature: 74.4 humidity: 49.81
* Thu Dec 26 00:30:00 GMT+08:00 2019 temperature: 76.2 humidity: 50.25
* Thu Dec 26 01:00:00 GMT+08:00 2019 temperature: 77.4 humidity: 51.00
*
* Process finished with exit code 0
*/
public class GreenHouseScheduler {
private volatile boolean light = false;
private volatile boolean water = false;
private String thermostat = "Day";
public synchronized String getThermostat() { return thermostat; }
public synchronized void setThermostat(String value) { thermostat = value; }
/**
* 这个线程池可以在给定的延迟后执行任务或者定周期性执行
* 如果有任务在执行前canceled,不会自动移除,可以设置setRemoveOnCancelPolicy为true,这样任务在被
* 取消时会即时移除
*/
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10);
public void schedule(Runnable event, long delay) {
// 延迟给定的毫秒数后执行
scheduler.schedule(event, delay, TimeUnit.MILLISECONDS);
}
public void repeat(Runnable event, long initialDelay, long period) {
// 在给定的延迟后周期性的执行
scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS);
}
class LightOn implements Runnable {
public void run() {
// Put hardware control code here to
// physically turn on the light.
System.out.println("Turning on lights");
light = true;
}
}
class LightOff implements Runnable {
public void run() {
// Put hardware control here to
// physically turn off the light.
System.out.println("Turning off lights");
light = false;
}
}
class WaterOn implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water on");
water = true;
}
}
class WaterOff implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Turning greenhouse water off");
water = false;
}
}
class ThermostatNight implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to night setting");
setThermostat("Night");
}
}
class ThermostatDay implements Runnable {
public void run() {
// Put hardware control code here.
System.out.println("Thermostat to day setting");
setThermostat("Day");
}
}
class Bell implements Runnable {
@Override
public void run() {
System.out.println("Bing!");
}
}
class Terminate implements Runnable {
@Override
public void run() {
System.out.println("Terminating");
scheduler.shutdownNow();
// Must start a separate task to do this job,
// since the scheduler has been shut down.
new Thread() {
@Override
public void run() {
for( DataPoint d : data) {
System.out.println(d);
}
}
}.start();
}
}
// New feature: data collection
static class DataPoint {
final Calendar time;
final float temperature;
final float humidity;
public DataPoint(Calendar d, float temp, float hum) {
time = d;
temperature = temp;
humidity = hum;
}
public String toString() {
return time.getTime() + String.format(" temperature: %1$.1f humidity: %2$.2f", temperature, humidity);
}
}
private Calendar lastTime = Calendar.getInstance();
{// Adjust date to the half hour
lastTime.set(Calendar.MINUTE, 30);
lastTime.set(Calendar.SECOND, 00);
}
private float lastTemp = 65.0f;
private int tempDirection = +1;
private float lastHumidity = 50.0f;
private int humidityDirection = +1;
private Random rand = new Random(47);
List<DataPoint> data = Collections.synchronizedList(new ArrayList<>());
class CollectData implements Runnable {
public void run() {
System.out.println("Collecting data");
synchronized (GreenHouseScheduler.this) {
// Pretend the interval is longer than it is:
lastTime.set(Calendar.MINUTE, lastTime.get(Calendar.MINUTE) + 30);
// One in 5 chancesof reversing the direction:
if (rand.nextInt(4) == 4) {
tempDirection = -tempDirection;
}
// Store previous value:
lastTemp = lastTemp + tempDirection * (1.0f + rand.nextFloat());
if (rand.nextInt(5) == 4) {
humidityDirection = -humidityDirection;
}
// Store previous value:
lastHumidity = lastHumidity + humidityDirection * rand.nextFloat();
// Calendar must be cloned, otherwise all
// DataPoints hold references to the same lastTime.
// For a basic object like Calendar, clone() is OK.
data.add(new DataPoint((Calendar)lastTime.clone(), lastTemp, lastHumidity));
}
}
}
public static void main(String[] args) {
GreenHouseScheduler gh = new GreenHouseScheduler();
gh.schedule(gh.new Terminate(), 5000);
// Former "Restart" class not necessary:
gh.repeat(gh.new Bell(), 0, 1000);
gh.repeat(gh.new ThermostatNight(), 0, 2000);
gh.repeat(gh.new LightOn(), 0, 200);
gh.repeat(gh.new LightOff(), 0, 400);
gh.repeat(gh.new WaterOn(), 0, 600);
gh.repeat(gh.new WaterOff(), 0, 800);
gh.repeat(gh.new ThermostatDay(), 0, 1400);
gh.repeat(gh.new CollectData(), 500, 500);
}
}