Java干货Java 杂谈

Java任务取消方案

2017-11-24  本文已影响13人  德彪
image.png

取消原因

取消一个任务执行的理由有很多,通常有以下几个

取消线程执行

任务的取消执行,其实最后都会落到线程的终止上(任务都是由线程来执行)。在java中没有一种安全的抢占式方法来终止线程(Thread.stop 是不安全的终止线程执行的方法,已经废弃掉了),所以需要一种很好的协作机制来平滑的关闭任务。

自然结束

中断线程的最好方法是让代码自然执行到结束,而不是从外部强制打断他。为此可以设置一个“任务取消标志”,任务代码会定期的查看这个标志,如果发现标志被设定了,则任务提前结束。

public class SomeJob {
    private List<String> list = new ArrayList<>();
    private volatile boolean canceled = false;

    public void run() {
        while (!canceled) {
            String res = getResult();
            synchronized (this) {
                list.add(res);
            }
        }
    }

    private String getResult() {
        // do something...
        return "";
    }
    
    public void cancel() {
        this.canceled = true;
    }
}

上面的代码中,设置了一个volatile类型的变量canceled,所以其他线程对这个变量的修改对所有线程都是可见的(可见性)。每次循环执行某个操作之前都会检查这个变量是否被其他线程设置为true,如果为true则提前退出。

这是很常见的一种取消任务执行的手段,但是也有他的弊端,比如:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SomeJob {
    private BlockingQueue<String> list = new LinkedBlockingQueue<>(100);
    private volatile boolean canceled = false;

    public void run() {
        try {
            while (!canceled) {
                String res = getResult();
                synchronized (this) {
                    list.put(res);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private String getResult() {
        // do something...
        return "";
    }

    public void cancel() {
        this.canceled = true;
    }
}

上面将list替换为支持阻塞的BlockingQueue,他是一个有界队列,当调用list的put操作时,如果队列已经填满,那么将会一直阻塞直到队列有空余位置为止。如果恰好执行put操作是阻塞了,此时我们调用了cancel方法,那么什么时候检查canceled标志是不确定的,响应性很差,极端情况下,有可能永远也不会去再下一次轮询中检查canceled标志,试想我们执行了取消后,消费队列的线程已经停止,此时put操作又阻塞,那么将会一直阻塞下去,这个线程失去响应。

线程中断

通过线程自己的中断机制,可以解决上述问题。

每个线程都有一个boolean类型的变量,表示中断状态。当中断线程时,这个线程的中断状态将被设置为true。在Thread中有三个方法可以设置或访问这个变量:

调用interrupt并不意味着立即停止目标线程正在进行的任务,而只是将中断状态设置为true:他并不会正真中断一个正在运行的线程,而只是发出了一种中断请求,线程可以看到这个中断状态,然后在合适的时刻处理。

中断请求

响应中断阻塞

上面提到的中断请求,有些方法会处理这些请求,从而结束现在正在进行的任务。像上面代码中的BlockingQueue.put方法,当他在阻塞状态时,依然能够发现中断请求并提前返回,所以解决上面代码中的问题只需要对执行代码的线程thread调用thread.interrupt方法,BlockingQueue.put就可以从阻塞状态中恢复回来,从而完成取消。类似这样的支持中断的阻塞就叫做响应中断阻塞,主要有以下几个:

这些支持中断的阻塞在响应中断时执行的操作包括:

jvm并不能保证这些阻塞方法检测到中断的速度,但在实际情况中响应速度还是很快的。

利用线程本身的中断状态作为取消机制,我们可以将上面的代码再改造一下:

public class SomeJob {
    private BlockingQueue<String> list = new LinkedBlockingQueue<>();

    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String res = getResult();
                synchronized (this) {
                    list.put(res);
                }
            }
        } catch (InterruptedException e) {
            System.out.println("任务被取消...");
        }
    }

    private String getResult() {
        // do something...
        return "";
    }

    public void cancel(Thread thread) {
        thread.interrupt();
    }
}

任务代码在每次轮询操作前检查当前线程的状态,如果被中断了就退出。cancel方法是对当前执行任务的线程进行中断。

注意,调用cancel方法的是另一线程,传入的线程实例则是执行run方法的工作者线程,故在执行cancel方法后run方法可以检测到中断。

不响应中断阻塞

并非所有的阻塞方法和阻塞机制都能够响应中断请求,比如正在read或write上阻塞的socket就不会响应中断,调用线程的interrupt方法只能设置线程的中断状态,除此以外没有任何作用,因为这些阻塞方法并不会去检查线程中断状态,也不会处理中断。这些阻塞就是不响应中断阻塞。主要有以下几个:

一个简单的例子,取消socket任务:

public class CanceledThread extends Thread {
    private final Socket socket;
    private final InputStream stream;
    public CanceledThread(Socket socket) throws IOException {
        this.socket = socket;
        this.stream = socket.getInputStream();
    }
    @Override
    public void interrupt() {
        try {
            socket.close();
        } catch (Exception e) {
            // do nothing
        } finally {
            super.interrupt();
        }
    }
    
    @Override
    public void run() {
        try {
            byte[] bytes = new byte[1024];
            while (true) {
                int count = stream.read(bytes);
                if (count < 0) {
                    break;
                } else if (count > 0) {
                    // 处理读到 bytes
                }
                
            }
        } catch (Exception e) {
            // 可能捕捉到InterruptedException 或 SocketException
            // 线程退出
        }
    }
}

在上面的代码中,即使socket的stream在read过程中阻塞了,也可以中断阻塞并返回。

中断处理

上文提到,当调用可中断的阻塞库函数时,会抛出InterruptedException,这个异常会出现在我们的任务代码中(任务代码调用了这些阻塞方法),有三种方法处理这个异常:

ThreadPoolExcutor就是处理中断的一个例子:当其拥有的工作者线程检测到中断时,他会检查线程池是否正在关闭。如果是,他会在结束前执行一些线程清理工作,否则他可能创建一个新线程将线程池恢复到合理的规模。

取消任务

终止线程池

线程池的生命周期是由ExcutorService控制的。ExcutorService提供了两种关闭线程池的方法:

终止基于线程的服务

在写程序时往往会用到日志,在代码中插入println也是一种日志行为。为了避免由于日志为服务带来性能损耗和并发风险(多个线程同时打印日志有可能引发并发问题),我们往往将打印日志任务放到某个队列中,由专门的线程从队列中取出任务进行打印。下面设计这样一个日志服务:

public class LogService {
    private final BlockingQueue<String> queue;
    private final PrintWriter writer;
    private final LoggerThread thread;
    private boolean isShutDown = false;
    private int reservations = 0;

    public LogService(PrintWriter writer) {
        this.writer = writer;
        thread = new LoggerThread();
        queue = new LinkedBlockingQueue<>();
    }
    
    public void shutdown() {
        synchronized (this) {
            isShutDown = true;
        }
        thread.interrupt();
    }
    
    public void log(String msg) throws InterruptedException {
        synchronized (this) {
            if (isShutDown) {
                throw new IllegalStateException("日志服务已经关闭...");
            }
            reservations ++;
        }
        queue.put(msg);
    }

    class LoggerThread extends Thread {
        @Override
        public void run() {
            try {
                while (true) {
                    try {
                        synchronized (LogService.this) {
                            if (isShutDown && reservations == 0) {
                                break;
                            }
                            String msg = queue.take();
                            synchronized (LogService.this) {
                                reservations--;
                            }
                            writer.println(msg);
                        }

                    } catch (InterruptedException e) {
                        // retry
                    }
                }
            } finally {
                writer.close();
            }
        }
    }
}

当关闭日志服务时,日志服务不再会接收新的日志打印请求,并且会将队列中剩余的所有打印任务执行完毕,最后结束。如果此时日志打印线程恰好在queue.take方法中阻塞了,关闭日志服务时也能很好的从阻塞中恢复过来,结束服务。

上一篇下一篇

猜你喜欢

热点阅读