借助JDK8和RxJava如何让你的业务代码运行的更快

2018-12-07  本文已影响0人  空山雪林

背景

微服务流行后,在我们项目开发过程中,一个服务经常会调用N个微服务,调用每个微服务可能需要几百毫秒,试想,一个复杂的业务如果要调用上百的微服务,如果各个服务同步执行,可能就需要花费好几秒,试想:这些服务为什么不能并行运行呢?

一个复杂的计算任务,为什么不能分解成更小的任务单位,让他们并行运行呢?

本文通过以上两个业务场景,比较各个实现方案的差异,在讲解之前,我们先来了解下本文提到的RxJava

案例

从一段最简单的服务开始:该服务需调用3个微服务,每个微服务费时250ms,三个微服务都获取数据后返回给前端(该微服务三个服务分别是商品详情,商品评论和推荐商品列表),如果按顺序执行,那么代码是这样的:

public static void main(String[] args) throws Exception {
    long c = System.currentTimeMillis();
    System.out.println("顺序执行:");
    System.out.println(service("商品详情微服务")+service("商品评论微服务")+service("推荐商品微服务"));
    spendTime(c);
}
//模拟某个服务
private static String service(String srvName){
    try {
        Thread.sleep(250);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return srvName+"\r\n";
}
private static void spendTime(long preTime) {
    System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
}

这段代码毫无疑问,打印输出:

花费:781 毫秒

改造一下,使用JDK8的CompletableFuture,3个微服务独立线程运行,都完成后通知主线程打印,代码如下:

public static void main(String[] args) throws Exception {
        final long cc = System.currentTimeMillis(); 
    CompletableFuture<String> s1 = CompletableFuture.supplyAsync(() -> service("商品详情微服务"));
    CompletableFuture<String> s2 = CompletableFuture.supplyAsync(() -> service("商品评论微服务"));
    CompletableFuture<String> s3 = CompletableFuture.supplyAsync(() -> service("推荐商品微服务"));
    s1.thenCombine(s2, (i,j)->{
        return i+j;
    }).thenCombine(s3, (i,j)->{
        System.out.println("使用JDK8的并行编程:");
        System.out.println(i+j);
        spendTime(cc);
        return i+j;
    });
}

以上代码的执行结果取决于3个微服务中最长时间的那个服务,相比原先速度有明显提高:

花费:311 毫秒

那么以上的代码使用RxJava怎么来写呢?我们可以flatMap将服务分拆到各自独立线程中去执行,代码如下:

private static String[] ss = {"商品详情微服务","商品评论微服务","推荐商品微服务"};
public static void main(String[] args) throws Exception {
    Observable.range(0,3)
    .flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer t) throws Exception {
            return Observable.just(t)
        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer t) throws Exception {
                        return service(ss[t]);
                    }
                });
            }
        })
        .reduce((s1,s2)->s1+s2)
        .subscribe(s -> {
            System.out.println("Observable:\r\n" + s);
            spendTime(cc2);
        });
}

花费:455 毫秒

RxJava模拟的针对每个数据项的并发操作调用时间上要比直接使用JDK8的API慢得多

第二个业务场景是将复杂的计算进行拆分子计算任务,然后将每个任务计算合并成最终计算结果,以下直接给出所有源码,我们来看看几种计算方式在耗时上的不同,复杂计算任务是:对1到210000000开根号求总和

package com.sumslack.rxjava;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class TestComputer {
    private static final int MAX_I = 210000000;
    
    private static void spendTime(long preTime) {
        System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
    }
    
    private static void spendTime(long preTime,String str) {
        System.out.println("[" + str + "] 花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
    }
    private static ExecutorService eService = Executors.newCachedThreadPool();
    public static void main(String[] args) throws Exception{
        
        int[] ss = new int[MAX_I];
        for(int i=1;i<=MAX_I;i++) {
            ss[i-1] = i;
        }
        
        
        long c = System.currentTimeMillis();
        System.out.println(xx(0,MAX_I));
        spendTime(c,"顺序执行");

        final long cc5 = System.currentTimeMillis();
        Observable.range(1, MAX_I).map(new Function<Integer, Double>() {
            @Override
            public Double apply(Integer t) throws Exception {
                return Math.sqrt(t);
            }
        }).reduce((i,j)->i+j)
        .subscribeOn(Schedulers.computation())
        .subscribe(s -> {
            spendTime(cc5,"Observable直接算");
        });
        final long cc = System.currentTimeMillis();
        CompletableFuture<Double> cf1 = CompletableFuture.supplyAsync(() -> {
            return xx(0,MAX_I/2);
        });
        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
            return xx(MAX_I/2,MAX_I);
        });
        cf1.thenCombine(cf2,  (i,j)->{
            System.out.println(""+(i+j));
            spendTime(cc,"CompletableFuture");
            return i+j;
        });
               
        //也可以用:CompletableFuture.allOf(cf1,cf2).join();
        c = System.currentTimeMillis();
        Double dd = Arrays.stream(ss).mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
        System.out.println(dd);
        spendTime(cc,"stream");
        
        c = System.currentTimeMillis();
        Double dd2 = Arrays.stream(ss).parallel().mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
        System.out.println(dd2);
        spendTime(cc,"parallel stream");
        
        final long cc2 = System.currentTimeMillis();
        Observable.fromArray(0,1,2)
        .flatMap(new io.reactivex.functions.Function<Integer,ObservableSource<Double>>(){
            @Override
            public ObservableSource<Double> apply(Integer t) throws Exception {
                if(t%3==0) {
                    return Observable.just(t)
                        .subscribeOn(Schedulers.computation())
                        .map(new Function<Integer, Double>() {
                            @Override
                            public Double apply(Integer t) throws Exception {
                                return xx(0,MAX_I/3);
                            }
                        });
                }else if(t%3==1) {
                    return Observable.just(t)
                            .subscribeOn(Schedulers.computation())
                            .map(new Function<Integer, Double>() {
                                @Override
                                public Double apply(Integer t) throws Exception {
                                    return xx(MAX_I/3,MAX_I*2/3);
                                }
                            });
                }else {
                    return Observable.just(t)
                            .subscribeOn(Schedulers.computation())
                            .map(new Function<Integer, Double>() {
                                @Override
                                public Double apply(Integer t) throws Exception {
                                    return xx(MAX_I*2/3,MAX_I);
                                }
                            });
                }
            }
        })
        .reduce(new BiFunction<Double, Double, Double>() {
            @Override
            public Double apply(Double t1, Double t2) throws Exception {
                return t1+t2;
            }
        })
        .subscribe( s->{
            System.out.println(s);
            spendTime(cc2,"Observable");
        });
        Thread.sleep(100000);
    }
    
    private static double xx(int start,int end) {
        double sum = 1;
        for(int i=start;i<end;i++) {
            sum += Math.sqrt(i+1);
        }
        return sum;
    }
}

以下是费时结果:

[顺序执行] 花费:1086 毫秒
[CompletableFuture] 花费:537 毫秒
[stream] 花费:1028 毫秒
[parallel stream] 花费:1305 毫秒
[Observable] 花费:461 毫秒
[Observable直接算] 花费:4265 毫秒

这里使用 RxJava 进行计算任务分解求和是最快的,因为JDK8并发编程我们分解的是两个计算任务,而RxJava分解成3个所致!

关于RxJava

RxJavaReactive ExtensionsJava实现,通过使用Obserable/Flowable序列来构建异步和基于事件的程序的库,RxJava实现和扩展了观察者模式。

RxJava基于响应式编程,是一种面向数据流和变化传播的编程范式。传统编程方式代码都是顺序执行的,而响应式编程是基于异步编程的,借助于CPU多核能力,提高运行效率,降低延迟和阻塞,基于数据流模型,如一个函数可作用与数据流中的每项,可变化传播。在响应式编程中,函数成为其第一等公民,同原型类型一样,函数可作用与参数,也可作为返回值。

RxJava基于函数式编程,传统面向对象是通过抽象出对象关系来解决问题,函数式编程是通过函数的组合来解决问题。

概念

RxJava编程

总结

对于复杂计算,你可以将计算任务分解成N个子计算任务,交给多个线程处理并将结果合并后取得最终结果,对于服务业务的调用,你应该清楚,哪些子任务可以并行运行,哪些需要顺序执行,使用RxJava在代码上可能更加直观,也可以使用JDK8的CompletableFuture,其实JDK8的很多API参考了RxJava的实现,两者在写法上非常的类似,响应式编程相比传统代码的顺序执行在思路上有很大的不同,理解上也有一定的难度,希望通过本文让您全面了解函数式编程的实现思路。

上一篇下一篇

猜你喜欢

热点阅读