并发编程时,如何写复杂的任务编排?

2020-05-04  本文已影响0人  周群力

java的线程池ExecutorService适合提交互相之间没有依赖的任务,如果任务之间有依赖,就不能简单的调用ExecutorService.submit()了,可能出现死锁、饥饿等情况。
如果任务之间的依赖很简单,靠CompletionService+future.get()可以搞定;如果是分治任务,可以使用Fork/join;而如果任务之间的依赖关系很复杂(比如是一个依赖图,如图),该如何写任务编排呢?


image.png

一、方案

A. 水平切分:Parallel Workers(每个线程跑全套任务)

image.png
每个阶段是一个模块(比如一个独立的类),模块和模块之间通过同步调用来通信,只不过同时有多个线程在跑整个链路。
缺点是没有把IO密集阶段和CPU密集阶段进行时间重叠。
分布式的web应用可以看成是这种模式,每个请求会通过同步调用访问所有模块(分布式服务),同时可能有多个请求并发进行。
并发编程模型管这个叫Parallel Workers模式

B.垂直切分:pipeline模式(异构生产者-消费者)

多线程优化-pipeline模式

image.png

类似于CPU流水线,每个阶段有独立的线程(池),阶段与阶段之间靠queue通信,每个阶段调用BlockingQueue.take()等待新的任务来。
当然,如果用CompletionService,每个阶段可以调用上一个阶段的completionService等待新任务,即调用completionService.take().get(),内部其实还是一个BlockingQueue,区别是每个模块依赖CompletionService还是依赖BlockingQueue

个人理解go中的csp模型就是这种方案,至于csp原始理论上的方案是否如此,没有细究

pros/cons:


image.png

C. Consumer as Producer(同构生产者-消费者)

pipeline模式每个任务阶段要独立部署一个模块(并发编程时模块==类,分布式编程时模块==服务),如果任务阶段是动态提交的,没法改变部署结构,那么可以使用同构的节点实现生产-消费者,每次消费完一个节点(执行完一个任务阶段),通过拓扑排序找到下一个要执行的节点、提交到队列

image.png

这种方案在分布式爬虫中有用到

D. Actor模式

并发编程模型把Actor也算做流水线的一种

E.函数式并行

image.png

二、例题

2.1. building-h2o

抽象的任务依赖图为:


image.png

可以用pipeline模式,A Consumer听A queue,B Consumer听B queue,reset Consumer听 result queue。但是这题不好这样写,这题A consumer消费并打印不能自己起个线程while(true)取blockingQueue并消费,这题A consumer得由oj自己调用,调一次打一次,傻屌……

那么就简单一点,A Consumer取A信号量,B Consumer取B信号量,reset Consumer听 result queue


class H2O {
 private Semaphore              hReady = new Semaphore(2);
    private Semaphore              oReady = new Semaphore(1);
    private BlockingQueue<Integer> q      = new ArrayBlockingQueue<Integer>(3);

    public H2O() {
        new Thread(() -> {
            while (true){
                try {
                    for (int i = 0; i < 3; i++) {
                        Integer take = q.poll(5, TimeUnit.MILLISECONDS);
                        if (take == null) {
                            return;
                        }
                    }
                    hReady.release();
                    hReady.release();
                    oReady.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    public void hydrogen(Runnable releaseHydrogen) throws InterruptedException {
        hReady.acquire();
        // releaseHydrogen.run() outputs "H". Do not change or remove this line.
        releaseHydrogen.run();
        q.add(1);
    }

    public void oxygen(Runnable releaseOxygen) throws InterruptedException {
        oReady.acquire();
        // releaseOxygen.run() outputs "O". Do not change or remove this line.
        releaseOxygen.run();
        q.add(1);
    }
}

2.2. LEETCODE 1242. Web Crawler Multithreaded

https://zhang0peter.com/2020/02/12/LeetCode-1242-Web-Crawler-Multithreaded/
图的遍历问题,用多线程写的话就选择bfs,Consumer as Producer
任务依赖图为:

image.png
crawler和collector通信可以用通用套路(加个queue做pipeline),也可以用其他同步工具,毕竟简单。
代码:
public class Solution {

    private static final int    THREAD_COUNT    = 4;
    private static final String EMPTY           = "";
    private static final long   TIMEOUT_TO_POLL = 50;

    private final BlockingQueue<String> taskQueue = new LinkedBlockingQueue(1000);
    //worker thread
    private       ExecutorService       es        = new ThreadPoolExecutor(THREAD_COUNT, THREAD_COUNT,
            1000, TimeUnit.SECONDS, new LinkedBlockingQueue<>(THREAD_COUNT)
    );
    //private final        Queue<String>         result       = new ConcurrentLinkedQueue<>();
    private final Map<String, String>   visited   = new ConcurrentHashMap<>();
    private       String                hostName;

    public Solution() {
    }

    public List<String> crawl(String startUrl, HtmlParser htmlParser) {
        this.hostName = extractHostName(startUrl);
        taskQueue.add(startUrl);
        visited.put(startUrl, EMPTY);
        CountDownLatch closedLatch = new CountDownLatch(THREAD_COUNT);
        //    start consumer
        for (int i = 0; i < THREAD_COUNT; i++) {
            es.submit(() -> {
                while (true) {
                    try {
                        String url = taskQueue.poll(TIMEOUT_TO_POLL, TimeUnit.MILLISECONDS);
                        if (url == null) {
                            closedLatch.countDown();
                            return;
                        }
                        List<String> urls = htmlParser.getUrls(url);
                        if (urls == null || urls.isEmpty()) {
                            continue;
                        }
                        for (String s : urls) {
                            if (!visited.containsKey(s) && validHost(s)) {
                                taskQueue.add(s);
                                visited.put(s, EMPTY);
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        //wait
        try {
            closedLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //    convert result
        Set<String> keys = visited.keySet();
        return new ArrayList<>(keys);
    }

    private boolean validHost(String s) {
        return this.hostName.equals(extractHostName(s));
    }

    private String extractHostName(String startUrl) {
        int i = startUrl.indexOf('/', 7);
        return (i == -1) ? startUrl : startUrl.substring(0, i);
    }

    @Test
    public void test1() {
        HtmlParser parser = new HtmlParser();
        List<String> crawl = new Solution().crawl("http://news.yahoo.com/news/topics/", parser);
        System.out.println(crawl);
    }

    @Test
    public void test2() {
        HtmlParser parser = new HtmlParser();
        List<String> crawl = new Solution().crawl("http://news.google.com", parser);
        System.out.println(crawl);
    }
}

另一种方案是递归写DFS,有依赖顺序的任务不好直接往线程池里扔,要用ForkJoinPool:

 class Solution {

    private static final int          THREAD_COUNT    = 4;
    private static final String       EMPTY           = "";
    private static final long         TIMEOUT_TO_POLL = 50;
    private              ForkJoinPool pool            = new ForkJoinPool(THREAD_COUNT);

    private final Map<String, String> visited = new ConcurrentHashMap<>();
    private       String              hostName;

    public List<String> crawl(String startUrl, HtmlParser htmlParser) {
        this.hostName = extractHostName(startUrl);
        visited.put(startUrl, EMPTY);
        CrawlerTask task = new CrawlerTask(startUrl, htmlParser);
        pool.invoke(task);
        //    convert result
        Set<String> keys = visited.keySet();
        return new ArrayList<>(keys);
    }

    class CrawlerTask extends RecursiveAction {

        String     url;
        HtmlParser htmlParser;

        public CrawlerTask(String url, HtmlParser htmlParser) {
            this.url = url;
            this.htmlParser = htmlParser;
        }

        @Override
        protected void compute() {
            List<String> urls = htmlParser.getUrls(url);
            if (urls == null || urls.isEmpty()) {
                return;
            }
            List<CrawlerTask> subs = new ArrayList<>();
            for (int i = 0; i < urls.size(); i++) {
                String s = urls.get(i);
                if (!visited.containsKey(s) && validHost(s)) {
                    visited.put(s, EMPTY);
                    CrawlerTask subTask = new CrawlerTask(s, htmlParser);
                    if (i < urls.size() - 1) {
                        subTask.fork();
                        subs.add(subTask);
                    } else {
                        //visit last url
                        subTask.compute();
                    }
                }
            }
            for (CrawlerTask sub : subs) {
                sub.join();
            }
        }
    }

    private boolean validHost(String s) {
        return this.hostName.equals(extractHostName(s));
    }

    private String extractHostName(String startUrl) {
        int i = startUrl.indexOf('/', 7);
        return (i == -1) ? startUrl : startUrl.substring(0, i);
    }

    @Test
    public void test1() {
        HtmlParser parser = new HtmlParser();
        List<String> crawl = new Solution().crawl("http://news.yahoo.com/news/topics/", parser);
        System.out.println(crawl);
    }

    @Test
    public void test2() {
        HtmlParser parser = new HtmlParser();
        List<String> crawl = new Solution().crawl("http://news.google.com", parser);
        System.out.println(crawl);
    }
}
上一篇下一篇

猜你喜欢

热点阅读