Java 微服务异步并行调用优化
我们先来设想一个场景。
有一个 http 的接口 A,该接口内部实际上是由另外三个接口 B、C、D 返回结果的组合,这三个接口不存在相互依赖。我们一般的写法就是 B、C、D 同步顺序执行,依次拿到结果后组装在一起。那么假如这三个接口分别耗时 2 秒,那么 A 接口就要耗时 6 秒。如果可以让 B、C、D 同时执行的话,那么 A 接口理论上只要耗时 2 秒。
当然实际情况肯定复杂的多,如果一个接口内部存在不相互依赖的耗时调用的话,那么我们可以做这样的合并,响应时间上的减少还是非常明显的。整个接口的响应时间取决于最长的那个内部接口。
那么我们来看看在 Java 中有哪些方法可以达到这样的目的。认真思考下你会发现,如果要并行处理的话,在 Java 中只能用多线程来做。实际情况中每个线程处理完的时间肯定不一样,那么如何让线程先处理完的停下来等最后那个处理完的呢。如果经常用多线程的小伙伴肯定能想到 CountDownLatch 工具类。当然也有直接简单暴力的方法,在空循环里轮询每个线程是否执行完,但是这样做肯定不优雅。
那下面就直接上代码了: 假设有个学生服务提供查询学生名字,年龄和家庭信息,每个服务之间没有相互依赖。 我们就简单模拟下来获取学生信息的一个接口。
常规方法
@RequestMapping("/getStudentInfo")
public Object getStudentInfo() {
long start = System.currentTimeMillis();
Map resultMap = new HashMap<>(10);
try {
resultMap.put("studentName", studentService.getStudentName());
resultMap.put("studentAge", studentService.getSutdentAge());
resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
顺序同步执行,耗时 6 秒。
1. Future
@RequestMapping("/getStudentInfoWithFuture")
public Object testWhitCallable() {
long start = System.currentTimeMillis();
Map resultMap = new HashMap<>(10);
try {
CountDownLatch countDownLatch = new CountDownLatch(3);
Future futureStudentName = es.submit(() -> {
Object studentName = studentService.getStudentName();
countDownLatch.countDown();
return studentName;
});
Future futureStudentAge = es.submit(() -> {
Object studentAge = studentService.getSutdentAge();
countDownLatch.countDown();
return studentAge;
});
Future futureStudentFamilyInfo = es.submit(() -> {
Object studentFamilyInfo = studentService.getSutdentFamilyInfo();
countDownLatch.countDown();
return studentFamilyInfo;
});
//同步等待所有线程执行完之后再继续
countDownLatch.await();
resultMap.put("studentName", futureStudentName.get());
resultMap.put("studentAge", futureStudentAge.get());
resultMap.put("studentFamilyInfo", futureStudentFamilyInfo.get());
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
2.RxJava
@RequestMapping("/getStudentInfoWithRxJava")
public Object testWithRxJava() {
long start = System.currentTimeMillis();
Map resultMap = new HashMap<>(10);
try {
CountDownLatch countDownLatch = new CountDownLatch(1);
Observable studentNameObservable = Observable.create(observableEmitter -> {
resultMap.put("studentName", studentService.getStudentName());
observableEmitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable studentAgeObservable = Observable.create(observableEmitter -> {
resultMap.put("studentAge", studentService.getSutdentAge());
observableEmitter.onComplete();
}).subscribeOn(Schedulers.io());
Observable familyInfoObservable = Observable.create(observableEmitter -> {
resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());
observableEmitter.onComplete();
}).subscribeOn(Schedulers.io());
//创建一个下游 Observer
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
//因为后面用了 merge 操作符,所以会合并后发射,那么只要 countdown 一次就行了。
countDownLatch.countDown();
}
};
//建立连接,
Observable.merge(studentNameObservable, studentAgeObservable, familyInfoObservable).subscribe(observer);
//等待异步线程完成
countDownLatch.await();
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
对于 RxJava 我不熟,我也是临时学习的,不知道这种写法是不是最佳的。
3.CompletableFutures
@RequestMapping("/getStudentInfoWithCompletableFuture")
public Object getStudentInfoWithCompletableFuture() {
long start = System.currentTimeMillis();
Map resultMap = new HashMap<>(10);
try {
CompletableFuture completableFutureStudentName = CompletableFuture.supplyAsync(() -> {
try {
return studentService.getStudentName();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
CompletableFuture completableFutureSutdentAge = CompletableFuture.supplyAsync(() -> {
try {
return studentService.getSutdentAge();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
CompletableFuture completableFutureFamilyInfo = CompletableFuture.supplyAsync(() -> {
try {
return studentService.getSutdentFamilyInfo();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
});
CompletableFuture.allOf(completableFutureStudentName, completableFutureSutdentAge, completableFutureFamilyInfo).join();
resultMap.put("studentName", completableFutureStudentName.get());
resultMap.put("studentAge", completableFutureSutdentAge.get());
resultMap.put("studentFamilyInfo", completableFutureFamilyInfo.get());
} catch (Exception e) {
resultMap.put("errMsg", e.getMessage());
}
resultMap.put("total cost", System.currentTimeMillis() - start);
return resultMap;
}
自带最后的同步等待,不需要 CountDownLatch。CompletableFuture 还有很多其他好用的方法。
有兴趣的可以自己来实验下。 github 项目地址 reactive-programming-sample。
Java程序员如何学习才能快速入门并精通呢?
当真正开始学习的时候难免不知道从哪入手,导致效率低下影响继续学习的信心。
但最重要的是不知道哪些技术需要重点掌握,学习时频繁踩坑,最终浪费大量时间,所以有一套实用的视频课程用来跟着学习是非常有必要的。
为了让学习变得轻松、高效,今天给大家免费分享一套阿里架构师传授的一套教学资源。帮助大家在成为架构师的道路上披荆斩棘。这套视频课程详细讲解了(Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构)等这些成为架构师必备的内容!而且还把框架需要用到的各种程序进行了打包,根据基础视频可以让你轻松搭建分布式框架环境,像在企业生产环境一样进行学习和实践。
如果想提升自己的,看看上图大纲能知道你现在还处于什么阶段要向那些方面发展?
同时小编已将上图知识大纲里面的内容打包好了......
想要资料的朋友,可以直接加群960439918获取免费架构资料(包括高可用,高并发,spring源码,mybatis源码,JVM,大数据,Netty等多个技术知识的架构视频资料和各种电子书籍阅读)
加入群聊【java高级架构交流群】