Android 并发之CountDownLatch、Cyclic
Android 线程简单分析(一)
Android 并发之synchronized锁住的是代码还是对象(二)
Android 并发之CountDownLatch、CyclicBarrier的简单应用(三)
Android 并发HashMap和ConcurrentHashMap的简单应用(四)(待发布)
Android 并发之Lock、ReadWriteLock和Condition的简单应用(五)
Android 并发之CAS(原子操作)简单介绍(六)
Android 并发Kotlin协程的重要性(七)(待发布)
Android 并发之AsyncTask原理分析(八)(待发布)
Android 并发之Handler、Looper、MessageQueue和ThreadLocal消息机制原理分析(九)
Android 并发之HandlerThread和IntentService原理分析(十)
CountDownLatch:
CountDownLatch.png它可以实现类似计数器的功能。比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。
构造函数:
public CountDownLatch(int count) {
}
参数count为计数值
然后下面这3个方法是CountDownLatch类中最重要的方法:
public void await() throws InterruptedException { }; //调用await()方法的线程,如果count>0会被挂起,它会等待直到 count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown() { }; //将count值减1
场景
类似的场景,项目开发流程,将各个功能模块分发至每个人手里,每个人做完及时汇报,当所有人都开发完成,那么接下来就是发布上线,CountDownLatch就是等待一组任务完成之后,继续执行。
举个栗子:
创建Project线程类CountDownLatchProject,并声明一个CountDownLatch属性mCountDownLatch来控制Project项目线程等待所有开发者开发完成。项目线程启动后会调用await()方法并进入等待状态,每一个开发者开发完成后调用complete()方法,并把mCountDownLatch中的计数器减1,当计数器等于0的时候项目线程继续执行;
项目线程类:
public class CountDownLatchProject implements Runnable {
private CountDownLatch mCountDownLatch;
public CountDownLatchProject(int number) {
this.mCountDownLatch = new CountDownLatch(number);
}
public void complete(String name) {
Log.e("tag", name + "完成...............");
mCountDownLatch.countDown();
}
@Override
public void run() {
Log.e("tag", "项目开始..................");
try {
mCountDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("tag", "项目上线.....................");
}
}
开发者类:
public class CountDownLatchdevalper implements Runnable {
private CountDownLatchProject mCountDownLatchProject;
public CountDownLatchdevalper(CountDownLatchProject count) {
this.mCountDownLatchProject = count;
}
@Override
public void run() {
Log.e("tag", "" + Thread.currentThread().getName() + "开始开发.............");
try {
long duration = (long) (Math.random() * 20);
TimeUnit.SECONDS.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}
mCountDownLatchProject.complete(Thread.currentThread().getName());
}
}
创建一个项目开发,并初始化10个人参加项目开发,当所有开发人员开发完成,项目进入下一阶段:
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
CountDownLatchProject project = new CountDownLatchProject(10);
new Thread(project).start();
for (int i = 0; i < 10; i++) {
CountDownLatchdevalper count = new CountDownLatchdevalper(project);
new Thread(count).start();
}
}
}
查看日志:
CountDownLatchProject: 项目开始..................
CountDownLatchdevalper: Thread-232开始了............
CountDownLatchdevalper: Thread-228开始了............
CountDownLatchdevalper: Thread-229开始了............
CountDownLatchdevalper: Thread-231开始了............
CountDownLatchdevalper: Thread-230开始了............
CountDownLatchdevalper: Thread-233开始了............
CountDownLatchdevalper: Thread-234开始了............
CountDownLatchdevalper: Thread-236开始了............
CountDownLatchdevalper: Thread-235开始了............
CountDownLatchdevalper: Thread-237开始了............
CountDownLatchProject: Thread-233完成...............
Thread-233CountDownLatchProject: 还有: 9 人未完成
CountDownLatchProject: Thread-232完成...............
Thread-232CountDownLatchProject: 还有: 8 人未完成
CountDownLatchProject: Thread-228完成...............
Thread-228CountDownLatchProject: 还有: 7 人未完成
CountDownLatchProject: Thread-237完成...............
Thread-237CountDownLatchProject: 还有: 6 人未完成
CountDownLatchProject: Thread-230完成...............
Thread-230CountDownLatchProject: 还有: 5 人未完成
CountDownLatchProject: Thread-235完成...............
Thread-235CountDownLatchProject: 还有: 4 人未完成
CountDownLatchProject: Thread-229完成...............
Thread-229CountDownLatchProject: 还有: 3 人未完成
CountDownLatchProject: Thread-231完成...............
Thread-231CountDownLatchProject: 还有: 2 人未完成
CountDownLatchProject: Thread-234完成...............
Thread-234CountDownLatchProject: 还有: 1 人未完成
CountDownLatchProject: Thread-236完成...............
Thread-236CountDownLatchProject: 还有: 0 人未完成
CountDownLatchProject: 项目上线....................
- CountDownLatch是当前线程等待其他一组线程任务完成之后在继续往下执行;
- CountDownLatch并不是用来保护共享资源同步访问的,而是用来控制并发线程等待的;
- CountDownLatch只允许使用一次,一旦内部计数器等于0,再调用这个方法将不起作用,如果还有第二次并发等待,你还得创建一个新的CountDownLatch。
- 有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了
CyclicBarrier:
字面意思循环屏障,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做循环是因为当所有等待线程都执行完成以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做屏障,当调用await()方法之后,线程就处在屏障点了,屏障就是要阻碍的意思,它要做的事情是让一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时候,屏障才会开门。所有被屏障拦截的线程才会运行。
CyclicBarrier.png
构造函数:
1:
public CyclicBarrier(int parties, Runnable barrierAction) {
}
2:
public CyclicBarrier(int parties) {
}
参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容。
然后CyclicBarrier中最重要的方法就是await方法:
public int await() throws InterruptedException, BrokenBarrierException { };//来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { };//让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务
举个栗子:
public class Statistic implements Runnable {
private CyclicBarrier mCyclicBarrier;
public Statistic(CyclicBarrier parties) {
this.mCyclicBarrier = parties;
}
@Override
public void run() {
Log.e("tag", "mCyclicBarrier: " + Thread.currentThread().getName() + "开始" + mCyclicBarrier.getNumberWaiting());
try {
long duration = (long) (Math.random() * 20);
TimeUnit.SECONDS.sleep(duration);
mCyclicBarrier.await();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.e("tag", "mCyclicBarrier: " + Thread.currentThread().getName() + "结束");
}
}
创建3个Statistic线程类:
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
new Thread(new Statistic(cyclicBarrier)).start();
}
}
}
查看日志:
mCyclicBarrier: Thread-143开始0
mCyclicBarrier: Thread-144开始0
mCyclicBarrier: Thread-142开始0
mCyclicBarrier: Thread-143结束
mCyclicBarrier: Thread-144结束
mCyclicBarrier: Thread-142结束
从上面输出结果可以看出,每个线程执行完操作之后,就在等待其他线程的操作完毕,然后一起执行,即,当所有线程线程写入操作完毕之后,所有线程就继续进行后续的操作了。
如果说想要在所有线程执行完之后,进行额外的其他操作可以为使用CyclicBarrier构造函数提供Runnable参数;
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
Log.e("tag", "执行完,做额外的操作,然后接着后续的操作。");
}
});
for (int i = 0; i < 3; i++) {
new Thread(new Statistic(cyclicBarrier)).start();
}
查看日志:
mCyclicBarrier: Thread-154开始0
mCyclicBarrier: Thread-152开始0
mCyclicBarrier: Thread-153开始0
执行完,做额外的操作,然后接着后续的操作,Thread-154。
mCyclicBarrier: Thread-152结束
mCyclicBarrier: Thread-153结束
mCyclicBarrier: Thread-154结束
当3个线程都到达barrier状态后,会从3个线程中选择最后一个到达同步点的线程去执行Runnable,然后继续后续操作,注意:CyclicBarrier可以复用,而CountDownLatch无法进行重复使用;
看看CycliBarrier中等待其他线程的源码:
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//可重入锁,独占锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // 最后一个线程到达屏障点
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null) {
command.run();//先执行额外的操作
}
ranAction = true;
nextGeneration();//生成下一代
return 0;
} finally {
//这是正常完成操作,所以不会破坏新生成的代
if (!ranAction)
breakBarrier();
}
}
//循环等待,直到到达同步点,断裂或被破坏,中断或者超时
for (; ; ) {
//------------------线程被挂起
try {
if (!timed)
trip.await();//线程等待,挂起释放锁
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);//线程超时等待,挂起释放锁
} catch (InterruptedException ie) {
if (g == generation && !g.broken) {
breakBarrier();//断裂当前代,如果要复用,必须调用reset方法
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
//线程被唤醒继续执行
if (g.broken)
throw new BrokenBarrierException();
//换代
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
trip.signalAll(); //唤醒所有等待的线程
count = parties;
generation = new Generation();
}
private void breakBarrier() {
generation.broken = true;//标志当前代断裂或被破坏
count = parties;
trip.signalAll();//唤醒所有等待的线程
}
- 其实CyclicBarrier中使用了ReentrantLock 独享锁或者可重入锁;
- 当前线程执行await方法是其实是调用ReentrantLock 的await方法,进入条件等待队里,并释放锁;
- 当所有的线程都到达屏障点,会选择最后一个到达屏障点的线程执行barrierCommand,然后调用ReentrantLock .signalAll唤醒所有线程,线程将进入同步队列,竞争锁,ReentrantLock (非公平锁),并调用nextGeneration()重置Generation,所以正常情况下重用CyclicBarrier可以不用调用reset方法,出现异常情况就需要调用reset方法重置状态,CyclicBarrier是可以复用的,而CountDownLatch无法进行重复使用;