Thinking in java 之并发其一:如何实现多线程
一、基本的线程机制
java的并发编程可以将程序划分成多个分离并且能够独立运行的任务。每个独立任务都通过一个执行线程来驱动。一个线程就是在进程中的一个单一的顺序控制流,因此,单个进程可以拥有多个并发执行的任务。在运行时,CPU将轮流给每个任务分配其占用时间。
二、定义任务
在java中,定义一个任务的常用方式是实现“Runnable”接口,实现 “Runnable” 接口的 run() 方法,run() 内的就是任务的内容。
在 Thinking in Java 中的例子为 LiftOff 类,定义了一个倒计时发射的任务。
public class LiftOff implements Runnable {
protected int countDown = 10;
private static int taskCount = 0;
private final int id = taskCount++;
public LiftOff() {}
public LiftOff(int countDown) {
this.countDown=countDown;
}
public String status() {
return "#"+id+"("+(countDown > 0 ? countDown : "Liftoff!")+").";
}
@Override
public void run() {
while(countDown-- > 0) {
System.out.println(status());
Thread.yield();
}
}
}
Thread.yield(),是线程机制的一部分,他可以发出一个建议,表示该线程已经完成主要任务,建议CPU切换到其他线程,但不一定100%切换。此处使用 yield 的目的在于让线程之间的切换更加明显。
二、通过Thread类将Runnable对象转变为工作任务
如果我们直接在 main 方法中创建一个 LiftOff 实例并且调用他的 run() 方法也是可以执行的,但该任务还是使用了和 main() 方法一样的线程。如果希望它能够独立于 main 有自己的线程,可以将 Runnable 对象提交给一个 Thread 构造器,Thread 对象的 start() 方法会新建一个线程,并利用该线程执行 run() 方法。
public class BasicThread {
public static void main(String[] args) {
Thread t = new Thread(new LiftOff());
t.start();
System.out.println("Waiting for LiftOff");
}
}
/*
* 运行结果:
Waiting for LiftOff
#0(9).
#0(8).
#0(7).
#0(6).
#0(5).
#0(4).
#0(3).
#0(2).
#0(1).
#0(Liftoff!).
*/
输出的结果显示,控制台首先打印出了"Waitting for LiftOff"的字符串,然后是 run() 方法里的输出,证明了main 和 run 不在同一个线程里运行。
为了能够更明显的突出线程之间的切换,可以创建多个线程。
public class MoreBasicThread {
public static void main(String[] args) {
// TODO Auto-generated method stub
for(int i=0;i<5;i++)
{
Thread t = new Thread(new LiftOff());
t.start();
}
System.out.println("Waiting for LiftOff");
}
}
/*output:
Waiting for LiftOff
#1(2).
#3(2).
#1(1).
#4(2).
#2(2).
#0(2).
#2(1).
#4(1).
#1(Liftoff!).
#3(1).
#4(Liftoff!).
#2(Liftoff!).
#0(1).
#3(Liftoff!).
#0(Liftoff!).*/
每使用一次 new Thread() 都需要进行一系列的准备工作,其耗时和所需资源都是巨大的。为了节省资源,当一个任务结束时,该线程的资源需要能够立即分配给下一个任务。java 里的线程池可以实现该功能。
常用的线程池主要有三种:CachedThreadPool、FixedThreadPool以及SingleThreadPool。CachedThreadPool可以根据需要创建相应的线程,当某个任务完成之时,可以将空余出来的线程留给其他任务使用。如果线程数量不够,则会自动新建一个线程。并且,当某个线程在一定时间内没有使用时,会终止该线程,并且从线程池中移除。FixedThreadPool 会在一开始创建固定数量的线程,这些线程不会消失,当某个线程的任务完成时,该线程会一直存在等待新的任务,不会因为空闲时间过长而被清除,只能通过手动的方式去关闭。至于 SingleThreadPool 则是线程数量为 1 的 FixedThreadPool。
一般不会通过构造器来创建线程池的实例,而是用Executors来帮我们创建。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPool {
public static void main(String[] args) {
// TODO Auto-generated method stub
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<5;i++) {
exec.execute(new LiftOff());
}
System.out.println("Waiting for LiftOff");
exec.shutdown();
}
}
/*output:
#0(2).
#2(2).
#3(2).
#1(2).
#4(2).
Waiting for LiftOff
#4(1).
#1(1).
#3(1).
#2(1).
#0(1).
#3(Liftoff!).
#1(Liftoff!).
#4(Liftoff!).
#0(Liftoff!).
#2(Liftoff!).*/
四、从任务中产生返回值
run() 方法是没有返回值的,通过实现 Runnable 创建的任务也就没有返回值。如果需要创建一个具有返回值的任务,可以通过实现 Callable 接口(而不是 Runnable)来完成。它是一种具有类型参数的泛型,它的类型参数表示的是从方法 call() (相对于 Runnable 的 run)中返回的值。Callable 需要配合 ExecutorService(上面三个线程池都是ExecutorService的具体实现) 的 submit 方法。该方法会产生 Feture对象,它用Callable返回结果的特定类型进行了参数化。
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
class TaskWithResult implements Callable<String>{
private int id;
public TaskWithResult(int id) {
this.id = id;
}
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
return "Result of TaskWithResult "+ id;
}
}
public class CallableDemo {
public static void main(String[] args) {
// TODO Auto-generated method stub
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> results = new ArrayList<Future<String>>();
for(int i=0;i<10;i++) {
results.add(exec.submit(new TaskWithResult(i)));
}
for(Future<String> item : results) {
try {
System.out.println(item.get());
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
exec.shutdown();
}
}
}
}
/*output:
Result of TaskWithResult 0
Result of TaskWithResult 1
Result of TaskWithResult 2
Result of TaskWithResult 3
Result of TaskWithResult 4
Result of TaskWithResult 5
Result of TaskWithResult 6
Result of TaskWithResult 7
Result of TaskWithResult 8
Result of TaskWithResult 9*/
item.get() 和 item 里任务的执行由于在不同的线程,在输出 item.get() 时并不能确定它对应的 call() 是否已经完成。get() 会一直阻塞直到 call 完成并将值返回,当然,也可以通过 isDone() 方法来判断是否完成。
五、进程的休眠
Thread.yield() 方法效果等同于降低线程的优先级,但不能保证该线程一定能暂停,确保线程暂停可以调用 TimeUnit.MILLISECONDS.sleep() 来实现。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class SleepingTask extends LiftOff {
public void run() {
try {
while(countDown-- > 0) {
System.out.println(status());
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
System.err.println("Interupted");
}
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<5;i++) {
exec.execute(new SleepingTask());
}
exec.shutdown();
}
}
/*output:
#2(2).
#3(2).
#0(2).
#1(2).
#4(2).
#3(1).
#2(1).
#4(1).
#1(1).
#0(1).
#2(Liftoff!).
#3(Liftoff!).
#1(Liftoff!).
#4(Liftoff!).
#0(Liftoff!).*/
从输出结果没有一个 LiftOff 任务是连续倒计时两次可以看出,sleep 的确产生了作用。
值得注意的是,sleep() 可能会抛出 InterruptedException 异常,由于处在不同的线程中,该异常时无法传播给 main() 的 因此必须在本地(及 run() 方法里)处理所有任务内部产生的异常。
六、捕获异常
除了在 run() 内部去处理异常,是否还有其他更好的办法?
可以通过改变 Executors 产生线程的方式捕捉从 run() 中逃出来的异常。Thread.UncaughtExceptionHandler 是一个接口,它允许我们在每个Thread对象上都附着一个异常处理器。该处理器会在线程因未捕获的异常而临近死亡时被调用。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
class ExceptionThread2 implements Runnable {
@Override
public void run() {
// TODO Auto-generated method stub
Thread t = Thread.currentThread();
System.out.println("run() by" + t);
System.out.println("en = " + t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
class MyUncaughExceptionHandler implements Thread.UncaughtExceptionHandler{
@Override
public void uncaughtException(Thread t, Throwable e) {
// TODO Auto-generated method stub
System.out.println("caught " + e);
}
}
class HandlerThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
// TODO Auto-generated method stub
System.out.println(this+"creating new Thread");
Thread t = new Thread(r);
System.out.println("create " + t);
t.setUncaughtExceptionHandler(new MyUncaughExceptionHandler());
System.out.println("eh = "+t.getUncaughtExceptionHandler());
return t;
}
}
public class CaptureUncaughtException{
public static void main(String[] args) {
ExecutorService exc = Executors.newCachedThreadPool(new HandlerThreadFactory());
exc.execute(new ExceptionThread2());
}
}
/*output:
ThreadTest.HandlerThreadFactory@16b4a017creating new Thread
create Thread[Thread-0,5,main]
eh = ThreadTest.MyUncaughExceptionHandler@2a3046da
run() byThread[Thread-0,5,main]
en = ThreadTest.MyUncaughExceptionHandler@2a3046da
ThreadTest.HandlerThreadFactory@16b4a017creating new Thread
create Thread[Thread-1,5,main]
eh = ThreadTest.MyUncaughExceptionHandler@1d93e3d8
caught java.lang.RuntimeException*/
在上述的例子中,run() 中出现的异常被捕捉并且作为参数传递给了 uncaughtException 方法。可以在该方法中对异常进行处理。
并且 UncaughtExceptionHandler 是作为线程池的构造参数使用的,它规定了线程池在给把任务包装成线程时需要绑定一个 UncaughtExceptionHandler。
七、线程的优先级
上文曾提到,Thread.yield()效果等同于降低线程的优先级(但并不是真的降低优先级),而真正对优先级进行操作的是 Thread.currentThread.setPriority()。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SimplePriorities implements Runnable {
private int countDown = 5;
private int priority;
public SimplePriorities(int priority) {
this.priority = priority;
}
public String toString() {
return Thread.currentThread() + " : " + countDown;
}
public void run() {
Thread.currentThread().setPriority(priority);
System.out.println(toString() + "this thread's priority is "+priority);
}
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<5;i++) {
exec.execute(new SimplePriorities(Thread.MIN_PRIORITY));
}
exec.execute(new SimplePriorities(Thread.MAX_PRIORITY));
exec.shutdown();
}
}
/*output:
Thread[pool-1-thread-1,1,main] : 5this thread's priority is 1
Thread[pool-1-thread-2,1,main] : 5this thread's priority is 1
Thread[pool-1-thread-5,1,main] : 5this thread's priority is 1
Thread[pool-1-thread-6,10,main] : 5this thread's priority is 10
Thread[pool-1-thread-4,1,main] : 5this thread's priority is 1
Thread[pool-1-thread-3,1,main] : 5this thread's priority is 1
*/
其中:Thread.MAX_PRIORITY 和 Thread.MIN_PRIORITY分别表示优先级的最大值和最小值。从输出结果来看,priotity 为10的线程是最后创建的,但是却不是最后执行的,可以明显看出优先级的影响。
八、后台线程
后台线程和普通线程的区别是,后台线程无法保证程序的进行。即当所有前台线程结束时,无论后台线程是否结束,程序都会结束。将线程设置为后台线程的方式为 setDeamon 方法。
import java.util.concurrent.TimeUnit;
/*
* 后台线程案例
* 后台线程的特点是,一旦其他线程停止,程序停止
*/
public class SimpleDaemons implements Runnable{
@Override
public void run() {
// TODO Auto-generated method stub
try {
while(true) {
TimeUnit.MILLISECONDS.sleep(100);
System.out.println(Thread.currentThread()+" " + this);
}
}catch(InterruptedException e) {
System.out.println("Sleep interrupt");
}
}
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<5;i++) {
Thread daemon=new Thread(new SimpleDaemons());
//设置为后台线程
daemon.setDaemon(true);
daemon.start();
}
System.out.println("All deamos start");
TimeUnit.MILLISECONDS.sleep(80);
}
}
明显可以看出程序几乎没有任何停留就结束了。