Java 杂谈我爱编程

Java 并发:多线程锁计数器

2018-04-16  本文已影响27人  凌云_00

  在公司一个数据导入的场景中因为需要导入的数据量非常大,在本地导入一次需要十几分钟,估算线上导入的时间会翻倍,为了缩短导入时间,需要使用并发,但是导入完成后需要给用户反馈,而反馈代码又写在主线程中,所以就需要,在线程启动后,主线程挂起,在所有线程完成后,主线程恢复执行。
  问题提出后,心中有了个大致方案,并且在测试后能够顺利执行,但是在TestCase完成后,发现JDK 1.5中就有相似的场景解决方案,所以学习研究了一番,再次记录,并分享给大家

原有方案

  原有的解决方案是利用以下这几个包来实现的,通过Condition锁和while(true)的等待通知模式,实现了主线程的挂起和恢复

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

主线程 挂起机制

  主线程通过while(true) 和 index计数 实现了不停的挂起的操作

        lock.lock();

        int index = 0;

        while (true) {
            
            // threaNum 线程数量
            if (index < threadNum) {
                condition.await();
            } else {
                break;
            }
            
            index++;
        }

        lock.unlock();

线程 唤醒机制

  在执行的线程最后执行唤醒机制,唤醒主线程,

        lock.lock();

        condition.signalAll();

        lock.unlock();

  通过线程的唤醒,和主线程的挂起操作,主线程不停被唤醒,然后再次挂起,直到最后一个线程唤醒主线程,index = threadNum 主线程跳出循环,继续执行

JDK 原生方案

  JDK 1.5中 提供了CountDownLatch这个并发工具类,解决了多线程并发,主线程等待最后执行的效果。

构造器

// count 为 线程数量
public CountDownLatch(int count) 

常用方法

// //调用此方法的线程会被挂起,直到count值为0才继续执行
public void await();

// 在等待timeOut时间后,如果count值还没到0 立即执行当前线程
public boolean await(long timeout, TimeUnit unit)

// count减一
public void countDown()

案例

public class TestCase {

     public static void main(String[] args) {  
         
         // 创建对象,并声明有2个线程需要执行
         final CountDownLatch countDownLatch= new CountDownLatch(2);
 
         new Thread(){

             public void run() {

                 try {
                     System.out.println("我是小弟1:"+Thread.currentThread().getName()+"正在执行");
                    Thread.sleep(3000);
                    System.out.println("我是小弟1:"+Thread.currentThread().getName()+"执行完毕");
                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
 
         new Thread(){

             public void run() {

                 try {
                     System.out.println("我是小弟2:"+Thread.currentThread().getName()+"正在执行");
                     Thread.sleep(3000);
                     System.out.println("我是小弟2:"+Thread.currentThread().getName()+"执行完毕");
                     countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             };
         }.start();
 
         try {

            System.out.println("等待2小弟干活呢...");

            countDownLatch.await();

            System.out.println("2个小弟干完了");

            System.out.println("老大我要继续干活了");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

     }
}
执行结果
我是小弟1:Thread-0正在执行
我是小弟2:Thread-1正在执行

等待2小弟干活呢...

我是小弟1:Thread-0执行完毕
我是小弟2:Thread-1执行完毕

2个小弟干完了

老大我要继续干活了

  大家可以发现 使用CountDownLatch 类来解决问题,更简洁和方便,不需要在写额外的循环和锁机制

concurrent中其他有趣和实用的工具类

CyclicBarrier

  CyclicBarrier翻译回环栅栏,实现的功能是让多个线程运行到某一标志点后挂起,当所有线程都到此标志位后再一起运行,类似起跑线,当所有人都各就位后才能唤醒,开始奔跑。

应用场景 - 多个线程做任务,等到达集合点同步后交给后面的线程做汇总

构造器

// count 指明多少个线程要到达特定标志
public CyclicBarrier(int count) {}

// barrierAction为当所有线程到特定标志位后执行的内容
public CyclicBarrier(int count, Runnable barrierAction) {}

方法

// 挂起当前线程,知道所有线程到达标志位,在执行
public int await() ;

// 挂起timeOut时间,如果所有线程还未到位,则到位的线程直接继续执行
public int await(long timeout, TimeUnit unit);

案例1-指定线程数

public class Test {

    public static void main(String[] args) {

        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);

        for(int i=0;i<N;i++) new Writer(barrier).start();

    }

    static class Writer extends Thread{

        private CyclicBarrier cyclicBarrier;

        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {

            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");

            try {

                Thread.sleep(5000);      //以睡眠来模拟写入数据操作

                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");

                cyclicBarrier.await();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");

        }
    }
}

运行结果

线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
案例2-指定执行内容
public class Test {

    public static void main(String[] args) {

        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N,new Runnable() {

            @Override
            public void run() {
                System.out.println("当前线程"+Thread.currentThread().getName());   
            }

        });
 
        for(int i=0;i<N;i++) new Writer(barrier).start();
    }

    static class Writer extends Thread{

        private CyclicBarrier cyclicBarrier;

        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {

            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");

            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作

                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");

                cyclicBarrier.await();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }

            System.out.println("所有线程写入完毕,继续处理其他任务...");

        }
    }
}

运行结果

线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2正在写入数据...
线程Thread-3正在写入数据...

线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕

当前线程Thread-3

所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

  从结果可以看出,当四个线程都到达barrier状态后,会从四个线程中选择一个线程去执行Runnable。

案例3-CyclicBarrier重用

public class Test {

    public static void main(String[] args) {

        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
 
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }
 
        try {
            Thread.sleep(25000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
        System.out.println("CyclicBarrier重用");
 
        for(int i=0;i<N;i++) {
            new Writer(barrier).start();
        }

    }

    static class Writer extends Thread{

        private CyclicBarrier cyclicBarrier;

        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }
 
        @Override
        public void run() {

            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");

            try {

                Thread.sleep(5000);      //以睡眠来模拟写入数据操作

                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
 
                cyclicBarrier.await();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");

        }
    }
}

运行结果

线程Thread-0正在写入数据...
线程Thread-1正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...

线程Thread-1写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕

Thread-0所有线程写入完毕,继续处理其他任务...
Thread-3所有线程写入完毕,继续处理其他任务...
Thread-1所有线程写入完毕,继续处理其他任务...
Thread-2所有线程写入完毕,继续处理其他任务...

CyclicBarrier重用

线程Thread-4正在写入数据...
线程Thread-5正在写入数据...
线程Thread-6正在写入数据...
线程Thread-7正在写入数据...

线程Thread-7写入数据完毕,等待其他线程写入完毕
线程Thread-5写入数据完毕,等待其他线程写入完毕
线程Thread-6写入数据完毕,等待其他线程写入完毕
线程Thread-4写入数据完毕,等待其他线程写入完毕

Thread-4所有线程写入完毕,继续处理其他任务...
Thread-5所有线程写入完毕,继续处理其他任务...
Thread-6所有线程写入完毕,继续处理其他任务...
Thread-7所有线程写入完毕,继续处理其他任务...

  从执行结果可以看出,在初次的4个线程越过barrier状态后,又可以用来进行新一轮的使用。而CountDownLatch无法进行重复使用。

Semaphore

  Semaphore翻译成字面意思为 信号量,信号量就是可以声明多把锁(包括一把锁:此时为互斥信号量)。
  举个例子:一个房间如果只能容纳5个人,多出来的人必须在门外面等着。如何去做呢?一个解决办法就是:房间外面挂着五把钥匙,每进去一个人就取走一把钥匙,没有钥匙的不能进入该房间而是在外面等待。每出来一个人就把钥匙放回原处以方便别人再次进入。

应用场景 - 流量控制,即控制能够访问的最大线程数。

构造器

//参数permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {}

//这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可,默认是非公平的
public Semaphore(int permits, boolean fair) {}

常用方法

//获取一个许可,若无许可能够获得,则会一直等待,直到获得许可
public void acquire();

//获取x个许可
public void acquire(int x);

//释放一个许可,注意,在释放许可之前,必须先获获得许可。
public void release() ; 

//释放x个许可
public void release(int x) ;  

以上4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法
//尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire()   

//尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(long timeout, TimeUnit unit)

//尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false
public boolean tryAcquire(int permits) 

//尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) 

  另外还可以通过availablePermits()方法得到可用的许可数目。

案例

  假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现

public class Test {

    public static void main(String[] args) {

        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目

        for(int i=0;i<N;i++) new Worker(i,semaphore).start();
    }
 
    static class Worker extends Thread{

        private int num;
        private Semaphore semaphore;

        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }
 
        @Override
        public void run() {

            try {

                semaphore.acquire();

                System.out.println("工人"+this.num+"占用一个机器在生产...");

                Thread.sleep(2000);

                System.out.println("工人"+this.num+"释放出机器");

                semaphore.release();       
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

运行结果

工人0占用一个机器在生产...
工人1占用一个机器在生产...
工人2占用一个机器在生产...
工人4占用一个机器在生产...
工人5占用一个机器在生产...

工人0释放出机器
工人2释放出机器

工人3占用一个机器在生产...
工人7占用一个机器在生产...

工人4释放出机器
工人5释放出机器
工人1释放出机器

工人6占用一个机器在生产...

工人3释放出机器
工人7释放出机器
工人6释放出机器

1. CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
  - CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
  - 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
  - CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
2. Semaphore和锁类似,一般用于控制对某组资源的访问权限。

Phaser

  Phaser是更加复杂和强大的同步辅助类。它允许并发执行多阶段任务。当我们有并发任务并且需要分解成几步执行时(CyclicBarrier是分成两步)就可以选择使用Phaser。
  Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才允许执行下一步。
  跟其他同步工具一样,必须对Phaser类中参与同步操作的任务数进行初始化,不同的是,可以动态的增加或者减少任务数。

Phaser状态

  -  活跃态:当存在参与同步的线程的时候,Phaser就是活跃的,并且在每个阶段结束的时候进行同步。
  -  终止态:当所有的线程都取消注册的时候,Phaser就处于终止态,此时Phaser没有任何参与者。

常用方法

// 类似于CyclicBarrier的await()方法,等待其它线程都到来之后同步继续执行
arriveAndAwaitAdvance()

// 把执行到此的线程从Phaser中注销掉
arriveAndDeregister()

// 判断Phaser是否终止
isTerminated()

// 将一个新的参与者注册到Phaser中,这个新的参与者将被当成没有执行完本阶段的线程
register()

// 强制Phaser进入终止态
forceTermination()

案例

  使用Phaser类同步三个并发任务。这三个任务将在三个不同的文件夹及其子文件夹中查找过去24小时内修改过扩展为为.log的文件。这个任务分成以下三个步骤:
1. 在执行的文件夹及其子文件夹中获取扩展名为.log的文件
2. 对每一步的结果进行过滤,删除修改时间超过24小时的文件
3. 将结果打印到控制台

  在第一步和第二步结束的时候,都会检查所查找到的结果列表是不是有元素存在。如果结果列表是空的,对应的线程将结束执行,并从Phaser中删除。(也就是动态减少任务数)

文件查找类
public class FileSearch implements Runnable {
    private String initPath;

    private String end;
    
    private List<String> results;

    private Phaser phaser;

    public FileSearch(String initPath, String end, Phaser phaser) {
        this.initPath = initPath;
        this.end = end;
        this.phaser=phaser;
        results=new ArrayList<>();
    }
    @Override
    public void run() {

        phaser.arriveAndAwaitAdvance();//等待所有的线程创建完成,确保在进行文件查找的时候所有的线程都已经创建完成了
        
        System.out.printf("%s: Starting.\n",Thread.currentThread().getName());
        
        // 1st Phase: 查找文件
        File file = new File(initPath);
        if (file.isDirectory()) {
            directoryProcess(file);
        }
        
        // 如果查找结果为false,那么就把该线程从Phaser中移除掉并且结束该线程的运行
        if (!checkResults()){
            return;
        }
        
        // 2nd Phase: 过滤结果,过滤出符合条件的(一天内的)结果集
        filterResults();
        
        // 如果过滤结果集结果是空的,那么把该线程从Phaser中移除,不让它进入下一阶段的执行
        if (!checkResults()){
            return;
        }
        
        // 3rd Phase: 显示结果
        showInfo();
        phaser.arriveAndDeregister();//任务完成,注销掉所有的线程
        System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
    }
    private void showInfo() {
        for (int i=0; i<results.size(); i++){
            File file=new File(results.get(i));
            System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
        }
        // Waits for the end of all the FileSearch threads that are registered in the phaser
        phaser.arriveAndAwaitAdvance();
    }
    private boolean checkResults() {
        if (results.isEmpty()) {
            System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
            System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
            //结果为空,Phaser完成并把该线程从Phaser中移除掉
            phaser.arriveAndDeregister();
            return false;
        } else {
            // 等待所有线程查找完成
            System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
            phaser.arriveAndAwaitAdvance();
            return true;
        }        
    }
    private void filterResults() {
        List<String> newResults=new ArrayList<>();
        long actualDate=new Date().getTime();
        for (int i=0; i<results.size(); i++){
            File file=new File(results.get(i));
            long fileDate=file.lastModified();
            
            if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
                newResults.add(results.get(i));
            }
        }
        results=newResults;
    }
    private void directoryProcess(File file) {
        // Get the content of the directory
        File list[] = file.listFiles();
        if (list != null) {
            for (int i = 0; i < list.length; i++) {
                if (list[i].isDirectory()) {
                    // If is a directory, process it
                    directoryProcess(list[i]);
                } else {
                    // If is a file, process it
                    fileProcess(list[i]);
                }
            }
        }
    }
    private void fileProcess(File file) {
        if (file.getName().endsWith(end)) {
            results.add(file.getAbsolutePath());
        }
    }
}
主函数
public static void main(String[] args) {
        Phaser phaser = new Phaser(3);

        FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
        FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
        FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);

        Thread systemThread = new Thread(system, "System");
        systemThread.start();
        Thread appsThread = new Thread(apps, "Apps");
        appsThread.start();        
        Thread documentsThread = new Thread(documents, "Documents");
        documentsThread.start();
        try {
            systemThread.join();
            appsThread.join();
            documentsThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("Terminated: %s\n", phaser.isTerminated());
    }

  例子中Phaser分了三个步骤:查找文件、过滤文件、打印结果。并且在查找文件和过滤文件结束后对结果进行分析,如果是空的,将此线程从Phaser中注销掉。也就是说,下一阶段,该线程将不参与运行。

  在run()方法中,开头调用了phaser的arriveAndAwaitAdvance()方法来保证所有线程都启动了之后再开始查找文件。在查找文件和过滤文件阶段结束之后,都对结果进行了处理。即:如果结果是空的,那么就把该条线程移除,如果不空,那么等待该阶段所有线程都执行完该步骤之后在统一执行下一步。最后,任务执行完后,把Phaser中的线程均注销掉。

  Phaser其实有两个状态:活跃态和终止态。当存在参与同步的线程时,Phaser就是活跃的。并且在每个阶段结束的时候同步。当所有参与同步的线程都取消注册的时候,Phase就处于终止状态。在这种状态下,Phaser没有任务参与者。

  Phaser主要功能就是执行多阶段任务,并保证每个阶段点的线程同步。在每个阶段点还可以条件或者移除参与者。主要涉及方法arriveAndAwaitAdvance()和register()和arriveAndDeregister()

上一篇下一篇

猜你喜欢

热点阅读