CountDownLatch、Semphore和CyclicBa

2021-03-03  本文已影响0人  handsomemao666

源码地址https://github.com/handsomemao/concurrency.git

CountDownLatch

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。CountDownLatch 是一种灵活的闭锁实现,它可以使一个或者多个线程等待一组事件的发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown 方法递减计数器,表示有一个事件已经发生,而 await 方法等待计数器达到零,这表示需要等待的事情都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。
CountDownLatch只能执行一次,计数值不能被重置,这个是CountDownLatch和CyclicBarrier的区别。

@Slf4j
public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(10, TimeUnit.MILLISECONDS);
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }
}

Semphore

计数信号量(Counting Semaphore)用来控制同时访问某个特定资源的访问数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

Semaphore 中管理着一组虚拟的许可(permit),许可的初始容量可通过构造函数来指定。在执行操作时可以首先获得许可(只要还有剩余的许可),并在使用以后释放许可。如果没有许可,那么 acquire 将阻塞直到有许可(或者直到被中断或者操作超时)。release 方法将返回一个许可给信号量。二值信号量(即初始值为 1 的 Semaphore)可以用做互斥体(mutex),并具备不可重入的加锁语义:谁拥有这个唯一的许可,谁就拥有了互斥锁。因此,它可以用来 实现生产者-消费者模式

public class SemaphoreExample4 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }

CyclicBarrier

我们已经看到通过闭锁来启动一组相关的操作,或者等待一组相关的操作结束。闭锁是一次性对象,一旦进入终止状态,就不能被重置。

栅栏(Barrier)类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏和闭锁的关键区别在于,所有线程必须同时达到栅栏位置,才能继续执行。CyclicBarrier 可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用,有兴趣的可以看下 CyclicBarrier example: a parallel sort algorithm

当现成达到栅栏位置时将调用 await 方法这个方法将阻塞直到所有线程都达到栅栏位置。如果所有线程都达到了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置一遍下次使用。如果对 await 的调用超时,或者 await 阻塞的线程被中断,那么栅栏就被认为是被打破了,所有阻塞的 await 调用都将终止并抛出 BrokenBarrierException。如果成功通过栅栏,那么 await 将为每个线程返回一个唯一的到达索引号。CyclicBarrier 还可以使你将一个栅栏操作传递给构造函数,这是一个 Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不能执行的。

public class CyclicBarrierExample3 {

    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

源码地址https://github.com/handsomemao/concurrency.git

上一篇下一篇

猜你喜欢

热点阅读