深耕 JDK 源码 - CyclicBarrier

2023-04-21  本文已影响0人  西瓜汁and柠檬水

CyclicBarrier是JDK8提供的一个多线程同步工具,用于实现一组线程在某个点上等待,直到所有线程都到达该点后再一起继续执行。它基于"栅栏"(Barrier)的概念,通过一个计数器来实现线程的同步。在这篇文章中,我们将介绍CyclicBarrier的实现原理、基础用法,并提供代码示例。

CyclicBarrier的实现原理

CyclicBarrier内部使用了ReentrantLock和Condition来实现线程的等待和唤醒机制。其基本原理如下:

  1. CyclicBarrier通过构造函数指定参与同步的线程数量,称为parties。当所有parties数量的线程都调用了await()方法后,CyclicBarrier会将计数器重置为初始值,并唤醒所有在等待的线程。
  2. 每个线程在调用await()方法时,会将自己加入等待队列,并释放持有的锁。
  3. 当最后一个线程调用了await()方法后,会唤醒所有在等待队列中的线程,使得它们可以继续执行。
  4. CyclicBarrier还可以设置一个可选的回调函数(Runnable),在所有线程都到达栅栏点后执行。

CyclicBarrier的基础用法

CyclicBarrier的基础用法包括以下几个步骤:

1.创建CyclicBarrier实例,并指定参与同步的线程数量。例如:

1CyclicBarrier barrier = new CyclicBarrier(3);

这里创建了一个CyclicBarrier实例,指定了参与同步的线程数量为3。

2.在每个线程中调用await()方法,使线程等待。例如:

1try {
2    System.out.println("Thread " + Thread.currentThread().getId() + " is waiting at the barrier.");
3    barrier.await(); // 等待所有线程到达栅栏点
4    System.out.println("Thread " + Thread.currentThread().getId() + " has passed the barrier.");
5} catch (InterruptedException | BrokenBarrierException e) {
6    e.printStackTrace();
7}

这里通过调用await()方法使线程等待,直到所有参与同步的线程都到达栅栏点。当所有线程都调用了await()方法后,会唤醒它们,使得它们可以继续执行。

3.可选的回调函数。在创建CyclicBarrier实例时,可以传入一个可选的回调函数(Runnable),在所有线程都到达栅栏点后执行。例如:

1CyclicBarrier barrier = new CyclicBarrier(3, () -> {
2    System.out.println("All threads have reached the barrier.");
3    // 可选的回调函数,会在所有线程到达栅栏点后执行
4});

这里定义了一个回调函数,当所有线程都到达栅栏点后会执行。

4.重置栅栏:

1// 可以使用reset()方法来重置栅栏,将计数器重新设置为初始值
2barrier.reset();

这样可以使得之前等待的线程重新进入等待状态,可以用于多次复用栅栏。

CyclicBarrier的代码示例

下面是一个简单的CyclicBarrier的代码示例,演示了三个线程在栅栏点处等待,并在所有线程都到达后执行回调函数:

 1import java.util.concurrent.BrokenBarrierException;
 2import java.util.concurrent.CyclicBarrier;
 3
 4public class CyclicBarrierExample {
 5
 6    public static void main(String[] args) {
 7        // 创建CyclicBarrier实例,指定参与同步的线程数量为3,并定义回调函数
 8        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
 9            System.out.println("All threads have reached the barrier.");
10        });
11
12        // 创建并启动三个线程
13        Thread t1 = new Thread(new Worker(barrier));
14        Thread t2 = new Thread(new Worker(barrier));
15        Thread t3 = new Thread(new Worker(barrier));
16
17        t1.start();
18        t2.start();
19        t3.start();
20
21        try {
22            t1.join();
23            t2.join();
24            t3.join();
25        } catch (InterruptedException e) {
26            e.printStackTrace();
27        }
28    }
29
30    static class Worker implements Runnable {
31        private CyclicBarrier barrier;
32
33        public Worker(CyclicBarrier barrier) {
34            this.barrier = barrier;
35        }
36
37        @Override
38        public void run() {
39            try {
40                System.out.println("Thread " + Thread.currentThread().getId() + " is working.");
41                Thread.sleep(2000); // 模拟工作耗时
42                System.out.println("Thread " + Thread.currentThread().getId() + " is waiting at the barrier.");
43                barrier.await(); // 等待所有线程到达栅栏点
44                System.out.println("Thread " + Thread.currentThread().getId() + " has passed the barrier.");
45            } catch (InterruptedException | BrokenBarrierException e) {
46                e.printStackTrace();
47            }
48        }
49    }
50}

在上面的示例中,我们创建了一个包含3个线程的CyclicBarrier实例,并在每个线程中调用了await()方法使其等待。当所有线程都到达栅栏点后,会执行定义的回调函数并输出相应的信息。

总结:

CyclicBarrier是一种实现多线程同步的机制,通过栅栏点来等待多个线程到达,并在所有线程都到达后执行回调函数。它的使用可以帮助我们实现一些需要多线程协作的场景,如分布式计算、多线程数据处理等。在使用CyclicBarrier时,需要注意线程数量的设置、await()方法的调用和可选的回调函数的使用。希望这篇文章能够对你理解CyclicBarrier的实现原理和基础用法有所帮助。

上一篇下一篇

猜你喜欢

热点阅读