知识点

Java性能优化实践:异步多线程+同步执行

2022-01-11  本文已影响0人  GuangHui

当项目中某个业务需要调用另外N个服务接口,然后根据返回的结果做筛选再返回给前端。当然最简单的做法就是N个接口串行调用,但是如果每个接口调用的时间都在1秒以上那么N个接口调用完毕就需要耗费N秒,这在项目中是不可接受的,因此这个时候就需要用多线程处理。

而主线程要等待所有子线程任务执行完毕后再往下同步执行的实现方式,主要有以下几种方法:

方法一: 使用synchronized获得同步锁

这是最基础的方式,就是放置一个公用的static变量,假如有10个线程,每个线程处理完上去累加下结果,然后后面用一个死循环(或类似线程阻塞的方法),去数这个结果,达到10个,说明大家都执行完了,就可以执行后续的事情了,这个想法虽然土鳖,但是基本上跟语言无关,几乎所有主流编程语言都支持。

public class ThreadLockTest {
 
    public static Integer flag = 0;//公用变量
 
    public static void main(String[] args) throws Exception {
        ThreadLockTest testObj = new ThreadLockTest();
        final int threadNum = 10;
 
        for (int i = 0; i < threadNum; i++) {
            new Thread(new MyRunable(i, testObj)).start();
        }
 
        while (true) {
            if (testObj.flag >= threadNum) {
                System.out.println("-----------\n所有thread执行完成!");
                break;
            }
            Thread.sleep(10);
        }
    }
 
    static class MyRunable implements Runnable {
        int _i = 0;
        ThreadLockTest _test;
 
        public MyRunable(int i, ThreadLockTest test) {
            this._i = i;
            this._test = test;
        }
 
        @Override
        public void run() {
            try {
                Thread.sleep((long) (Math.random() * 10));
                System.out.println("thread " + _i + " done");
                //利用synchronized获得同步锁
                synchronized (_test) {
                    _test.flag += 1;
                }
                System.out.println("thread " + _i + " => " + _test.flag);//测试用
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

方法二: CountDownLatch

使用CountDownLatch 这个类是在JDK1.5就已经提供了,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。

其实,CountDownLatch实现方式,跟上面的方法一是类似的。它是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,接下来等待的线程就可以恢复工作了。

下面咱们简单看下源码:
countDownLatch类中只提供了一个构造器:

//参数count为计数值
public CountDownLatch(int count) {  };  

而类中有三个方法是最重要的:

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };   
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
//将count值减1
public void countDown() { };  

以下例子就是一个很经典的CountDownLatch的用法

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTest {
    public static void countDownLatchTest() {
        long time = System.currentTimeMillis();
        final CountDownLatch countDownLatch = new CountDownLatch(5);
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int num = i;
            es.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(num * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    /**
                     * 使用CountDownLatch时要注意异常情况,一旦没处理好导致countDownLatch.countDown()没执行会引起线程阻塞,导致CPU居高不下**/
                    /*if (num == 3) {
                        System.out.println(Integer.parseInt("1.233"));
                    }*/

                    countDownLatch.countDown();
                    System.out.println(Thread.currentThread().getName() + "运行结束  运行时间为:" + num
                            + "秒  countDownLatch=" + countDownLatch.getCount());
                }
            });
        }
        es.shutdown();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("总耗时===" + (System.currentTimeMillis() - time));

    }

    public static void main(String[] args) {
        countDownLatchTest();
    }
}

可以看到最终耗时跟线程中最耗时的线程有关,但是使用CountDownLatch有一定风险,如果运行中没有捕获相关异常很容易导致CPU居高不下从而导致整个项目无法运行(想测试的同学可以把countDownLatchTest中的注解打开)。

那么遇到这种问题如何处理,当然把整个代码try catch是一种解决方式。另外一种比较优雅的解决方式是使用countDownLatch.await(5, TimeUnit.SECONDS) 代替countDownLatch.await(),方法中的那2个参数分别是超时时间和超时单位,如果线程在规定的时间内没有处理完成则主线程被自动唤醒继续执行下一步操作。

以上方法可以实现对应功能但是有以下缺点:

方法三: Thread Join

join()方法的作用,让父线程等待子线程结束之后才能继续运行。

官方的解释是,当我们调用某个线程的这个方法时,这个方法会挂起调用线程,直到被调用线程结束执行,调用线程才会继续执行。

通过查看源码,会看到Join方法实现是通过wait方法,当main线程调用thread.join()时候,main线程会获得线程对象thread的锁(wait 意味着拿到该对象的锁),调用该对象的wait(等待时间),直到该对象唤醒main线程 ,比如退出后。

以下例子也是很经典的用法:

public class JoinTest {
    public static void main(String[] args) {
        joinTest();
    }

    public static void joinTest() {
        long time = System.currentTimeMillis();
        int threadNum = 5;
        Thread[] threads = new Thread[threadNum];
        for (int i = 1; i <= threadNum; i++) {
            final int num = i;
            threads[i - 1] = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(num * 1000);
                        System.out.println(Thread.currentThread().getName() + "耗时:" + num + "秒");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            threads[i - 1].start();
        }
        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("总耗时===" + (System.currentTimeMillis() - time));

    }
}

这个方法的效果跟上面使用CountDownLatch方式差不多,优缺点也一样,所以就不在过多探讨了。

方法四: CyclicBarrier

CyclicBarrier的中文意思是“循环栅栏”,大意是一个可循环利用的屏障。它的作用就是会让所有线程都等待完成后才会继续下一步行动。

举个例子,就像生活中我们会约朋友们到某个餐厅一起吃饭,有些朋友可能会早到,有些朋友可能会晚到,但是这个餐厅规定必须等到所有人到齐之后才会让我们进去。这里的朋友们就是各个线程,餐厅就是CyclicBarrier。

下面简单看下CyclicBarrier的源码:

构造方法:

public CyclicBarrier(int parties)
    /**
     * parties 是参与线程的个数
     * 第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
     */
public CyclicBarrier(int parties, Runnable barrierAction)

重要方法:

/**
* 线程调用 await() 表示自己已经到达栅栏
* BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断或者超时
*/
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

下面针对两个不同的构造方法举两个经典的示例:

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    public static void main(String[] args) throws Exception {
        long time = System.currentTimeMillis();
        final int threadNum = 5;
        //注意:5个子线程 + 1个主线程
        CyclicBarrier cb = new CyclicBarrier(threadNum + 1);

        for (int i = 0; i < threadNum; i++) {
            new Thread(new MyRunable(cb, i)).start();
        }

        cb.await();
        System.out.println("所有thread执行完成!总耗时===" + (System.currentTimeMillis() - time));
    }

    static class MyRunable implements Runnable {
        CyclicBarrier barrier;
        int num = 0;

        public MyRunable(CyclicBarrier cb, int i) {
            this.barrier = cb;
            this.num = i;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(num * 1000L);
                System.out.println(Thread.currentThread() + ": 耗时==" + num + " 秒" + ",正在等候其它线程完成...");
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest2 {
    public static void main(String[] args) {
        int threadNum = 5;
        CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + " 完成最后任务");
            }
        });

        for (int i = 0; i < threadNum; i++) {
            new TaskThread(i + "", barrier).start();
        }
    }
}

class TaskThread extends Thread {
    String threadName;
    CyclicBarrier barrier;

    public TaskThread(String name, CyclicBarrier barrier) {
        this.threadName = name;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
            /**
             * 使用CyclicBarrier时要注意异常情况,一旦没处理好导致cyclicBarrier.await()没执行会引起线程阻塞,导致CPU居高不下**/
            /*if ("3".equals(threadName)) {
                System.out.println(Integer.parseInt("1.233"));
            }*/
            System.out.println("线程:" + threadName + " 到达栅栏 A");
            barrier.await();
            System.out.println("线程:" + threadName + " 冲破栅栏 A");

            Thread.sleep(2000);
            System.out.println("线程:" + threadName + " 到达栅栏 B");
            barrier.await();
            System.out.println("线程:" + threadName + " 冲破栅栏 B");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

第二个示例,运行日志如下:

线程:1 到达栅栏 A
线程:0 到达栅栏 A
线程:3 到达栅栏 A
线程:4 到达栅栏 A
线程:2 到达栅栏 A
Thread-2 完成最后任务
线程:2 冲破栅栏 A
线程:1 冲破栅栏 A
线程:3 冲破栅栏 A
线程:4 冲破栅栏 A
线程:0 冲破栅栏 A
线程:4 到达栅栏 B
线程:3 到达栅栏 B
线程:1 到达栅栏 B
线程:0 到达栅栏 B
线程:2 到达栅栏 B
Thread-2 完成最后任务
线程:2 冲破栅栏 B
线程:4 冲破栅栏 B
线程:3 冲破栅栏 B
线程:1 冲破栅栏 B
线程:0 冲破栅栏 B

Process finished with exit code 0

从打印结果可以看出,所有线程会等待全部线程到达栅栏之后才会继续执行,并且最后到达的线程会完成 Runnable 的任务。看得出来这种方式,比较适合用于多线程计算数据,最后合并计算结果的场景。

另外对比一下,CyclicBarrier 与 CountDownLatch的区别:

另外,可以看到使用CyclicBarrier也是有一定风险的,如果运行中没有捕获相关异常很容易导致CPU居高不下从而导致整个项目无法运行(想测试的同学可以把注解打开),那么遇到这种问题如何处理,当然把整个代码try catch是一种解决方式。

另外一种比较优雅的解决方式是使用cyclicBarrier.await(5, TimeUnit.SECONDS) 代替cyclicBarrier.await(),方法中的那2个参数分别是超时时间和超时单位,如果线程在规定的时间内没有处理完成则主线程被自动唤醒继续执行下一步操作。

方法五: Future

JDK5新增了Future接口,用于描述一个异步计算的结果。

虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果等操作。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下,它是一个接口:

public interface Future<V> {
    /**
     * 方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
     *
     * @param mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。
     * @return 如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;
     * 如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;
     * 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 方法表示任务是否被取消成功
     * @return 如果在任务正常完成前被取消成功,则返回 true
     */
    boolean isCancelled();

    /**
     * 方法表示任务是否已经完成
     * @return 若任务完成,则返回true
     */
    boolean isDone();

    /**
     * 方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
     * @return 任务执行的结果值
     * @throws InterruptedException
     * @throws ExecutionException
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null(并不是抛出异常,需要注意)。
     * @param timeout 超时时间
     * @param unit 超时单位
     * @return
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
}

从上面方法的注释可以看出,Futrue提供了三种功能:
1)判断任务是否完成;
2)能够中断任务;
3)能够获取任务执行结果。(最为常用的)

下面咱就利用Future.get()来实现需求,以下例子就是一个很经典的Future的用法

package cn.huolala.sdp.fulfillment.api.controller;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author XIAGUANGHUI
 * @version 1.0
 * @description
 * @date 2022/1/10 下午2:13
 */
public class FutureTest {
    public static void futureTest() {
        long time = System.currentTimeMillis();
        int threadNum = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        List<Future<String>> list = new ArrayList<Future<String>>(threadNum);
        for (int i = 1; i <= threadNum; i++) {
            final int num = i;
            list.add(executorService.submit(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            Thread.sleep(num * 1000);
                            /*if (num == 3) {
                                System.out.println(Integer.parseInt("1.233"));
                            }*/
                            return Thread.currentThread() + ": 耗时==" + num + " 秒";
                        }
                    })
            );
        }
        for (Future<String> future : list) {
            try {
                System.out.println(future.get(5, TimeUnit.SECONDS));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        System.out.println("总耗时===" + (System.currentTimeMillis() - time));
        executorService.shutdownNow();

    }

    public static void main(String[] args) {
        futureTest();
    }
}

可以看到前面几种方式的效果都是一致的,但这个方法相对前面的方法而言,代码相对复杂一点,不过有返回值。

很多博客说使用不带等待时间限制的get方法时,如果子线程执行异常了会导致主线程长期阻塞,这其实是错误的,子线程执行异常时其异常会被捕获,然后修改任务的状态为异常结束并唤醒等待的主线程,get方法判断任务状态发生变更,就终止等待了,并抛出异常。要想验证这一点,只需要把上面注释的代码放开,就可以看到效果。

方法六:使用CompletableFuture

使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会被迫等待。

从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。

我们以获取股票价格为例,看看如何使用CompletableFuture:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // 创建异步执行任务:
        CompletableFuture<Double> cf = CompletableFuture.supplyAsync(CompletableFutureDemo::fetchPrice);
        // 如果执行成功:
        cf.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 如果执行异常:
        cf.exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(200);
    }

    static Double fetchPrice() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        if (Math.random() < 0.3) {
            throw new RuntimeException("fetch price failed!");
        }
        return 5 + Math.random() * 20;
    }
}

创建一个CompletableFuture是通过CompletableFuture.supplyAsync()实现的,它需要一个实现了Supplier接口的对象:

public interface Supplier<T> {
    T get();
}

这里我们用lambda语法简化了一下,直接传入CompletableFutureDemo::fetchPrice,因为CompletableFutureDemo.fetchPrice()静态方法的签名符合Supplier接口的定义(除了方法名外)。

紧接着,CompletableFuture已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture完成时和异常时需要回调的实例。完成时,CompletableFuture会调用Consumer对象:

public interface Consumer<T> {
    void accept(T t);
}

异常时,CompletableFuture会调用Function对象:

public interface Function<T, R> {
    R apply(T t);
}

这里我们都用lambda语法简化了代码。

可见CompletableFuture的优点是:异步任务结束时,会自动回调某个对象的方法;异步任务出错时,会自动回调某个对象的方法;主线程设置好回调后,不再关心异步任务的执行。

如果只是实现了异步回调机制,我们还看不出CompletableFuture相比Future的优势。CompletableFuture更强大的功能是,多个CompletableFuture可以串行执行,例如,定义两个CompletableFuture,第一个CompletableFuture根据证券名称查询证券代码,第二个CompletableFuture根据证券代码查询证券价格,这两个CompletableFuture实现串行操作如下:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureDemo {
    public static void main(String[] args) throws Exception {
        // 第一个任务:
        CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油");
        });
        // cfQuery成功后继续执行下一个任务:
        CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice(code);
        });
        // cfFetch成功后打印结果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(2000);
    }

    static String queryCode(String name) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        return "601857";
    }

    static Double fetchPrice(String code) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}

除了串行执行外,多个CompletableFuture还可以并行执行。例如,我们考虑这样的场景:同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:

// CompletableFuture
import java.util.concurrent.CompletableFuture;
public class Main {
    public static void main(String[] args) throws Exception {
        // 两个CompletableFuture执行异步查询:
        CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油", "https://finance.sina.com.cn/code/");
        });
        CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
            return queryCode("中国石油", "https://money.163.com/code/");
        });

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);

        // 两个CompletableFuture执行异步查询:
        CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
        });
        CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
            return fetchPrice((String) code, "https://money.163.com/price/");
        });

        // 用anyOf合并为一个新的CompletableFuture:
        CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);

        // 最终结果:
        cfFetch.thenAccept((result) -> {
            System.out.println("price: " + result);
        });
        // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
        Thread.sleep(200);
    }

    static String queryCode(String name, String url) {
        System.out.println("query code from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return "601857";
    }

    static Double fetchPrice(String code, String url) {
        System.out.println("query price from " + url + "...");
        try {
            Thread.sleep((long) (Math.random() * 100));
        } catch (InterruptedException e) {
        }
        return 5 + Math.random() * 20;
    }
}

上述逻辑实现的异步查询规则实际上是:

┌─────────────┐ ┌─────────────┐
│ Query Code  │ │ Query Code  │
│  from sina  │ │  from 163   │
└─────────────┘ └─────────────┘
       │               │
       └───────┬───────┘
               ▼
        ┌─────────────┐
        │    anyOf    │
        └─────────────┘
               │
       ┌───────┴────────┐
       ▼                ▼
┌─────────────┐  ┌─────────────┐
│ Query Price │  │ Query Price │
│  from sina  │  │  from 163   │
└─────────────┘  └─────────────┘
       │                │
       └────────┬───────┘
                ▼
         ┌─────────────┐
         │    anyOf    │
         └─────────────┘
                │
                ▼
         ┌─────────────┐
         │Display Price│
         └─────────────┘

除了anyOf()可以实现“任意个CompletableFuture只要一个成功”,allOf()可以实现“所有CompletableFuture都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

最后我们注意CompletableFuture的命名规则:xxx():表示该方法将继续在已有的线程中执行;xxxAsync():表示将异步在线程池中执行。

小结:CompletableFuture可以指定异步处理流程:thenAccept()处理正常结果;
exceptional()处理异常结果;thenApplyAsync()用于串行化另一个CompletableFuture;anyOf()和allOf()用于并行化多个CompletableFuture。

参考文章:

https://www.cnblogs.com/tuojunjie/p/7390700.html
https://blog.csdn.net/weixin_34289454/article/details/92013952
https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650

上一篇下一篇

猜你喜欢

热点阅读