Java之家多线程

Java多线程之批量发起请求

2023-11-29  本文已影响0人  小小土豆dev

日常开发中可能会遇到批量发起请求的场景,如:从某个服务器拉取大批量数据,如果一次拉取,数据量太大。采用分页拉取方式,一页一页拉取,比较耗时,此时我们可以批量的同时去拉数据。

如何发起批量请求呢?可以使用Java多线程去实现。每个线程拉取数据的执行时间可能不一致,我们希望先执行完任务的线程可以优先返回数据,不是等待所有线程都执行完,才返回数据。

Future方式

public static void sleep(long time) {
    try {
        TimeUnit.SECONDS.sleep(time);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
public static void test() {
    long start = System.currentTimeMillis();

    //定义ExecutorService
    ExecutorService executor = Executors.newFixedThreadPool(3);

    //定义批量任务,每个任务的耗时不等
    final List<Callable<Integer>> tasks = Arrays.asList(
            () -> {
                sleep(30L);
                System.out.println("Task 30 completed done.");
                return 30;
            },
            () -> {
                sleep(10L);
                System.out.println("Task 10 completed done.");
                return 10;
            },
            () -> {
                sleep(20L);
                System.out.println("Task 20 completed done.");
                return 20;
            }
    );

    //批量提交执行异步任务,
    try {
        List<Future<Integer>> futures = executor.invokeAll(tasks);
        futures.forEach(future -> {
            try {
                System.out.println("返回结果: " + future.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        executor.shutdown();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    long end = System.currentTimeMillis();
    long seed = end - start;
    System.out.format("seed=%s", seed);
}

执行结果:

Task 30 completed done.
Task 10 completed done.
Task 20 completed done.
返回结果: 30
返回结果: 10
返回结果: 20
seed=30068

我们无法优先获取最先执行完任务线程的结果,而是等到耗时(30s)最长的任务执行完毕后才可以拿到结果。

CompletableFuture方式

public static void test2() {
    long start = System.currentTimeMillis();

    List<String> list = Arrays.asList("1", "2", "3");
    List<CompletableFuture<String>> futures = list.stream()
            .map(item ->
                    CompletableFuture.supplyAsync(() -> {
                                if ("1".equals(item)) {
                                    sleep(10L);
                                } else if ("2".equals(item)) {
                                    sleep(30L);
                                } else {
                                    sleep(20L);
                                }

                                System.out.println("thread name: " + Thread.currentThread().getName() + " task: " + item);
                                return "任务" + item;
                            }
                    )).collect(Collectors.toList());

    futures.forEach(future -> {
        future.thenAccept(result -> {
            System.out.println("返回结果: " + result);
        });
    });

    futures.forEach(future -> {
        future.join();
    });

    long end = System.currentTimeMillis();
    long seed = end - start;
    System.out.format("seed=%s", seed);
}

执行结果:

thread name: ForkJoinPool.commonPool-worker-9 task: 1
返回结果: 任务1
thread name: ForkJoinPool.commonPool-worker-11 task: 3
返回结果: 任务3
thread name: ForkJoinPool.commonPool-worker-2 task: 2
返回结果: 任务2
seed=30079

可以优先拿到最先执行完任务线程的执行结果。

CompletionService方式

public static void test3() {
    long start = System.currentTimeMillis();
    //定义ExecutorService
    ExecutorService executor = Executors.newFixedThreadPool(3);

    //定义批量任务,每个任务的耗时不等
    final List<Callable<Integer>> tasks = Arrays.asList(
            () -> {
                sleep(30L);
                System.out.println("Task 30 completed done.");
                return 30;
            },
            () -> {
                sleep(10L);
                System.out.println("Task 10 completed done.");
                return 10;
            },
            () -> {
                sleep(20L);
                System.out.println("Task 20 completed done.");
                return 20;
            }
    );

    //批量提交执行异步任务,
    try {
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
        tasks.forEach(completionService::submit);

        for (int i = 0; i < tasks.size(); i++) {
            try {
                System.out.println("返回结果: " + completionService.take().get());

            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();
    } catch (Exception e) {
        e.printStackTrace();
    }

    long end = System.currentTimeMillis();
    long seed = end - start;
    System.out.format("seed=%s", seed);
}

执行结果:

Task 10 completed done.
返回结果: 10
Task 20 completed done.
Task 30 completed done.
返回结果: 20
返回结果: 30
seed=30064

可以优先拿到最先执行完任务线程的执行结果。

推荐

https://juejin.cn/post/6844904195162636295
https://www.jianshu.com/p/2528550e94a9

上一篇 下一篇

猜你喜欢

热点阅读