Java并发

6. 任务执行

2017-04-20  本文已影响4人  大海孤了岛

任务通常是一些抽象的且离散的工作单元。通过把应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,以及提供一种自然的并行工作结构来提升并发性。

6.1 在线程中执行任务

  • 当围绕“任务执行”来设计应用程序时,第一步是要找出清晰的任务边界。
6.1.1 串行地执行任务
class SingleThreadWebServer{
    public stati void main(String[] args) throws IOException{
        ServerSocket socket = new ServerSocket(80);
        while (true){
            Socket connection = socket.accept();
            handleRequest(connection);
        }
    }
}

如上为串行的Web服务器实现,在理论上是正确的,但在实际应用上它的执行性能是非常糟糕的,因为它每次只能处理一个请求。

6.1.2 显式地为任务创建线程
class ThreadPerTaskWebServer{
    public stati void main(String[] args) throws IOException{
        ServerSocket socket = new ServerSocket(80);
        while (true){
            final Socket connection = socket.accept();

            Runnable task = new Runnable(){
                public void run(){
                    handleRequest(connection);
                }
            };

            new Thread(task).start();
        }
    }
}

如上采用的是为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。
但要注意的是这里的任务处理代码handleRequest方法必须是线程安全的,因为当有多个任务时会并发地调用这段代码。

6.1.3 无限制创建线程的不足
  • 线程生命周期的开销非常高。线程的创建过程需要时间,延迟处理的请求,并且需要JVM和操作系统提供一些辅助帮助。如果请求的到达率非常高且请求的处理过程是轻量级的,那么为每个请求创建一个新线程会消耗大量的计算资源。

6.2 Executor框架

串行执行的问题在于其糟糕的响应性和吞吐量,而“为每个任务分配一个线程”的问题在于资源管理的复杂性。因此,为了提供了一种灵活的线程池来实现作为Executor框架的一部分,来简化线程的管理工作。

如下为基于线程池的Web服务器:

class ThreadPerTaskWebServer{
    //定义线程池大小
    private static final int NTHREAD = 100;
    //定义Executor
    private static final Executor exec = 
        Executors.newFixedThreadPool(NTHREAD);

    public static void main(String[] args) throws IOException{
        ServerSocket socket = new ServerSocket(80);
        while (true){
            final Socket connection = socket.accept();

            Runnable task = new Runnable(){
                public void run(){
                    handleRequest(connection);
                }
            };
            //将任务添加到线程池中
            exec.execute(task);
        }
    }
}

6.2.2 执行策略

在定义执行策略时,需要考虑任务的“What,Where,When,How”等方面。

6.2.3 线程池

线程池指的是管理一组同构工作线程的资源池。线程池往往与工作队列有关。在工作队列中保存了所有等待执行的任务。工作者线程从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

6.2.4 Executor的生命周期

Executor的实现通常会创建线程来执行任务,但JVM只有在所有(非守护线程)线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么JVM将无法结束。

public interface ExecutorService extends Executor{
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTerminated(long timeout, TimeUtil unit)
        throws InterruptedException;
}

如下为支持关闭操作的Web服务器

class LifecycleWebServer{
    private final ExecutorService exec = ...;

    public static void main(String[] args) throws IOException{
        ServerSocket socket = new ServerSocket(80);
        while (! exec.isShutdown()){
            try{
                final Socket connection = socket.accept();
                exec.execute(new Runnable(){
                    public void run() { handleRequest(connection); }
                })
            } catch (RejectedExecutionException e){
                if (!exec.isShutdown())
                    log("task submission rejected",e);
            }
        }
    }

    public void stop() { exec.shutdown(); }

    void handleRequest(Socket connection){
        Request req = readRequest(connection);
        //判断是否为请求关闭的指令
        if (isShutdownRequest(req))
            stop();
        else 
            dispatchRequest(req);
    }
}

假如我们需要关闭服务器,那么可以在程序中调用stop方法,或者以客户端请求形式向Web服务器发送一个特定格式的HTTP请求。

6.3 找出可利用的并行性

我们来实现一个浏览器的页面渲染功能,它的作用是将HTML页面绘制到图像缓存中,为了简单起见,我们假设HTML页面中只包含标签文本和图片。

public class SingleThreadRenderer{
    void renderPage(CharSequence source){
        //加载文本
        renderText(source);
        List<ImageData> ImageData = new ArrayList<ImageData>();
        //下载图片
        for (ImageData imageInfo : scanForImageInfo(source))
            ImageData.add(imageInfo.downloadImage());
        //加载图片
        for (ImageData data : ImageData)
            renderImage(data);
    }
}

评价:该中方式在图片下载过程中大部分时间都是在等待I/O操作执行完成,在这期间CPU几乎不做任何工作,使得用户在看到最终页面之前要等待很长的时间。

Future表示一个任务的生命周期,并提供了相应的方法判断是否已经完成或取消,以及获取任务的结果和取消任务等。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException,ExecutionException,
            CancellationException;
    //限时获取
    V get(long timeout, TimeUtil unit) throws InterruptedException,
        ExecutionException, CancellationException, TimeoutException;
}

使用Future实现页面渲染器:

public class FutureRenderer {
    private final ExecutorService exec = ....;

    void renderPage(CharSequence source){
        //获取图片信息
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        //定义图片下载任务
        Callable<List<ImageData>> task = 
            new Callable<List<ImageData>>() {
                //通过call方法返回结果
                public List<ImageData> call(){
                    public List<ImageData> result 
                        = new ArrayList<ImageData>();
                    for (ImageInfo imageInfo : imageInfos)
                        result.add(imageInfo.downloadImage());
                    return result;
                }

            };
        //将任务添加到线程池中
        Future<List<ImageData>> future = exec.submit(task);
        //加载文本信息
        renderText(source);

        try{
            //获取图片结果,并加载图片
            List<ImageData> imageData = future.get();
            for (ImageData data : imageData)
                renderImage(data);
        } catch (InterruptedException e){
            //重新设置线程的中断状态
            Thread.currentThread().interrupt();
            //取消任务
            future.cancel(true);
        } catch (ExecutionException e){
            throw launderThrowable(e.getCause());
        }
    }
}

如上,我们将渲染过程分解为文本渲染和图片渲染,使得两者并发执行。

6.3.4 在异构任务并行化中存在的局限
  • 在上面的FutureRender中使用了两个任务,一个是负责渲染文本,一个是负责渲染图片。如果渲染文本的速度远远高于渲染图片的速度,那么程序的最终性能与串行执行的性能差别并不大,而代码却变复杂了。
public class FutureRenderer {
    private final ExecutorService exec = ....;

    void renderPage(CharSequence source){
        //获取图片信息
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        //定义任务结果
        final List<Future<ImageData>> futures = new ArrayList<Future<ImageData>>();

        for (ImageInfo imageInfo : imageInfos){
            //定义任务
            Callable<ImageData> task = new Callable<ImageData>(){
                public ImageData call(){
                    return imageInfo.downloadImage();
                }
            }
            //添加到线程池中
            futures.add(exec.submit(task));
        }

        //遍历任务结果
        for (Future future : futures){
            try {
                //获取图片信息,并加载
                ImageData imageData = future.get();
                renderImage(imageData);
            }catch (InterruptedException e){
                //重新设置线程的中断状态
                Thread.currentThread().interrupt();
                //取消任务
                future.cancel(true);
            } catch (ExecutionException e){
                throw launderThrowable(e.getCause());
            }
        }
    }
}

如上,我们为每张图片都创建一个任务执行。但这里存在一个缺陷,我们在最后遍历futures时,调用get方法获取图片,我们直到这个的get方法若任务已经完成,那么会直接获取到图片,若任务还未完成,那么会阻塞,直到任务完成。那么存在这么个问题:若第一张图未下载完毕,而第二张下载完毕,这时候第二张会因为第一张未下载完成而导致被阻塞获取到。

CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。

public class Renderer {
    private final ExecutorService exec;

    Renderer(ExecutorService exec) { this.exec = exec; }

    void renderPage(CharSequence source){
        //获取图片信息
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        //定义CompletionService
        CompletionService<ImageData> completionService = 
            new ExecutorCompletionService<ImageData>(exec);
        //将每张图片封装为任务
        for (final ImageInfo imageInfo : imageInfos){
            completionService.submit(new Callable<ImageData>(){
                public ImageData call(){
                    return imageInfo.downloadImage();
                }
            })
        }

        renderText(source);
        //获取图片信息
        for (int t = 0; t < imageInfos.size(); i ++){
            try {
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }catch (InterruptedException e){
                //重新设置线程的中断状态
                Thread.currentThread().interrupt();
            } catch (ExecutionException e){
                throw launderThrowable(e.getCause());
            }
            
        }

    }
}

如上,为每一张图下载都创建一个独立任务,并在线程池中执行它们,从而将串行的下载过程转换为并行的过程,这将减少下载所有图片的总时间。

6.3.7 为任务设置时限

如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。但要注意,当这些任务超时后应该立即停止,从而避免浪费计算不必要的资源。

我们设置一个获取广告的机制,若在规定时间内获取到广告,则加载广告,否则设置默认广告。

Page renderPageWithAd() thorws InterruptedException{
    //设定结束时间
    long endNanos = System.nanoTime() + TIME_BUGGET;
    //提交任务
    Future<Ad> f = exec.submit(new FetchAdTask());
    //加载主界面
    Page page = renderPageBody();
    Ad ad;
    try {
        //在限定时间内获取广告,若线程异常或超时则设置为默认的广告
        long timeleft = endNanos - System.nanoTime();
        ad = f.get(timeleft, NANOSECONDS);
    } catch (ExecutionException e) {
        ad = DEFAULT_AD;
    } catch (TimeoutException e){
        ad = DEFAULT_AD;
        //超时后,取消任务
        f.cancel(true);
    }
    page.setAd(ad);
    return page;
}
public class InvokeAllTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        List<Callable<Integer>> tasks = new ArrayList<>();
        Callable<Integer> task = null;
        for (int i = 0; i < 10; i ++){
            task = new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    int random = new Random().nextInt(1000);
                    Thread.sleep(random);
                    System.out.println(Thread.currentThread().getName() + "休眠了 " + random);
                    return random;
                }
            };
            tasks.add(task);
        }
        long s = System.currentTimeMillis();
        List<Future<Integer>> results = exec.invokeAll(tasks);
        System.out.println("执行任务消耗了:" + (System.currentTimeMillis() - s) + "ms");
        for (int i = 0; i < results.size(); i ++){
            try {
                System.out.println(results.get(i).get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
}

输出结果:
pool-1-thread-5休眠了 276
pool-1-thread-1休眠了 426
pool-1-thread-8休眠了 479
pool-1-thread-10休眠了 561
pool-1-thread-4休眠了 641
pool-1-thread-6休眠了 760
pool-1-thread-9休眠了 780
pool-1-thread-3休眠了 854
pool-1-thread-2休眠了 949
pool-1-thread-7休眠了 949
执行任务消耗了:974ms
426
949
854
641
276
760
949
479
780
561

如上,我们可以看到最后任务结果的输出是按照顺序输出的。

上一篇 下一篇

猜你喜欢

热点阅读