Java多线程之批量发起请求
2023-11-29 本文已影响0人
小小土豆dev
日常开发中可能会遇到批量发起请求的场景,如:从某个服务器拉取大批量数据,如果一次拉取,数据量太大。采用分页拉取方式,一页一页拉取,比较耗时,此时我们可以批量的同时去拉数据。
如何发起批量请求呢?可以使用Java多线程去实现。每个线程拉取数据的执行时间可能不一致,我们希望先执行完任务的线程可以优先返回数据,不是等待所有线程都执行完,才返回数据。
Future方式
public static void sleep(long time) {
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void test() {
long start = System.currentTimeMillis();
//定义ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(3);
//定义批量任务,每个任务的耗时不等
final List<Callable<Integer>> tasks = Arrays.asList(
() -> {
sleep(30L);
System.out.println("Task 30 completed done.");
return 30;
},
() -> {
sleep(10L);
System.out.println("Task 10 completed done.");
return 10;
},
() -> {
sleep(20L);
System.out.println("Task 20 completed done.");
return 20;
}
);
//批量提交执行异步任务,
try {
List<Future<Integer>> futures = executor.invokeAll(tasks);
futures.forEach(future -> {
try {
System.out.println("返回结果: " + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
long seed = end - start;
System.out.format("seed=%s", seed);
}
执行结果:
Task 30 completed done.
Task 10 completed done.
Task 20 completed done.
返回结果: 30
返回结果: 10
返回结果: 20
seed=30068
我们无法优先获取最先执行完任务线程的结果,而是等到耗时(30s)最长的任务执行完毕后才可以拿到结果。
CompletableFuture方式
public static void test2() {
long start = System.currentTimeMillis();
List<String> list = Arrays.asList("1", "2", "3");
List<CompletableFuture<String>> futures = list.stream()
.map(item ->
CompletableFuture.supplyAsync(() -> {
if ("1".equals(item)) {
sleep(10L);
} else if ("2".equals(item)) {
sleep(30L);
} else {
sleep(20L);
}
System.out.println("thread name: " + Thread.currentThread().getName() + " task: " + item);
return "任务" + item;
}
)).collect(Collectors.toList());
futures.forEach(future -> {
future.thenAccept(result -> {
System.out.println("返回结果: " + result);
});
});
futures.forEach(future -> {
future.join();
});
long end = System.currentTimeMillis();
long seed = end - start;
System.out.format("seed=%s", seed);
}
执行结果:
thread name: ForkJoinPool.commonPool-worker-9 task: 1
返回结果: 任务1
thread name: ForkJoinPool.commonPool-worker-11 task: 3
返回结果: 任务3
thread name: ForkJoinPool.commonPool-worker-2 task: 2
返回结果: 任务2
seed=30079
可以优先拿到最先执行完任务线程的执行结果。
CompletionService方式
public static void test3() {
long start = System.currentTimeMillis();
//定义ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(3);
//定义批量任务,每个任务的耗时不等
final List<Callable<Integer>> tasks = Arrays.asList(
() -> {
sleep(30L);
System.out.println("Task 30 completed done.");
return 30;
},
() -> {
sleep(10L);
System.out.println("Task 10 completed done.");
return 10;
},
() -> {
sleep(20L);
System.out.println("Task 20 completed done.");
return 20;
}
);
//批量提交执行异步任务,
try {
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
tasks.forEach(completionService::submit);
for (int i = 0; i < tasks.size(); i++) {
try {
System.out.println("返回结果: " + completionService.take().get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
long seed = end - start;
System.out.format("seed=%s", seed);
}
执行结果:
Task 10 completed done.
返回结果: 10
Task 20 completed done.
Task 30 completed done.
返回结果: 20
返回结果: 30
seed=30064
可以优先拿到最先执行完任务线程的执行结果。
推荐
https://juejin.cn/post/6844904195162636295
https://www.jianshu.com/p/2528550e94a9