RxJava

异步编程神器RxJava

2018-08-26  本文已影响42人  魔镜的技术心经

定义

RxJava extends the observer pattern to support sequences of data/events and adds operations that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronisation, thread-safety and concurrent data structures.

RxJava简单来说是建立在观察者模式上面的编程范式,它一定程度抽象和屏蔽了低层的线程创建管理和多线程并发安全问题,从而提供更好的编程体验,降低了开发者使用门槛。

一些概念

The ability to store a function as a variable and pass that function as a parameter.

import java.util.function.Function;

public class FirstClassCitizenParameterIllustration {

    public static void main(String[] args) {

        Function<String, String> transformToLower = (s) -> {
            return s.toLowerCase();
        };
        System.out.println(concatAndTransform("Hello ", "World", transformToLower));
    }

    public static String concatAndTransform(String a, String b, Function<String, String> stringTransform) {

        if (stringTransform != null) {
            a = stringTransform.apply(a);
            b = stringTransform.apply(b);
        }

        return a + b;
    }
}

高阶函数, 其定义如下:

A high order function is a function that can return a function.

import java.util.function.Function;
import java.util.function.Supplier;

public class HighOrderFunctionIllustration {

    public static void main(String[] args) {

        Supplier<String> xformOperation = createCombineAndTransform("Hello ", "World", (a) -> {
            return a.toUpperCase();
        });
        System.out.println(xformOperation.get());
    }

    public static Supplier<String> createCombineAndTransform(
            final String a, final String b,
            final Function<String, String> transformer
    ) {
        return () -> {

            String aa = a;
            String bb = b;

            if (transformer != null) {
                aa = transformer.apply(a);
                bb = transformer.apply(b);
            }

            return aa + bb;
        };
    }
}

常见的操作

rxjava.png

响应时间的优化

在进行多个API调用编排的时候,如果采用同步阻塞的编排方式,其响应时间约等于所有API执行时间之和:


同步编排.png

如果是采用异步并发的方式,则响应时间会明显缩短:


异步编排1.png

在我们真实的项目中,大量采用Rxjava的响应式编程,用于服务异步编排,减少服务响应时间,提供用户体验。

举个例子

需求说明:

一个订单服务,它的数据需要聚合其他三个服务的数据,服务调用没有先后之分,没有相互依赖,调用过程中,如果任何一个服务出现问题,需要及时在响应体中加入对应的错误信息并返回。

在采用RxJava进行编程的时候,首选应该考虑Stream, 每个服务的返回可以是一个Stream(Observable),但是每个服务的返回都不一样, 不能将所有的三个服务直接通过Stream连起来。

所以第一步是提取并封装公共的返回对象:

public final class AsyncResult<V> {
    private final V value;
    private final Exception exception;

    private AsyncResult(V value, Exception exception) {
        this.value = value;
        this.exception = exception;
    }

    public static <V> AsyncResult<V> success(V value) {
        return new AsyncResult<>(value, null);
    }

    public static <V> AsyncResult<V> failed(Exception exception) {
        return new AsyncResult<>(null, exception);
    }

    public V getValue() throws Exception {
        if (exception != null) {
            throw exception;
        }
        return value;
    }

    public Exception getException() {
        return exception;
    }

    public boolean hasException() {
        return exception != null;
    }
}

第二步,为每个服务调用创建对应的Observable Stream

private Observable<AsyncResult<UserModel>> getUserAsyncStream(String userId) {
    return Observable.create((ObservableOnSubscribe<AsyncResult<UserModel>>) emitter -> {
        try {
            UserModel user = userDao.getUser(userId);
            emitter.onNext(AsyncResult.success(user));
        } catch (Exception exception) {
            emitter.onNext(AsyncResult.failed(exception));
        }
        emitter.onComplete();
    }).subscribeOn(Schedulers.io());
}

第三步,通过Stream将所有的服务进行聚合:

Observable<OrderContainer> orderContainer = Observable.just(new OrderContainer())
                .zipWith(userStream, new UserAssembler())
                .zipWith(logisticsStream, new LogisticsAssembler())
                .zipWith(productStream, new ProductAssembler())
                .subscribeOn(Schedulers.io());
                
return orderContainer.blockingGet(); 

其中,每个Assember作为具体的聚合逻辑处理器, 分别针对正常情况和异常情况进行处理。

public class UserAssembler implements BiFunction<OrderContainer, AsyncResult<UserModel>, OrderContainer> {
    @Override
    public OrderContainer apply(OrderContainer orderContainer, AsyncResult<UserModel> userModel) throws Exception {
        if (userModel.hasException()) {
            orderContainer.addErrors(ErrorBuilder.buildServiceError(userModel.getException().getMessage()));
        } else {
            orderContainer.setUser(userModel.getValue());
        }
        return orderContainer;
    }
}

整体流程如下:


image.png

其他

上一篇 下一篇

猜你喜欢

热点阅读