JAVA并发编程(四)并发工具类

2019-07-23  本文已影响0人  RyanLee_

致敬首先致敬 Doug Lea。java.util.concurrent 的贡献者。

1. Fork/Join框架

Fork/Join框架是Java7提供的一个用于并行计算的框架。
简单描述一下Fork/Join思想:在必要的情况下,将一个大任务,进行拆分(fork)成若干小任务。这些小任务再递归进行拆分,直到不可拆。然后将一个个小任务的运行结果进行汇总(join)。


fork join.png

Fork/Join框架体现了分而治之的思想。

1.1. Fork/Join 标准范式

image.png

2. 工作窃取(work-stealing)算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
一个大型任务可分割为若干个互相独立的子任务。为了减少线程间的竞争。把这些子任务分别放到不同的队列里,并未每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。
比如:线程A负责处理1队列里的任务,线程B负责2队列的任务。线程A先做完队列1中的任务,线程B尚未做完队列2中的任务。这时线程A会去线程B负责的队列2中窃取一个任务来做。
问题来了:线程A、B访问同一个队列,可能存在竞争任务的情况。
为了减少线程A、B之间的竞争。通常会使用双端队列,B永远从双端队列2的头部拿任务执行,而线程A永远从双端队列2的尾部拿任务执行。


work stealing.png

Fork/Join框架基于工作窃取算法,实现任务并行运算。

3.并发工具类

3.1. ForkJoinPool

栗子:统计目录下所有txt文本的行数

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
 * @author Ryan Lee 
 */
public class TestCountTxtLineNumsForkJoin {

    public static void main(String[] args) {
        try {
            //使用ForkJoinPool调度任务
            long start = System.currentTimeMillis();
            ForkJoinPool pool = new ForkJoinPool();
            CountTxtLineNumsTask task = new CountTxtLineNumsTask(new File("/Users/ryanlee/Library/Containers/"));
            System.out.println("使用fork-join pool多线程统计txt文件行数。开始统计..");
            //invoke 主线程同步等待任务完成。execute 主线程不等待fork-join pool中的。
            pool.invoke(task);
            System.out.println(String.format("统计完成,结果:%s,耗时:%s",task.join(),(System.currentTimeMillis() - start) + "ms"));
            pool.shutdown();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    static class CountTxtLineNumsTask extends RecursiveTask<Integer> {
        private File path;//当前任务需要搜寻的目录
        public CountTxtLineNumsTask(File path) {
            this.path = path;
        }
        @Override
        protected Integer compute() {
            List<CountTxtLineNumsTask> subTasks = new ArrayList<>();
            File[] files = path.listFiles();
            if (files != null) {
                int linenumber = 0;
                for (File file : files) {
                    //如果是目录
                    if (file.isDirectory()) {
                        CountTxtLineNumsTask task = new CountTxtLineNumsTask(file);
                        subTasks.add(task);
                    } else {
                        //遇到文件,检查
                        if (file.getAbsolutePath().endsWith("txt")) {
                            try {
                                FileReader fr = new FileReader(file);
                                LineNumberReader lnr = new LineNumberReader(fr);
                                while (lnr.readLine() != null) {
                                    linenumber++;
                                }
                                lnr.close();

                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
                invokeAll(subTasks);
                for (CountTxtLineNumsTask taskItem : subTasks) {
                    linenumber += taskItem.join();
                }
                return linenumber;
            }
            return 0;

        }
    }

}

执行结果:

使用fork-join pool多线程统计txt文件行数。开始统计..
统计完成,结果:2093904,耗时:2132ms
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
/**
 * @author Ryan Lee 
 */
public class TestCountTxtLineNumsSingleThread {

    private static int linenumber = 0;
    private File path;//当前任务需要搜寻的目录
    public TestCountTxtLineNumsSingleThread(File path) {
        this.path = path;
    }
    public static void main(String[] args) {
        try {
            long start = System.currentTimeMillis();
            System.out.println("单线程统计txt文件行数,开始统计...");
            compute(new File("/Users/ryanlee/Library/Containers/"));
            System.out.println(String.format("统计完成,结果:%s,耗时:%s", linenumber, (System.currentTimeMillis() - start) + "ms"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void compute(File path) {
        File[] files = path.listFiles();
        if (files != null) {
            for (File file : files) {
                //如果是目录
                if (file.isDirectory()) {
                    compute(file);
                } else {
                    //遇到文件,检查
                    if (file.getAbsolutePath().endsWith("txt")) {
                        try {
                            FileReader fr = new FileReader(file);
                            LineNumberReader lnr = new LineNumberReader(fr);
                            while (lnr.readLine() != null) {
                                linenumber++;
                            }
                            lnr.close();

                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

执行结果:

单线程统计txt文件行数,开始统计...
统计完成,结果:2093904,耗时:7857ms

结果分析:
使用ForkJoin Task并行统计运算能充分利用多核CPU。执行效率明显优于单线程统计运算

ForkJoin 同步用法、异步用法

submit、execute、invoke、invokeAll()

3.2. CountDownLatch

CountDownLatch在java1.5被引入。CountDownLatch是一个线程安全的计数器。它能够使一个线程等待其他线程完成各自的工作后再执行。

栗子:模拟并发远程接口调用

import java.util.concurrent.CountDownLatch;
/**
 * @author Ryan Lee 
 */
public class TestCountDownLatch {

    private static class CallInterfaceAThread implements Runnable {
        CountDownLatch countDownLatch;

        CallInterfaceAThread(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            System.out.println("Thread[" + Thread.currentThread().getName()
                    + "]调用接口A开始");
            try {
                //模拟接口调用耗时
                Thread.currentThread().sleep(1000);
                System.out.println("Thread[" + Thread.currentThread().getName()
                        + "]调用接口A成功");
                countDownLatch.countDown();

                //模拟后续处理,如:更新缓存
                Thread.currentThread().sleep(1);
                System.out.println("Thread[" + Thread.currentThread().getName()
                        + "]更新缓存");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch1 = new CountDownLatch(6);
        System.out.println("开始异步远程接口调用...");
        long start = System.currentTimeMillis();
        for (int i = 0; i < 6; i++) {
            Thread thread = new Thread(new CallInterfaceAThread(latch1));
            thread.start();
        }
        latch1.await();
        System.out.println("完成异步远程接口调用,耗时:" + (System.currentTimeMillis() - start) + "ms");
        Thread.currentThread().sleep(1);
        System.out.println("主线程开始做其它业务...");
        Thread.currentThread().sleep(1);
        System.out.println("主线程完成,用户得到返回结果");
    }
}

执行结果:

开始异步远程接口调用...
Thread-1调用接口A开始
Thread-0调用接口A开始
Thread-2调用接口A开始
Thread-3调用接口A开始
Thread-4调用接口A开始
Thread-5调用接口A开始
Thread-1调用接口A成功
Thread-4调用接口A成功
Thread-4更新缓存
Thread-3调用接口A成功
Thread-3更新缓存
Thread-2调用接口A成功
Thread-2更新缓存
Thread-5调用接口A成功
Thread-0调用接口A成功
Thread-5更新缓存
Thread-1更新缓存
完成异步远程接口调用,耗时:1002ms
主线程开始做其它业务...
Thread-0更新缓存
主线程完成,用户得到返回结果

结果分析:主线程开启6个子线程,然后执行CountDownLatch.await()进入阻塞状态。子线程调用完接口A,执行CountDownLatch.countDown(),然后执行后续的操作。子线程不会被阻塞。当CountDownLatch扣减到0时,主线程被唤醒,执行后续的操作。
在一些场景下,并发的方式可大大提高程序的执行效率。以批量远程接口调用为例分析:
1.Web容器基本都是支持并发容器。当一个用户请求进来,容器会启动一个新的线程响应用户请求,能够充分利用多核CPU。此外,当前Web应用基本都是集群部署。一定量级的并发调用不会对接口执行效率造成很大影响。 示例中并发调用接口A 6次所需时间接近于1S。而如果串行调用接口A 6次则需要多于6S的时间。

2.并发调用接口相对于串行调用可省却了一部分数据传输时间。调用局域网内部的接口差异不明显。试想下如果调用部署在3000公里以外服务器上的接口。一个往返需要0.02秒。抛开TCP/IP三次握手建立连接的耗时,串行6次调用接口至少 6*0.02 = 0.12秒 耗费在数据传输上;而并发调用接口的数据传输时间接近于0.02秒。

3.3. CyclicBarrier

CyclicBarrier也叫同步屏障,在JDK1.5被引入。应用于一组线程:先到达的线程被阻塞,直到最后一个线程到达屏障。然后执行完设定好的操作。执行完成后,所有被阻塞的线程才能继续执行。

栗子:模拟需要共同进行的集体活动

import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

/**
 * @author Ryan Lee 
 */
public class TestCyclicBarrier {

    private static CyclicBarrier barrier
            = new CyclicBarrier(5, new CollectThread());

    private static ConcurrentHashMap<String, Long> resultMap
            = new ConcurrentHashMap<>();//存放子线程工作结果的容器

    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        for (int i = 0; i <= 4; i++) {
            Thread thread = new Thread(new SubThread());
            thread.start();
            System.out.println("主线程启动子线程:" + thread.getId());
        }
        System.out.println("主线程执行其它任务..." );
    }

    //负责屏障开放以后的工作
    private static class CollectThread implements Runnable {

        @Override
        public void run() {
            System.out.println("所有子线程一起通过屏障,集体活动开始");
            StringBuilder result = new StringBuilder();
            for (Map.Entry<String, Long> item : resultMap.entrySet()) {
                result.append("[" + item.getValue() + "]");
            }
            System.out.println("集体活动的结果 = " + result);
            System.out.println("集体活动结束");
        }
    }
    //工作线程
    private static class SubThread implements Runnable {
        @Override
        public void run() {
            long id = Thread.currentThread().getId();//线程本身的处理结果
            resultMap.put("" + id, id);
            Random r = new Random();//随机决定工作线程的是否睡眠
            try {
                //随机决定是否做准备工作
                if (r.nextBoolean()) {
                    System.out.println("线程" + id + "正在做准备工作...");
                    Thread.sleep(2000 + id);
                }
                System.out.println("线程" + id + "已经准备好做集体活动...");
                barrier.await();
                Thread.sleep(1000 + id);
                System.out.println("线程" + id + "处理个人事务");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:

主线程启动子线程:19
主线程启动子线程:20
主线程启动子线程:21
主线程启动子线程:22
主线程启动子线程:23
主线程执行其它任务...
线程19已经准备好做集体活动...
线程22正在做准备工作...
线程23已经准备好做集体活动...
线程21正在做准备工作...
线程20正在做准备工作...
线程20已经准备好做集体活动...
线程21已经准备好做集体活动...
线程22已经准备好做集体活动...
所有子线程一起通过屏障,集体活动开始
集体活动的结果 = [22][23][19][20][21]
集体活动结束
线程21处理个人事务
线程23处理个人事务
线程22处理个人事务
线程19处理个人事务
线程20处理个人事务

结果分析:
主线程不等待,启动完5个SubThread后,直接执行后续的操作。当所有SubThread都执行完CyclicBarrier.await(),达到屏障时,CollectThread启动,收集所有线程的ID。CollectThread执行完成后,5个SubThread执行各自后续的操作。

注意CyclicBarrier与CountDownLatch的差异。

3.4. Semaphore

Semaphore直译:信号量。

可以把信号量比喻成地下车库。例如:一个地下车库可容纳100辆车。如果还有车位就可以进去。如果车位满了,就要等有车出来释放一个车位,才能进去。
当一个车位被释放出来,等待的车辆默认随机进入车库获得这个车位(非公平)。当然也可以通过Semaphore的构造参数设定先到先得(公平)。

Semaphore一般用于限制对于某一资源的同时访问的线程数。

栗子:模拟多线程并发从数据库连接池获取连接

import java.sql.Array;
import java.sql.Blob;
import java.sql.CallableStatement;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.SQLClientInfoException;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;

public class SqlConnectImpl implements Connection{

    /**
     * 模拟获取数据库连接
     * @return 数据库连接
     */
        public static final Connection getConnection(){
               return new SqlConnectImpl();
       }
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        // TODO Auto-generated method stub
        return null;
    }
    @Override
    public void setSchema(String schema) throws SQLException {
        // TODO Auto-generated method stub
        
    }
   
       ...此处省略一大堆Override方法。如需测试可用IDE自动补全...

    @Override
    public String getSchema() throws SQLException {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void abort(Executor executor) throws SQLException {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
        // TODO Auto-generated method stub
        
    }

    @Override
    public int getNetworkTimeout() throws SQLException {
        // TODO Auto-generated method stub
        return 0;
    }

}

import java.sql.Connection;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
/**
 * @author Ryan Lee 
 */
public class DBPoolUseSemaphore {
    private final int POOL_SIZE;
    //控制数据库连接的信号量
    private final Semaphore dbLinkSemaphore;
    //存放数据库连接的容器
    private LinkedList<Connection> pool = new LinkedList<Connection>();
    //初始化池
    public DBPoolUseSemaphore(int poolSize) {
        POOL_SIZE = poolSize;
        this.dbLinkSemaphore = new Semaphore(POOL_SIZE);
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.getConnection());
        }
    }
    /**
     * 释放连接
     * @param connection
     * @throws InterruptedException
     */
    public void releaseDbLink(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("当前有" + dbLinkSemaphore.getQueueLength() + "个线程等待数据库连接!"
                    + "连接池可用连接数:" + dbLinkSemaphore.availablePermits());
            synchronized (pool) {
                pool.addLast(connection);
            }
            dbLinkSemaphore.release();
        }
    }
    /**
     * 获取连接
     * @return
     * @throws InterruptedException
     */
    public Connection getDbLink() throws InterruptedException {
        dbLinkSemaphore.acquire();
        Connection conn;
        synchronized (pool) {
            conn = pool.removeFirst();
        }
        return conn;
    }
}
import java.sql.Connection;
import java.util.Random;
/**
 * @author Ryan Lee 
 */
public class TestSemaphoreViaDBPool {
    private static DBPoolUseSemaphore dbPool = new DBPoolUseSemaphore(10);
    //查询数据库的业务线程
    private static class QueryDBThread extends Thread {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                Connection connect = dbPool.getDbLink();
                System.out.println("Thread[" + Thread.currentThread().getId()
                        + "]获取数据库连接耗时 " + (System.currentTimeMillis() - start) + " ms.");
                //让每个线程持有连接的时间不一样
                Random r = new Random();
                //模拟业务操作,线程持有连接查询数据
                Thread.currentThread().sleep(100 + r.nextInt(100));
                System.out.println("查询数据完成,归还连接!");
                dbPool.releaseDbLink(connect);
            } catch (InterruptedException e) {
            }
        }
    }
    public static void main(String[] args) {
        //模拟50个线程并发从连接池获取连接
        for (int i = 0; i < 50; i++) {
            Thread thread = new QueryDBThread();
            thread.start();
        }
    }

}

运行结果:

Thread[20]获取数据库连接耗时 0 ms.
Thread[24]获取数据库连接耗时 0 ms.
Thread[23]获取数据库连接耗时 0 ms.
Thread[19]获取数据库连接耗时 0 ms.
Thread[21]获取数据库连接耗时 0 ms.
Thread[22]获取数据库连接耗时 0 ms.
Thread[26]获取数据库连接耗时 0 ms.
Thread[25]获取数据库连接耗时 0 ms.
Thread[27]获取数据库连接耗时 0 ms.
Thread[28]获取数据库连接耗时 0 ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有40个线程等待数据库连接!连接池可用连接数:0
当前有40个线程等待数据库连接!连接池可用连接数:0
Thread[29]获取数据库连接耗时 119 ms.
Thread[30]获取数据库连接耗时 118 ms.
查询数据完成,归还连接!
当前有38个线程等待数据库连接!连接池可用连接数:0
Thread[31]获取数据库连接耗时 125 ms.
查询数据完成,归还连接!
当前有37个线程等待数据库连接!连接池可用连接数:0
Thread[32]获取数据库连接耗时 126 ms.
查询数据完成,归还连接!
当前有36个线程等待数据库连接!连接池可用连接数:0
查询数据完成,归还连接!
当前有35个线程等待数据库连接!连接池可用连接数:0
Thread[33]获取数据库连接耗时 142 ms.
Thread[34]获取数据库连接耗时 142 ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有34个线程等待数据库连接!连接池可用连接数:0
当前有34个线程等待数据库连接!连接池可用连接数:0
Thread[35]获取数据库连接耗时 147 ms.
Thread[36]获取数据库连接耗时 147 ms.
查询数据完成,归还连接!
当前有32个线程等待数据库连接!连接池可用连接数:0
Thread[37]获取数据库连接耗时 152 ms.
查询数据完成,归还连接!
当前有31个线程等待数据库连接!连接池可用连接数:0
Thread[38]获取数据库连接耗时 156 ms.
查询数据完成,归还连接!
当前有30个线程等待数据库连接!连接池可用连接数:0
Thread[39]获取数据库连接耗时 247 ms.
查询数据完成,归还连接!
当前有29个线程等待数据库连接!连接池可用连接数:0
Thread[40]获取数据库连接耗时 259 ms.
查询数据完成,归还连接!
当前有28个线程等待数据库连接!连接池可用连接数:0
Thread[41]获取数据库连接耗时 270 ms.
查询数据完成,归还连接!
当前有27个线程等待数据库连接!连接池可用连接数:0
Thread[42]获取数据库连接耗时 284 ms.
查询数据完成,归还连接!
当前有26个线程等待数据库连接!连接池可用连接数:0
Thread[43]获取数据库连接耗时 285 ms.
查询数据完成,归还连接!
当前有25个线程等待数据库连接!连接池可用连接数:0
Thread[44]获取数据库连接耗时 291 ms.
查询数据完成,归还连接!
当前有24个线程等待数据库连接!连接池可用连接数:0
查询数据完成,归还连接!
当前有23个线程等待数据库连接!连接池可用连接数:0
Thread[45]获取数据库连接耗时 298 ms.
Thread[46]获取数据库连接耗时 298 ms.
查询数据完成,归还连接!
当前有22个线程等待数据库连接!连接池可用连接数:0
Thread[47]获取数据库连接耗时 302 ms.
查询数据完成,归还连接!
当前有21个线程等待数据库连接!连接池可用连接数:0
Thread[48]获取数据库连接耗时 304 ms.
查询数据完成,归还连接!
当前有20个线程等待数据库连接!连接池可用连接数:0
Thread[49]获取数据库连接耗时 388 ms.
查询数据完成,归还连接!
当前有19个线程等待数据库连接!连接池可用连接数:0
Thread[50]获取数据库连接耗时 395 ms.
查询数据完成,归还连接!
当前有18个线程等待数据库连接!连接池可用连接数:0
Thread[51]获取数据库连接耗时 405 ms.
查询数据完成,归还连接!
当前有17个线程等待数据库连接!连接池可用连接数:0
Thread[52]获取数据库连接耗时 411 ms.
查询数据完成,归还连接!
当前有16个线程等待数据库连接!连接池可用连接数:0
Thread[53]获取数据库连接耗时 420 ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有15个线程等待数据库连接!连接池可用连接数:0
当前有15个线程等待数据库连接!连接池可用连接数:0
Thread[54]获取数据库连接耗时 425 ms.
Thread[55]获取数据库连接耗时 425 ms.
查询数据完成,归还连接!
当前有13个线程等待数据库连接!连接池可用连接数:0
Thread[56]获取数据库连接耗时 434 ms.
查询数据完成,归还连接!
当前有12个线程等待数据库连接!连接池可用连接数:0
Thread[57]获取数据库连接耗时 440 ms.
查询数据完成,归还连接!
当前有11个线程等待数据库连接!连接池可用连接数:0
Thread[58]获取数据库连接耗时 471 ms.
查询数据完成,归还连接!
当前有10个线程等待数据库连接!连接池可用连接数:0
Thread[59]获取数据库连接耗时 522 ms.
查询数据完成,归还连接!
当前有9个线程等待数据库连接!连接池可用连接数:0
Thread[60]获取数据库连接耗时 536 ms.
查询数据完成,归还连接!
当前有8个线程等待数据库连接!连接池可用连接数:0
Thread[61]获取数据库连接耗时 539 ms.
查询数据完成,归还连接!
当前有7个线程等待数据库连接!连接池可用连接数:0
Thread[62]获取数据库连接耗时 550 ms.
查询数据完成,归还连接!
当前有6个线程等待数据库连接!连接池可用连接数:0
Thread[63]获取数据库连接耗时 559 ms.
查询数据完成,归还连接!
当前有5个线程等待数据库连接!连接池可用连接数:0
Thread[64]获取数据库连接耗时 561 ms.
查询数据完成,归还连接!
当前有4个线程等待数据库连接!连接池可用连接数:0
Thread[65]获取数据库连接耗时 579 ms.
查询数据完成,归还连接!
当前有3个线程等待数据库连接!连接池可用连接数:0
Thread[66]获取数据库连接耗时 610 ms.
查询数据完成,归还连接!
当前有2个线程等待数据库连接!连接池可用连接数:0
Thread[67]获取数据库连接耗时 610 ms.
查询数据完成,归还连接!
当前有1个线程等待数据库连接!连接池可用连接数:0
Thread[68]获取数据库连接耗时 645 ms.
查询数据完成,归还连接!
当前有0个线程等待数据库连接!连接池可用连接数:0
查询数据完成,归还连接!
当前有0个线程等待数据库连接!连接池可用连接数:1
查询数据完成,归还连接!
当前有0个线程等待数据库连接!连接池可用连接数:2
查询数据完成,归还连接!
当前有0个线程等待数据库连接!连接池可用连接数:3
查询数据完成,归还连接!
当前有0个线程等待数据库连接!连接池可用连接数:4
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有0个线程等待数据库连接!连接池可用连接数:5
当前有0个线程等待数据库连接!连接池可用连接数:5
查询数据完成,归还连接!
当前有0个线程等待数据库连接!连接池可用连接数:7
查询数据完成,归还连接!
当前有0个线程等待数据库连接!连接池可用连接数:8
查询数据完成,归还连接!
当前有0个线程等待数据库连接!连接池可用连接数:9

一不小心点进了Semaphore的源码。顺手拆开看下。

3.5. Semaphore 源码

package java.util.concurrent;
import java.util.Collection;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/* *
 * 注释太长。此处省略。下面方法中的注释也经过删减。
 *
 * @since 1.5
 * @author Doug Lea
 */
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync;

    /**
     * Synchronization implementation for semaphore.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and nonfair fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a {@code Semaphore} with the given number of
     * permits and the given fairness setting.
     *
     * @param permits the initial number of permits available.
     *        This value may be negative, in which case releases
     *        must occur before any acquires will be granted.
     * @param fair {@code true} if this semaphore will guarantee
     *        first-in first-out granting of permits under contention,
     *        else {@code false}
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * Acquires a permit from this semaphore, blocking until one is
     * available, or the thread is {@linkplain Thread#interrupt interrupted}.
     * @throws InterruptedException if the current thread is interrupted
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Acquires a permit from this semaphore, blocking until one is
     * available.
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * Acquires a permit from this semaphore, only if one is available at the
     * time of invocation.
     *
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
    /**
     * Acquires a permit from this semaphore, if one becomes available
     * within the given waiting time and the current thread has not
     * been {@linkplain Thread#interrupt interrupted}.
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Releases a permit, returning it to the semaphore.
     *
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * Acquires the given number of permits from this semaphore,
     * blocking until all are available,
     * or the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * @param permits the number of permits to acquire
     * @throws InterruptedException if the current thread is interrupted
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * Acquires the given number of permits from this semaphore,
     * blocking until all are available.
     * @param permits the number of permits to acquire
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * Acquires the given number of permits from this semaphore, only
     * if all are available at the time of invocation.
     * @param permits the number of permits to acquire
     * @return {@code true} if the permits were acquired and
     *         {@code false} otherwise
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * Acquires the given number of permits from this semaphore, if all
     * become available within the given waiting time and the current
     * thread has not been {@linkplain Thread#interrupt interrupted}.
     *
     * @param permits the number of permits to acquire
     * @param timeout the maximum time to wait for the permits
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if all permits were acquired and {@code false}
     *         if the waiting time elapsed before all permits were acquired
     * @throws InterruptedException if the current thread is interrupted
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * Releases the given number of permits, returning them to the semaphore.
     * @param permits the number of permits to release
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    /**
     * Returns the current number of permits available in this semaphore.
     * @return the number of permits available in this semaphore
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * Acquires and returns all permits that are immediately available.
     * @return the number of permits acquired
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * Shrinks the number of available permits by the indicated
     * reduction. This method can be useful in subclasses that use
     * semaphores to track resources that become unavailable. This
     * method differs from {@code acquire} in that it does not block
     * waiting for permits to become available.
     *
     * @param reduction the number of permits to remove
     * @throws IllegalArgumentException if {@code reduction} is negative
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    /**
     * Returns {@code true} if this semaphore has fairness set true.
     * @return {@code true} if this semaphore has fairness set true
     */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /**
     * Queries whether any threads are waiting to acquire. Note that
     * because cancellations may occur at any time, a {@code true}
     * return does not guarantee that any other thread will ever
     * acquire.  This method is designed primarily for use in
     * monitoring of the system state.
     *
     * @return {@code true} if there may be other threads waiting to
     *         acquire the lock
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /**
     * Returns an estimate of the number of threads waiting to acquire.
     * The value is only an estimate because the number of threads may
     * change dynamically while this method traverses internal data
     * structures.  This method is designed for use in monitoring of the
     * system state, not for synchronization control.
     *
     * @return the estimated number of threads waiting for this lock
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    /**
     * Returns a collection containing threads that may be waiting to acquire.
     * Because the actual set of threads may change dynamically while
     * constructing this result, the returned collection is only a best-effort
     * estimate.  The elements of the returned collection are in no particular
     * order.  This method is designed to facilitate construction of
     * subclasses that provide more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    /**
     * Returns a string identifying this semaphore, as well as its state.
     * The state, in brackets, includes the String {@code "Permits ="}
     * followed by the number of permits.
     *
     * @return a string identifying this semaphore, as well as its state
     */
    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}

源码简析:
可以看到 Semaphore声明了一个继承自AQS抽象类Sync,基于Sync声明实现了一个公平同步器FairSync和非公平同步器NonfairSync。基于构造参数决定成员属性Sync sync使用NonfairSync还是FairSync构建,默认使用NonfairSync。
声明acquire、tryAcquire、release...等等方法将sync的方法暴露出来。这些方法基本都继承自AQS。关于AQS这里不赘述。后面的文章会详细介绍。AQS和CAS是java.util.concurrent并发工具包的基础。

3.5. Exchanger

Exchanger是自jdk1.5起开始提供的工具套件。一般用于两个工作线程之间交换数据。应用场景较少。

栗子:模拟两个线程交换成员数组进行打印

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Exchanger;
/**
 * @author Ryan Lee 
 */
public class TestExchanger {
    private static final Exchanger<Set<String>> exchanger = new Exchanger<Set<String>>();

    public static void main(String[] args) {
        //第一个线程
        new Thread(() -> {
            Set<String> setA = new HashSet<String>();//存放数据的容器
            try {
                setA.add("item_1_in_setA");
                setA.add("item_2_in_setA");
                setA.add("item_3_in_setA");
                long start = System.currentTimeMillis();
                setA = exchanger.exchange(setA);//交换set
                System.out.println("ThreadA等待交换耗时:" + (System.currentTimeMillis() - start) + " ms.");
                for (String item : setA) {
                    System.out.println("ThreadA" + "打印:" + item);
                }
                /*处理交换后的数据*/
            } catch (InterruptedException e) {
            }
        }).start();
        
        //第二个线程
        new Thread(() -> {
            Set<String> setB = new HashSet<String>();//存放数据的容器
            try {
                setB.add("item_1_in_setB");
                setB.add("item_2_in_setB");
                setB.add("item_3_in_setB");
                Thread.currentThread().sleep(1000);
                long start = System.currentTimeMillis();
                setB = exchanger.exchange(setB);//交换set
                System.out.println("ThreadB等待交换耗时:" + (System.currentTimeMillis() - start) + " ms.");
                for (String item : setB) {
                    System.out.println("ThreadB" + "打印:" + item);
                }
                /*处理交换后的数据*/
            } catch (InterruptedException e) {
            }
        }).start();

    }
}

执行结果:

ThreadA等待交换耗时:1004 ms.
ThreadB等待交换耗时:0 ms.
ThreadB打印:item_3_in_setA
ThreadB打印:item_1_in_setA
ThreadB打印:item_2_in_setA
ThreadA打印:item_3_in_setB
ThreadA打印:item_1_in_setB
ThreadA打印:item_2_in_setB
上一篇下一篇

猜你喜欢

热点阅读