十九 并发包内容

2018-11-26  本文已影响0人  BeYearn

都有些啥

  1. 信号量 Semaphore
    它通过控制一定数量的允许(permit)的方式,来达到限制通用资源访问的目的
public class UsualSemaphoreSample {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Action...GO!");
        Semaphore semaphore = new Semaphore(5);   //限定五个信号量
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(new SemaphoreWorker(semaphore));
            t.start();  //10个线程来跑
        }
    }
}
class SemaphoreWorker implements Runnable {
    private String name;
    private Semaphore semaphore;
    public SemaphoreWorker(Semaphore semaphore) {
        this.semaphore = semaphore;
}
    @Override
    public void run() {
        try {
            log("is waiting for a permit!");
                semaphore.acquire(); //获取信号量  获取不到阻塞
            log("acquired a permit!");
            log("executed!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            log("released a permit!");
            semaphore.release(); //释放一个信号量回去
        }
    }
    private void log(String msg){
        if (name == null) {
            name = Thread.currentThread().getName();}
        System.out.println(name + " " + msg);
    }
}

这样一个线程执行完了释放了信号量(许可), 立即就有阻塞的线程获得许可进入执行
倘若想一次放五个进入, 完了再放五个进入,可以如下(实际不会这么写, 仅演示):

public class AbnormalSemaphoreSample {
    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(0);
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(new MyWorker(semaphore));
            t.start();
        }
        System.out.println("Action...GO!");  // 此时上面起的五个线程阻塞于信号量的获取处
        semaphore.release(5);  // 释放五个信号量(许可)
        System.out.println("Wait for permits off");
        while (semaphore.availablePermits()!=0) {   //查看可用信号量是否为0
            Thread.sleep(100L);
        }
        System.out.println("Action...GO again!");     // 为0的话就再放五个
        semaphore.release(5);
    }
}
class MyWorker implements Runnable {
    private Semaphore semaphore;
    public MyWorker(Semaphore semaphore) {
              this.semaphore = semaphore;
    }
    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println("Executed!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

总的来看, Semaphore就像一个计数器, 基本逻辑基于acquire和release,并没有太复杂的同步逻辑. 如果 Semaphore 的数值被初始化为 1,那么一个线程就可以通过 acquire 进入互斥状态,本质上和互斥锁是非常相似的。但是区别也非常明显,比如互斥锁是有持有者的,而对于 Semaphore 这种计数器结构,虽然有类似功能,但其实不存在真正意义的持有者,除非我们进行扩展包装。

  1. CountDownLatch 和 CyclicBarrier

用CountDownLatch来实现五个一次五个一次

public class LatchSample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(6);
           for (int i = 0; i < 5; i++) {
                Thread t = new Thread(new FirstBatchWorker(latch));
                t.start();}
           for (int i = 0; i < 5; i++) {
                Thread t = new Thread(new SecondBatchWorker(latch));
                t.start();}
       // 注意这里也是演示目的的逻辑,并不是推荐的协调方式
        while ( latch.getCount() != 1 ){  //在第一个五个没执行完(没countdown)时一直等
                Thread.sleep(100L);
        }
        System.out.println("Wait for first batch finish");
        latch.countDown();   // 这里第6个countdown 第二个五个就可以开始了
    }
}
class FirstBatchWorker implements Runnable {
    private CountDownLatch latch;
    public FirstBatchWorker(CountDownLatch latch) {
        this.latch = latch;
    }
    @Override
    public void run() {
            System.out.println("First batch executed!");
            latch.countDown();
    }
}
class SecondBatchWorker implements Runnable {
    private CountDownLatch latch;
    public SecondBatchWorker(CountDownLatch latch) {
        this.latch = latch;}
    @Override
    public void run() {
        try {
            latch.await();
            System.out.println("Second batch executed!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上例也从侧面体现出了它的局限性,虽然它也能够支持 10 个人排队的情况,但是因为不能重用,如果要支持更多人排队,就不能依赖一个 CountDownLatch 进行了. 使用CyclicBarrier实现如下:

public class CyclicBarrierSample {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("Action...GO again!");
            }
        });
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(new CyclicWorker(barrier));
            t.start();
        }
    }
    static class CyclicWorker implements Runnable {
        private CyclicBarrier barrier;
        public CyclicWorker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }
        @Override
        public void run() {
            try {
                for (int i=0; i<3 ; i++){
                    System.out.println("Executed!");
                    barrier.await();
                }
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
       }
    }
}

CyclicBarrier 其实反映的是线程并行运行时的协调
为了让输出更能表达运行时序,使用了 CyclicBarrier 特有的 barrierAction("Action...GO again!"),当屏障被触发时,Java 会自动调度该动作
输出如下


图片.png
  1. 并发包里提供的线程安全 Map、List 和 Set


    图片.png

    总体上种类和结构还是比较简单的,如果我们的应用侧重于 Map 放入或者获取的速度,而不在乎顺序,大多推荐使用 ConcurrentHashMap,反之则使用 ConcurrentSkipListMap

关于两个 CopyOnWrite 容器,其实 CopyOnWriteArraySet 是通过包装了 CopyOnWriteArrayList 来实现的,所以在学习时,我们可以专注于理解一种。

CopyOnWrite 到底是什么意思呢?它的原理是,任何修改操作,如 add、set、remove,都会拷贝原数组,修改后替换原来的数组,通过这种防御性的方式,实现另类的线程安全. 第三章中也有说明

public boolean add(E e) {
    synchronized (lock) {
        Object[] elements = getArray();
        int len = elements.length;
           // 拷贝
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
           // 替换
        setArray(newElements);
        return true;
            }
}
final void setArray(Object[] a) {
    array = a;
}
上一篇 下一篇

猜你喜欢

热点阅读