并发与高并发课程学习笔记(9)

2018-04-29  本文已影响0人  Ukuleler

Fork/Join框架介绍

1.Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。

2. 工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:


那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

3.Fork/Join框架设计思想

第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。

第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

Fork/Join使用两个类来完成以上两件事情:

ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

RecursiveAction:用于没有返回结果的任务。

RecursiveTask :用于有返回结果的任务。

ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

4.BlockingQueue

阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;

多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)

当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

阻塞队列的三个常用操作 : insert(插入); Remove(移除); Examine(检查);


Throws Exception:如果不能马上进行那么会抛出异常,调用add(o);remove(o);element();

Special Value:如果操作不能进行,那么需要返回特殊的值,调用offer(o);poll();peek();

Block:如果操作不能进行的时候,操作会被阻塞,调用put(o);take();

Times Out:如果擦做不能进行,操作会被阻塞,会被阻塞指定的时间;调用offer(o, timeout, timeunit); poll(timeout, timeunit)

1)关于BlockingQueue的成员类

ArrayBlockQueue

ArrayBlockQueue是一个阻塞式的队列,间接的实现了Queue接口和Collection接口。底层以数组的形式保存数据(实际上可看作一个循环数组)。ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockQueue容量有限,在初始化时需指定,不会自动扩容。

ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

LinkedBlockingQueue

LinkedBlockingQueue是基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

/**

* 生产者线程

*

*

*/

public class Producer implements Runnable {

    private volatile boolean      isRunning              = true;

    private BlockingQueue queue;

    private static AtomicInteger  count                  = new AtomicInteger();

    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;

    public Producer(BlockingQueue queue) {

        this.queue = queue;

    }

    public void run() {

        String data = null;

        Random r = new Random();

        System.out.println("启动生产者线程!");

        try {

            while (isRunning) {

                System.out.println("正在生产数据...");

                Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

                data = "data:" + count.incrementAndGet();

                System.out.println("将数据:" + data + "放入队列...");

                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {

                    System.out.println("放入数据失败:" + data);

                }

            }

        } catch (InterruptedException e) {

            e.printStackTrace();

            Thread.currentThread().interrupt();

        } finally {

            System.out.println("退出生产者线程!");

        }

    }

    public void stop() {

        isRunning = false;

    }

}

/**

* 消费者线程

*

*

*/

public class Consumer implements Runnable {

    private BlockingQueue queue;

    private static final int      DEFAULT_RANGE_FOR_SLEEP = 1000;

    public Consumer(BlockingQueue queue) {

        this.queue = queue;

    }

    public void run() {

        System.out.println("启动消费者线程!");

        Random r = new Random();

        boolean isRunning = true;

        try {

            while (isRunning) {

                System.out.println("正从队列获取数据...");

                String data = queue.poll(2, TimeUnit.SECONDS);

                if (null != data) {

                    System.out.println("拿到数据:" + data);

                    System.out.println("正在消费数据:" + data);

                    Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));

                } else {

                    // 超过2s还没数据,认为所有生产线程都已经退出,自动退出消费线程。

                    isRunning = false;

                }

            }

        } catch (InterruptedException e) {

            e.printStackTrace();

            Thread.currentThread().interrupt();

        } finally {

            System.out.println("退出消费者线程!");

        }

    }

}

/**

* 测试类

*/

public class BlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {

        // 声明一个容量为10的缓存队列

        BlockingQueue queue = new LinkedBlockingQueue(10);

        Producer producer1 = new Producer(queue);

        Producer producer2 = new Producer(queue);

        Producer producer3 = new Producer(queue);

        Consumer consumer = new Consumer(queue);

        // 借助Executors

        ExecutorService service = Executors.newCachedThreadPool();

        // 启动线程

        service.execute(producer1);

        service.execute(producer2);

        service.execute(producer3);

        service.execute(consumer);

        // 执行10s

        Thread.sleep(10 * 1000);

        producer1.stop();

        producer2.stop();

        producer3.stop();

        Thread.sleep(2000);

        // 退出Executor

        service.shutdown();

    }

}

DelayQueue

 DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

使用场景:

  DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

上一篇 下一篇

猜你喜欢

热点阅读