并发工具类

2019-03-22  本文已影响0人  shen33

CountDownLatch

专业术语:在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
大白话:就是有N个人投票(线程),统计员等待着所有人投完票的过程。
注意:线程执行完毕后,调用countDown(),该线程是执行完毕的。
使用:

int size = 10; //定义十个人投票
CountDownLatch latch = new CountDownLatch(size); //定义10个容量计数器
for (int i = 0; i < size; i++) {
    // 模拟投票
    new Thread(() -> {
        System.out.println(Thread.currentThread().getName() + "  投票...");
        latch.countDown(); //投完票计数器减一
    }).start();
}
latch.await();//等待计数器为0 才进行下面的任务
System.out.println("所有人投票完毕,统计票数...");

CountDownLatch的api:

  1. CountDownLatch.await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
  2. CountDownLatch.countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
  3. CountDownLatch.getCount() 返回当前计数。
  4. CountDownLatch.toString() 返回标识此锁存器及其状态的字符串。状态用括号括起来,包括字符串 "Count =",后跟当前计数。

CyclicBarrier

专业术语:它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。
大白话:就是队员集合(多个线程),已经集合的人等待剩下的人集合完毕后,队长开始统计人数的过程。
注意:这个过程中的线程是出于等待状态的。

import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * 部队集合-> 统计人数 -> 解散
 */
public class Demo {
    Random random = new Random();
    public void meet(CyclicBarrier cyclicBarrier){
        try {
            TimeUnit.SECONDS.sleep(random.nextInt(4)); // 随机休息4s的时间
            System.out.println(Thread.currentThread().getName() + " 集合...."); //模拟队员集合
            cyclicBarrier.await(); //等待其他队员集合
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 解散");
    }
    public static void main(String[] args) throws Exception {
        Demo d = new Demo();
        int size = 10; //定义队员数量
        //定义屏障数量,并达到数量之后完成对应的操作
        CyclicBarrier barrier = new CyclicBarrier(size, () -> System.out.println("开始报数...."));
        for (int i = 0; i < size; i++) {
            new Thread(() -> d.meet(barrier)).start();
        }
    }
}

CyclicBarrier的api:

  1. CyclicBarrier.await() 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
  2. CyclicBarrier.getNumberWaiting() 返回当前在屏障处等待的参与者数目。
  3. CyclicBarrier.getParties() 返回要求启动此 barrier 的参与者数目。
  4. CyclicBarrier.isBroken() 查询此屏障是否处于损坏状态。
  5. CyclicBarrier.reset() 将屏障重置为其初始状态。

Semaphore

专业术语:从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。
通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
大白话:类似世博会,中国会场里面限定100个人同时参观,出来一个人,在放进去一个。

/**
 * 模拟参观世博会,并限制人流
 */
public class Demo {
    public void visit(Semaphore semaphore) {
        try {
            semaphore.acquire(); // 获取信号量
            System.out.println(Thread.currentThread().getName() + " 参观会场");
            semaphore.release(); // 释放信号量
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args)  {
        // 创建一个容量5的公平的信号量
        Semaphore semaphore = new Semaphore(5, true);
        Demo demo = new Demo();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> demo.visit(semaphore));
        }
    }
}

Semaphore的api:

  1. Semaphore.acquire() 获取信号量
  2. Semaphore.release() 释放信号量

Exchanger

专业术语:对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。

用途:可以用于两个线程某个对象的比对、或者对象的传递

public class Demo {
    public void a(Exchanger<String> exch) {
        System.out.println("a 方法执行...");
        try {
            System.out.println("a 线程正在抓取数据...");
            Thread.sleep(2000);
            System.out.println("a 线程抓取到数据...");
            String res = "12345";
            System.out.println("a 等待对比结果...");
            exch.exchange(res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
    public void b(Exchanger<String> exch) {
        System.out.println("b 方法开始执行...");
        try {
            System.out.println("b 方法开始抓取数据...");
            Thread.sleep(4000);
            System.out.println("b 方法抓取数据结束...");
            String res = "12345";
            String value = exch.exchange(res);
            System.out.println("开始进行比对...");
            System.out.println("比对结果为:" + value.equals(res));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        Demo d = new Demo();
        Exchanger<String> exch = new Exchanger<>();
        new Thread(() -> d.a(exch)).start();
        new Thread(() -> d.b(exch)).start();
    }
}

Exchanger的api:

  1. Exchanger.exchange() 等待另一个线程到达此交换点(除非当前线程被 中断),然后将给定的对象传送给该线程,并接收该线程的对象。
    如果另一个线程已经在交换点等待,则出于线程调度目的,继续执行此线程,并接收当前线程传入的对象。当前线程立即返回,接收其他线程传递的交换对象。
上一篇下一篇

猜你喜欢

热点阅读