java.util.concurrent概览
2019-02-18 本文已影响0人
传葱
前言
- java.util.concurrent提供了许多提高并发读写性能的工具类,合格的开发者应该能够熟练使用这些开发工具,辅助开发,主要看一下怎么使用的。
核心类
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- ThreadFactory
- BlockingQueue
- DelayQueue
- Locks
- Phaser
Executor
- 这个接口定义表达一个执行任务的对象
public class InvokeExecutor implements Executor {
@Override
public void execute(Runnable command) {
command.run();
}
public static void main(String[] args) {
executor();
}
public static void executor() {
//替代专门create一个线程
Executor executor = new InvokeExecutor();
executor.execute(() -> {
System.out.println("hello,world!");
});
executor.execute(() -> {
System.out.println("lalal");
});
}
}
- new 一个Executor可以执行多个不同的任务
ExecutorService
- 这个类一般用于线程池,异步执行线程,内存的queue中存储task,线程调度
public class ExecutorServiceTest {
ExecutorService executor = Executors.newFixedThreadPool(10);
public void executor() {
executor.submit(() -> {
new Task();
});
executor.submit(() -> {
new Task1();
});
}
public static void main(String[] args) {
ExecutorServiceTest executorServiceTest = new ExecutorServiceTest();
executorServiceTest.executor();
}
}
class Task implements Runnable {
@Override
public void run() {
}
}
class Task1 implements Runnable {
@Override
public void run() {
}
}
- 具体线程调度交给线程池来解决
- 看下Termination方法:shutDown(),所有task执行完毕之后关闭线程池,释放资源;shutdownNow(),立即关闭线程池,释放资源,强制所有线程停止。
ScheduledExecutorService
- 这个和上个ExecutorService很像,但是ScheduledExecutorService会延迟执行任务
public class ScheduleTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduleTest scheduleTest = new ScheduleTest();
scheduleTest.executor();
}
public void executor() throws ExecutionException, InterruptedException {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
Future<String> future = scheduledExecutorService.schedule(() -> {
return "hello, world!";
}, 1, TimeUnit.SECONDS);
System.out.println(future.get());
//上一个任务执行完成后delay delay长的时间开始执行下个任务
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println("lalal");
}, 1, 2 ,TimeUnit.SECONDS);
//间隔perid开始执行新的任务
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println("lalal");
}, 1, 2 ,TimeUnit.SECONDS);
// scheduledExecutorService.shutdown();
}
}
- scheduledExecutorService.schedule:很好理解,delay长的时间后开始执行任务,执行一次,比如这里是1秒后执行任务;
- scheduledExecutorService.scheduleAtFixedRate:1秒后执行任务,此后每次间隔两秒执行任务;
- scheduledExecutorService.scheduleWithFixedDelay:1秒后执行任务,任务执行完成后,间隔两秒执行下一个任务
Future
- Future一般用来获得异步执行的结果。可以用来判断异步操作是否执行完毕,获取计算的结果。
public void invoke() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(() -> {
// ...
Thread.sleep(10000l);
return "Hello world";
});
}
if (future.isDone() && !future.isCancelled()) {
try {
str = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
- 看一下用法,isDone,但是没有cancelled,这个时候,可以安全的获取future存储的value。
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
- 上面的用法,可以设置等待的timeOut,超过这个时间,抛出TimeoutException。
CountDownLatch
- jdk5提供的,可以阻塞一队线程,一直到某个操作发生
- counter(Integer type);
- counter不断decrements,一直到减为0,这个时候其它的线程都会释放。
- 比如一个电子门,上午7点开始,超过8个人开门之后,大门完全打开。
CyclicBarrier
- 这个工具和CountDownLatch很像,但是CountDownLatch减为0的时候就不能重复使用了;
- CyclicBarrier可以重复使用,线程之间互相等待,一直到所有的线程准备就绪。
- 使用:校车需要等所有的小朋友都上车后才会启动。
public class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
LOG.info(Thread.currentThread().getName() +
" is waiting");
barrier.await();
LOG.info(Thread.currentThread().getName() +
" is released");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public void start() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
// ...
LOG.info("All previous tasks are completed");
});
Thread t1 = new Thread(new Task(cyclicBarrier), "T1");
Thread t2 = new Thread(new Task(cyclicBarrier), "T2");
Thread t3 = new Thread(new Task(cyclicBarrier), "T3");
if (!cyclicBarrier.isBroken()) {
t1.start();
t2.start();
t3.start();
}
}
- T1, T2, T3互相等待,CyclicBarrier初始化为0,满足条件需要增长到3,每当一个线程await,增1。
- cyclicBarrier.isBroken():所有线程都进入等待状态.