FutureTask、CompletionService和Lis
2019-01-12 本文已影响0人
风一样的存在
1.FutureTask
可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时的计算。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
/**
* @Auther: jack
* @Date: 2018/10/30 22:14
* @Description:
*/
public class FutureTaskService {
/**
* 生成车票
*
* @return
*/
public List<String> createTickets() {
List<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add("车票" + i);
}
return list;
}
public void sellTicket() {
//获取车票
List<String> list = createTickets();
List<FutureTask<String>> taskList = new ArrayList<>();
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < list.size(); i++) {
// 传入Callable对象创建FutureTask对象
FutureTask<String> futureTask = new FutureTask<>(new Task(list.get(i)));
taskList.add(futureTask);
// 提交给线程池执行任务,executorService.invokeAll(taskList)一次性提交所有任务;
executorService.submit(futureTask);
}
try {
for (FutureTask<String> futureTask : taskList) {
//FutureTask的get方法会自动阻塞,直到获取计算结果为止
String result = futureTask.get();
System.out.println("处理的返回值:" + result);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
System.out.println("操作完毕");
executorService.shutdown();
}
}
public static void main(String[] args) {
new FutureTaskService().sellTicket();
}
class Task implements Callable<String> {
private String ticket;
/**
* 构造方法,用于参数传递
*
* @param ticket
*/
public Task(String ticket) {
this.ticket = ticket;
}
@Override
public String call() throws Exception {
System.out.println("窗口:" + Thread.currentThread().getName() + ",已卖" + ticket);
return ticket;
}
}
}
运行结果:可以看出是按顺序返回每个任务的处理结果,验证了阻塞效果,第一个任务没有处理完是不会处理第二个任务的。
FutureTask.png
2. CompletionService
当向Executor提交批处理任务时,并且希望在它们完成后获得结果,如果用FutureTask,你可以循环获取task,并用future.get()去获取结果,但是如果这个task没有完成,你就得阻塞在这里,这个实效性不高,其实在很多场合,其实你拿第一个任务结果时,此时结果并没有生成并阻塞,其实在阻塞在第一个任务时,第二个task的任务已经早就完成了,显然这种情况用FutureTask不合适的,效率也不高的。
/**
* @Auther: jack
* @Date: 2018/10/31 22:14
* @Description: jdk实现的CompletionService(先执行完的先输出结果)
*/
public class CompleteService {
/**
* 生成车票
*
* @return
*/
public List<String> createTickets() {
List<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add("车票" + i);
}
return list;
}
public void sellTicket() {
//获取车票
List<String> list = createTickets();
//定义线程数
ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<String> completionService= new ExecutorCompletionService<String>(pool);
for (int i = 0; i < list.size(); i++) {
completionService.submit(new CompleteService.Task(list.get(i)));
}
try {
//所有都执行完毕
for(int i=0;i<list.size();i++){
String result=completionService.take().get();
System.out.println("处理的返回值:" + result.toString());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("操作完毕");
pool.shutdown();
}
}
public static void main(String[] args) {
new CompleteService().sellTicket();
}
class Task implements Callable<String> {
private String ticket;
/**
* 构造方法,用于参数传递
*
* @param ticket
*/
public Task(String ticket) {
this.ticket = ticket;
}
@Override
public String call() throws Exception {
System.out.println("窗口:" + Thread.currentThread().getName() + ",已卖" + ticket);
return ticket;
}
}
}
运行结果:可以看出先执行完的任务先返回处理结果。
CompletionService.png
3. ListenableFuture
ListenableFuture是guava提供的一个类,ListenableFuture是对原有Future的增强,可以用于监听Future任务的执行状况,是执行成功还是执行失败,并提供响应的接口用于对不同结果的处理。
/**
* @Auther: jack
* @Date: 2018/9/19 17:57
* @Description: 模拟
*/
@Slf4j
public class GuavaMultiThread {
/**
* 生成车票
*
* @return
*/
public List<String> createTickets() {
List<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
list.add("车票" + i);
}
return list;
}
public void sellTicket() {
//获取车票
List<String> list = createTickets();
List<ListenableFuture<String>> futures = Lists.newArrayList();
//定义线程数
ExecutorService pool = Executors.newFixedThreadPool(5);
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(pool);
for (int i = 0; i < list.size(); i++) {
futures.add(executorService.submit(new Task(list.get(i))));
//为每一个执行过程添加回调处理
/*Futures.addCallback(executorService.submit(new Task(list.get(i))), new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String s) {
System.out.println("操作成功:"+s);
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("操作失败!");
}
},pool);*/
}
final ListenableFuture<List<String>> resultsFuture = Futures.successfulAsList(futures);
try {//所有都执行完毕
List<String> result=resultsFuture.get();
System.out.println("处理的返回值:"+result.toString());
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("操作完毕");
pool.shutdown();
}
}
public static void main(String[] args) {
new GuavaMultiThread().sellTicket();
}
class Task implements Callable<String> {
private String ticket;
/**
* 构造方法,用于参数传递
*
* @param ticket
*/
public Task(String ticket) {
this.ticket = ticket;
}
@Override
public String call() throws Exception {
System.out.println("窗口:"+Thread.currentThread().getName()+",已卖" + ticket);
return ticket;
}
}
}
运行结果:
ListenableFuture.png