Android 源码分析

Android 并发之CountDownLatch、Cyclic

2019-03-04  本文已影响0人  有没有口罩给我一个

Android 线程简单分析(一)
Android 并发之synchronized锁住的是代码还是对象(二)
Android 并发之CountDownLatch、CyclicBarrier的简单应用(三)
Android 并发HashMap和ConcurrentHashMap的简单应用(四)(待发布)
Android 并发之Lock、ReadWriteLock和Condition的简单应用(五)
Android 并发之CAS(原子操作)简单介绍(六)
Android 并发Kotlin协程的重要性(七)(待发布)
Android 并发之AsyncTask原理分析(八)(待发布)
Android 并发之Handler、Looper、MessageQueue和ThreadLocal消息机制原理分析(九)
Android 并发之HandlerThread和IntentService原理分析(十)

CountDownLatch:

它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

CountDownLatch.png

构造函数:

public CountDownLatch(int count) {
}

参数count为计数值

然后下面这3个方法是CountDownLatch类中最重要的方法:

public void await() throws InterruptedException { };   //调用await()方法的线程,如果count>0会被挂起,它会等待直到    count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { };  //将count值减1
场景

类似的场景,项目开发流程,将各个功能模块分发至每个人手里,每个人做完及时汇报,当所有人都开发完成,那么接下来就是发布上线,CountDownLatch就是等待一组任务完成之后,继续执行。

举个栗子:

创建Project线程类CountDownLatchProject,并声明一个CountDownLatch属性mCountDownLatch来控制Project项目线程等待所有开发者开发完成。项目线程启动后会调用await()方法并进入等待状态,每一个开发者开发完成后调用complete()方法,并把mCountDownLatch中的计数器减1,当计数器等于0的时候项目线程继续执行;

项目线程类:

public class CountDownLatchProject implements Runnable {
private CountDownLatch mCountDownLatch;

public CountDownLatchProject(int number) {
    this.mCountDownLatch = new CountDownLatch(number);
}

public void complete(String name) {
    Log.e("tag", name + "完成...............");
    mCountDownLatch.countDown();
}

@Override
public void run() {
    Log.e("tag", "项目开始..................");
    try {
        mCountDownLatch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.e("tag", "项目上线.....................");
  }
}

开发者类:

public class CountDownLatchdevalper implements Runnable {

private CountDownLatchProject mCountDownLatchProject;

public CountDownLatchdevalper(CountDownLatchProject count) {
    this.mCountDownLatchProject = count;
}

@Override
public void run() {
    Log.e("tag", "" + Thread.currentThread().getName() + "开始开发.............");
    try {
        long duration = (long) (Math.random() * 20);
        TimeUnit.SECONDS.sleep(duration);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    mCountDownLatchProject.complete(Thread.currentThread().getName());
}
}

创建一个项目开发,并初始化10个人参加项目开发,当所有开发人员开发完成,项目进入下一阶段:

public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    CountDownLatchProject project = new CountDownLatchProject(10);
    new Thread(project).start();


    for (int i = 0; i < 10; i++) {
        CountDownLatchdevalper count = new CountDownLatchdevalper(project);
        new Thread(count).start();
        }
    }
}

查看日志:

CountDownLatchProject: 项目开始..................
CountDownLatchdevalper: Thread-232开始了............
CountDownLatchdevalper: Thread-228开始了............
CountDownLatchdevalper: Thread-229开始了............
CountDownLatchdevalper: Thread-231开始了............
CountDownLatchdevalper: Thread-230开始了............
CountDownLatchdevalper: Thread-233开始了............
CountDownLatchdevalper: Thread-234开始了............
CountDownLatchdevalper: Thread-236开始了............
CountDownLatchdevalper: Thread-235开始了............
CountDownLatchdevalper: Thread-237开始了............
CountDownLatchProject: Thread-233完成...............
Thread-233CountDownLatchProject: 还有: 9 人未完成
CountDownLatchProject: Thread-232完成...............
Thread-232CountDownLatchProject: 还有: 8 人未完成
CountDownLatchProject: Thread-228完成...............
Thread-228CountDownLatchProject: 还有: 7 人未完成
CountDownLatchProject: Thread-237完成...............
Thread-237CountDownLatchProject: 还有: 6 人未完成
CountDownLatchProject: Thread-230完成...............
Thread-230CountDownLatchProject: 还有: 5 人未完成
CountDownLatchProject: Thread-235完成...............
Thread-235CountDownLatchProject: 还有: 4 人未完成
CountDownLatchProject: Thread-229完成...............
Thread-229CountDownLatchProject: 还有: 3 人未完成
CountDownLatchProject: Thread-231完成...............
Thread-231CountDownLatchProject: 还有: 2 人未完成
CountDownLatchProject: Thread-234完成...............
Thread-234CountDownLatchProject: 还有: 1 人未完成
CountDownLatchProject: Thread-236完成...............
Thread-236CountDownLatchProject: 还有: 0 人未完成
CountDownLatchProject: 项目上线....................

CyclicBarrier:

字面意思循环屏障,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做循环是因为当所有等待线程都执行完成以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做屏障,当调用await()方法之后,线程就处在屏障点了,屏障就是要阻碍的意思,它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。


CyclicBarrier.png

构造函数:

1:
public CyclicBarrier(int parties, Runnable barrierAction) {
}
2:
public CyclicBarrier(int parties) {
}

参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。

然后CyclicBarrier中最重要的方法就是await方法:

public int await() throws InterruptedException, BrokenBarrierException { };//来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };//让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务

举个栗子:

public class Statistic implements Runnable {
    private CyclicBarrier mCyclicBarrier;


public Statistic(CyclicBarrier parties) {
    this.mCyclicBarrier = parties;
}

@Override
public void run() {
    Log.e("tag", "mCyclicBarrier:  " + Thread.currentThread().getName() + "开始" + mCyclicBarrier.getNumberWaiting());
    try {
        long duration = (long) (Math.random() * 20);
        TimeUnit.SECONDS.sleep(duration);
        mCyclicBarrier.await();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    Log.e("tag", "mCyclicBarrier:  " + Thread.currentThread().getName() + "结束");
    }
}

创建3个Statistic线程类:

public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    for (int i = 0; i < 3; i++) {
        new Thread(new Statistic(cyclicBarrier)).start();
      }
    }
 }

查看日志:

  mCyclicBarrier:  Thread-143开始0
  mCyclicBarrier:  Thread-144开始0
  mCyclicBarrier:  Thread-142开始0
  mCyclicBarrier:  Thread-143结束
  mCyclicBarrier:  Thread-144结束
  mCyclicBarrier:  Thread-142结束
从上面输出结果可以看出,每个线程执行完操作之后,就在等待其他线程的操作完毕,然后一起执行,即,当所有线程线程写入操作完毕之后,所有线程就继续进行后续的操作了。

如果说想要在所有线程执行完之后,进行额外的其他操作可以为使用CyclicBarrier构造函数提供Runnable参数;

CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
        @Override
        public void run() {
            Log.e("tag", "执行完,做额外的操作,然后接着后续的操作。");
        }
    });
    for (int i = 0; i < 3; i++) {
        new Thread(new Statistic(cyclicBarrier)).start();
    }

查看日志:

mCyclicBarrier:  Thread-154开始0
mCyclicBarrier:  Thread-152开始0
mCyclicBarrier:  Thread-153开始0
执行完,做额外的操作,然后接着后续的操作,Thread-154。
mCyclicBarrier:  Thread-152结束
mCyclicBarrier:  Thread-153结束
mCyclicBarrier:  Thread-154结束
当3个线程都到达barrier状态后,会从3个线程中选择最后一个到达同步点的线程去执行Runnable,然后继续后续操作,注意:CyclicBarrier可以复用,而CountDownLatch无法进行重复使用;

看看CycliBarrier中等待其他线程的源码:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    //可重入锁,独占锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        final Generation g = generation;
        if (g.broken)
            throw new BrokenBarrierException();
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        int index = --count;
        if (index == 0) {  // 最后一个线程到达屏障点
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null) {
                    command.run();//先执行额外的操作
                }
                ranAction = true;
                nextGeneration();//生成下一代
                return 0;
            } finally {
                //这是正常完成操作,所以不会破坏新生成的代
                if (!ranAction)
                    breakBarrier();
            }
        }
        //循环等待,直到到达同步点,断裂或被破坏,中断或者超时
        for (; ; ) {

            //------------------线程被挂起
            try {
                if (!timed)
                    trip.await();//线程等待,挂起释放锁
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);//线程超时等待,挂起释放锁
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    breakBarrier();//断裂当前代,如果要复用,必须调用reset方法
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }
            //线程被唤醒继续执行
            if (g.broken)
                throw new BrokenBarrierException();
            //换代
            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}


private void nextGeneration() {
    trip.signalAll(); //唤醒所有等待的线程
    count = parties;
    generation = new Generation();
}


private void breakBarrier() {
    generation.broken = true;//标志当前代断裂或被破坏
    count = parties;
    trip.signalAll();//唤醒所有等待的线程
}
上一篇下一篇

猜你喜欢

热点阅读