Dongnao Academy (动脑学院) RxJava Preview Material: An Introduction to Reactive Programming with RxJava2

2018-05-25  Author: 十年开发程序员

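Reactive Programming

Basic Concepts

The Reactive Manifesto

The Reactive Manifesto: organizations working in different domains are independently discovering patterns for building software that look the same. Their systems are more robust, more resilient, more flexible, and better positioned to meet modern demands.

The Reactive Manifesto describes a system as a whole and is not the same thing as a reactive programming specification. A reactive system should have the following characteristics, which the manifesto names as responsive, resilient, elastic, and message driven:

[Figure: QQ截图20180525165803.png]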

Reactive Extensions

The Reactive Streams Specification

Example

    Path filePath = Paths.get("build.gradle");
    // RxJava2 to Reactor
    Flowable<String> flowable = Flowable
            .fromCallable(() -> Files.readAllLines(filePath))
            .flatMap(x -> Flowable.fromIterable(x));
    Flux.from(flowable).count().subscribe(System.out::println);

    // Reactor to RxJava2
    try
    {
        Flux<String> flux = Flux.fromIterable(Files.readAllLines(filePath));
        Flowable.fromPublisher(flux).count()
                .subscribe(System.out::println);
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }

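RxJava Basics

The Current State of RxJava

Why Choose RxJava2

Compared with Other Programming Models and Libraries

                Single value           Multiple values
Synchronous     T getData()            Iterable<T> getData()
Asynchronous    Future<T> getData()    Observable<T> getData()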
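
The four quadrants of the table can be sketched with plain JDK types. This is only an illustration: the class DataShapes and its method names are hypothetical, and a bare callback stands in for Observable<T> so that the sketch compiles without RxJava.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Illustrative only: DataShapes and its method names are not part of RxJava.
final class DataShapes {
    // Synchronous, single value: the caller blocks until the value is returned.
    static String getOne() {
        return "blue";
    }

    // Synchronous, multiple values: the caller pulls each element when it wants it.
    static Iterable<String> getMany() {
        return Arrays.asList("blue", "red", "green");
    }

    // Asynchronous, single value: the value is delivered later through a Future.
    static CompletableFuture<String> getOneAsync() {
        return CompletableFuture.supplyAsync(DataShapes::getOne);
    }

    // Asynchronous, multiple values: the source pushes each element to a callback.
    // This is the quadrant Observable<T> fills; a bare callback stands in for it here.
    static CompletableFuture<Void> getManyAsync(Consumer<String> onNext) {
        return CompletableFuture.runAsync(() -> getMany().forEach(onNext));
    }

    public static void main(String[] args) {
        System.out.println(getOne());
        getMany().forEach(System.out::println);
        System.out.println(getOneAsync().join());
        List<String> pushed = new CopyOnWriteArrayList<>();
        getManyAsync(pushed::add).join();
        System.out.println(pushed);
    }
}
```

The difference between the two rows is who drives the exchange: in the synchronous row the caller pulls values, while in the asynchronous row the source pushes them when they are ready.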

RxJava 1 vs RxJava 2

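    // RxJava 2 no longer accepts null as a value.
    // Each of these factory calls throws NullPointerException immediately:
    Observable.just(null);
    Single.just(null);
    Flowable.just(null);
    Maybe.just(null);
    // A null returned from a callable or a mapper reaches the subscriber
    // as a NullPointerException through onError:
    Observable.fromCallable(() -> null)
            .subscribe(System.out::println, Throwable::printStackTrace);
    Observable.just(1).map(v -> null)
            .subscribe(System.out::println, Throwable::printStackTrace);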

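Reactive Types in RxJava2

Main Class Relationship Diagram

The diagram below shows the relationships among the main RxJava2 classes and makes clear how the reactive types relate to and differ from one another. Unless stated otherwise, the rest of this article uses Flowable in its examples.

[Figure: Publisher-Subscriber-class-relation.png]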

Flowable & Observable

Single & Completable & Maybe

    // Single: exactly one value, or an error
    interface SingleObserver<T> {
        void onSubscribe(Disposable d);
        void onSuccess(T value);
        void onError(Throwable error);
    }

    // Completable: no value, only completion or an error
    Completable.create(new CompletableOnSubscribe()
    {
        @Override
        public void subscribe(CompletableEmitter e) throws Exception
        {
            Path filePath = Paths.get("build.gradle");
            Files.readAllLines(filePath);
            e.onComplete();
        }
    }).subscribe(() -> System.out.println("OK!"),
            Throwable::printStackTrace);

    // Maybe: zero or one value, or an error.
    // 1 + 1 = 2 does not pass the filter, so defaultIfEmpty supplies 2
    // and the assertion against 21 fails with the error shown below.
    Maybe.just(1)
            .map(v -> v + 1)
            .filter(v -> v == 1)
            .defaultIfEmpty(2)
            .test()
            .assertResult(21);
    // java.lang.AssertionError: Values at position 0 differ; Expected: 21 (class: Integer), Actual: 2 (class: Integer) (latch = 0, values = 1, errors = 0, completions = 1)
    //
    //     at io.reactivex.observers.BaseTestConsumer.fail(BaseTestConsumer.java:133)
    //     ....

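Main Operations in RxJava2

As we have seen, RxJava's main features are an extended observer pattern, stream-style operators, and asynchronous programming. It supports the operators defined by the ReactiveX specification, and RxJava2 also conforms to the Reactive Streams specification. Using Flowable as the example, the following sections walk through the important RxJava2 operations by category [9].

Creating a Flowable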

    List<String> list = Arrays.asList(
            "blue", "red", "green", "yellow", "orange", "cyan", "purple"
    );
    Flowable.fromIterable(list).skip(2).subscribe(System.out::println);
    Flowable.fromArray(list.toArray()).subscribe(System.out::println);
    Flowable.just("blue").subscribe(System.out::println);

    ExecutorService executor = Executors.newFixedThreadPool(2);
    System.out.println("MAIN: " + Thread.currentThread().getId());
    // Splits every line of build.gradle into single characters and counts them
    Callable<String> callable = () -> {
        System.out.println("callable [" + Thread.currentThread().getId() + "]: ");
        Path filePath = Paths.get("build.gradle");
        return Files.readAllLines(filePath).stream()
                .flatMap(s -> Arrays.stream(s.split(""))).count() + "";
    };

    Future<String> future = executor.submit(callable);

    Consumer<String> onNext = v -> System.out
            .println("consumer[" + Thread.currentThread().getId() + "]:" + v);

    // fromCallable runs the callable on the subscribing thread;
    // fromFuture blocks until the future has completed
    Flowable.fromCallable(callable).subscribe(onNext);
    Flowable.fromFuture(future).subscribe(onNext);
    System.out.println("END");
    executor.shutdown(); // lets the JVM exit once the pool is idle

    class Fib
    {
        long a;
        long b;

        public Fib(long a, long b)
        {
            this.a = a;
            this.b = b;
        }

        public long fib()
        {
            return a + b;
        }
    }

    // Fibonacci sequence
    Flowable.create(new FlowableOnSubscribe<Fib>()
    {
        @Override
        public void subscribe(FlowableEmitter<Fib> e) throws Exception
        {
            Fib start = new Fib(1L, 1L);

            while (!e.isCancelled()) {
                e.onNext(start);
                start = new Fib(start.b, start.fib());
            }
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER).map(x -> x.fib()).take(10).subscribe(System.out::println);

    Flowable.generate(() -> new Fib(1L, 1L), (x, y) -> {
        Fib fib = new Fib(x.b, x.fib());
        y.onNext(fib);
        return fib;
    }).ofType(Fib.class).map(x -> x.fib()).take(10).subscribe(System.out::println);
    Flowable<String> f1 = Flowable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS).map(index -> "f1-" + index);
    Flowable<String> f2 = Flowable.intervalRange(1, 3, 2, 2, TimeUnit.SECONDS).map(index -> "f2-" + index);

    Flowable.ambArray(f1, f2).map(x -> "amb: " + x).subscribe(System.out::println);
    System.out.println("----------concat-----------");
    Flowable.concat(f1, f2).map(x -> "concat: " + x).subscribe(System.out::println);

    System.out.println("----------merge-----------");
    Flowable.merge(f1, f2).map(x -> "merge: " + x).subscribe(System.out::println);

    // switchOnNext and switchMap: follow the latest inner Flowable and drop the previous one
    @SuppressWarnings("unchecked")
    Flowable<String>[] flowables = new Flowable[]{f1, f2};
    Flowable.switchOnNext(Flowable.intervalRange(0, 2, 0, 3, TimeUnit.SECONDS).map(i -> flowables[i.intValue()]))
            .map(x -> "switchOnNext-" + x).subscribe(System.out::println);
    Flowable.intervalRange(0, 2, 0, 3, TimeUnit.SECONDS).map(i -> flowables[i.intValue()])
            .switchMap(f -> f) // identity mapper; avoids a raw cast to the internal Functions.identity()
            .map(x -> "switchMap-" + x).subscribe(System.out::println);

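Transforming, Filtering, and Aggregating

Java 8 Streams offer operations that serve the same purposes, but because RxJava adds the dimension of time, its set of operators is richer. This section covers the most important ones.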

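    // Each index is delayed by a further 0 to 4 seconds, so arrival times vary
    Flowable<String> f1 = Flowable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS).delay((t) ->
            Flowable.timer(t % 3 + new Random().nextInt(3), TimeUnit.SECONDS))
            .map(index -> index % 3 + "-f1-" + index);

    // buffer: collect the items of each 5-second span into a List
    f1.buffer(5, TimeUnit.SECONDS).map(x -> "buffer-" + x).subscribe(System.out::println);

    // window: like buffer, but each span is delivered as a Flowable
    f1.window(5, TimeUnit.SECONDS).map(x -> x.toList())
            .subscribe(x -> x.subscribe(System.out::println));

    // groupBy: split the stream by key (the text before the first '-')
    Disposable b = f1.groupBy((x) -> x.split("-", 2)[0])
            .subscribe(x -> x.toList().subscribe(System.out::println));
    // The same grouping done with a Java 8 Stream after blocking for the full list
    Map<String, List<String>> map = f1.toList().blockingGet().stream()
            .collect(Collectors.groupingBy((x) -> x.split("-", 2)[0]));
    System.out.println(map);

    // Busy-wait so the demo keeps running until the groupBy subscription terminates
    while (!b.isDisposed()) {
    }
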
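debounce: suppresses jitter. An item is passed downstream only if no other item is emitted within the configured timeout after it. This is widely used in front-end apps. To suppress repeated values rather than bursts in time, use the distinctUntilChanged operator.

throttle: rate limiting. throttleFirst emits an item and then ignores everything until the interval has elapsed; the first item after that is emitted and the cycle repeats.

sample: data sampling. At the end of each interval, it emits the most recent item the source produced during that interval.
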
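
The timing rules of these three operators can be compared on one list of timestamped events. The sketch below is a plain-JDK model of the rules as described here, not RxJava's implementation: RateLimitModel, Event, and demoEvents are hypothetical names, times are in milliseconds, and the model assumes that debounce still emits the final pending item when the source completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// A timing model only; RateLimitModel and Event are hypothetical names, not RxJava classes.
final class RateLimitModel {
    static final class Event {
        final long time;     // emission time in milliseconds
        final String value;

        Event(long time, String value) {
            this.time = time;
            this.value = value;
        }
    }

    // throttleFirst: emit an item, then ignore everything until `window` ms have passed.
    static List<String> throttleFirst(List<Event> events, long window) {
        List<String> out = new ArrayList<>();
        long gateOpensAt = Long.MIN_VALUE;
        for (Event e : events) {
            if (e.time >= gateOpensAt) {
                out.add(e.value);
                gateOpensAt = e.time + window;
            }
        }
        return out;
    }

    // debounce: emit an item only if the next one arrives more than `timeout` ms later.
    // The final item is emitted as well, because nothing follows it (an assumption of this model).
    static List<String> debounce(List<Event> events, long timeout) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean isLast = i == events.size() - 1;
            if (isLast || events.get(i + 1).time - events.get(i).time > timeout) {
                out.add(events.get(i).value);
            }
        }
        return out;
    }

    // sample: at every tick of `period` ms (up to `endTime`), emit the latest item
    // seen since the previous tick, if there was one.
    static List<String> sample(List<Event> events, long period, long endTime) {
        List<String> out = new ArrayList<>();
        int next = 0;
        for (long tick = period; tick <= endTime; tick += period) {
            String latest = null;
            while (next < events.size() && events.get(next).time <= tick) {
                latest = events.get(next).value;
                next++;
            }
            if (latest != null) {
                out.add(latest);
            }
        }
        return out;
    }

    static List<Event> demoEvents() {
        return Arrays.asList(
                new Event(0, "a"), new Event(100, "b"), new Event(250, "c"),
                new Event(900, "d"), new Event(1050, "e"), new Event(2500, "f"));
    }

    public static void main(String[] args) {
        List<Event> events = demoEvents();
        System.out.println(throttleFirst(events, 500));   // [a, d, f]
        System.out.println(debounce(events, 500));        // [c, e, f]
        System.out.println(sample(events, 1000, 3000));   // [d, e, f]
    }
}
```

On the same input, throttleFirst keeps the start of each burst, debounce keeps the end of each burst, and sample keeps whatever was latest at each tick.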
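    Flowable<String> f1 = Flowable
            .fromArray("blue", "red", "green", "yellow11", "orange", "cyan", "purple");

    // elementAt: the item at index 4, or the default if the stream is shorter
    f1.elementAt(4, "hello").subscribe(System.out::println);
    // out: orange

    // takeUntil: emit items up to and including the first one that matches the predicate
    f1.takeUntil(x -> x.length() > 5).map(x -> "takeUntil-" + x).toList()
            .subscribe(System.out::println);
    // out: [takeUntil-blue, takeUntil-red, takeUntil-green, takeUntil-yellow11]

    // takeWhile: emit items while the predicate holds, then complete
    f1.takeWhile(x -> x.length() <= 5).map(x -> "takeWhile-" + x).toList()
            .subscribe(System.out::println);
    // out: [takeWhile-blue, takeWhile-red, takeWhile-green]

    // skipWhile: drop items while the predicate holds, then emit the rest
    f1.skipWhile(x -> x.length() <= 5).map(x -> "skipWhile-" + x).toList()
            .subscribe(System.out::println);
    // out: [skipWhile-yellow11, skipWhile-orange, skipWhile-cyan, skipWhile-purple]

    // skipUntil: drop items until the other Publisher emits (here, after 5 seconds).
    // Each item is delayed by its length in seconds; "green" arrives at the 5-second
    // mark together with the timer, so whether it is printed depends on timing.
    Disposable d = f1.delay(v -> Flowable.timer(v.length(), TimeUnit.SECONDS))
            .skipUntil(Flowable.timer(5, TimeUnit.SECONDS)).map(x -> "skipUntil-" + x)
            .subscribe(System.out::println);
    // out:
    // skipUntil-green
    // skipUntil-orange
    // skipUntil-purple
    // skipUntil-yellow11
    while (!d.isDisposed()) {
    }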

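Asynchrony and Concurrency

RxJava uses a small set of operators to unify synchronous and asynchronous, blocking and non-blocking, and parallel and concurrent programming.

observeOn & subscribeOn & Scheduler

Multithreaded Concurrency Example

In the example from the previous subsection, every emitted item goes through the same thread switches, so items are never processed in parallel. To get an effect similar to a Java 8 parallel stream, use flatMap to build the concurrency yourself and apply the scheduler to the inner Flowable it returns, as in the following example: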

    f1.filter(Files::isRegularFile).doOnNext(consumer).subscribeOn(Schedulers.newThread())
            .flatMap(y -> Flowable.just(y).subscribeOn(Schedulers.io())
                    .map(Files::readAllLines)).map(Collection::size)
            .observeOn(Schedulers.computation()).doOnNext(consumer)
            .sorted(Comparator.naturalOrder())
            .observeOn(Schedulers.trampoline()).subscribe(consumer);

Blocking and Non-blocking Example

    Flowable<Path> f2 = f1.subscribeOn(Schedulers.newThread());
    // f2 is a non-blocking Flowable.
    // blockingSubscribe consumes it on the calling (main) thread.
    f2.blockingSubscribe(System.out::println);
    // The following operators also block and return the results.
    List<Path> list = f2.toList().blockingGet();
    Iterable<Path> iterable = f2.blockingIterable();

Error Handling

    // Without an error operator, the first exception terminates the stream and reaches onError
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS).map(index -> {
        throw new IOException(index + "");
    }).map(index -> {
        throw new IllegalArgumentException(index + "");
    });
    Disposable d = f1.subscribe(System.out::println, Throwable::printStackTrace);
    while (!d.isDisposed()) {
    }

    // Same failing source as above
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS).map(index -> {
        throw new IOException(index + "");
    }).map(index -> {
        throw new IllegalArgumentException(index + "");
    });
    // onErrorReturnItem replaces the error with a fallback value and then completes
    f1.onErrorReturnItem(-1L).take(5)
            .subscribe(System.out::println, Throwable::printStackTrace);
    // prints -1
    // onErrorResumeNext switches to another Publisher; here it rethrows a wrapped exception
    Disposable d = f1.onErrorResumeNext(e -> {
        if (e instanceof IOException) {
            return Flowable.error(new UncheckedIOException((IOException) e));
        }
        return Flowable.error(e);
    }).subscribe(System.out::println, Throwable::printStackTrace);
    // prints the UncheckedIOException
    while (!d.isDisposed()) {
    }

    Function<Long, Long> exceptionMap = x -> {
        if (new Random().nextInt(5) > 2) {
            throw new IOException(x + "");
        }
        return x;
    };

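    // Use flatMap + onErrorReturnItem so that a failure replaces only the failing item
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS);
    f1.flatMap(index -> Flowable.just(index).map(exceptionMap).onErrorReturnItem(-1L))
    .take(5).subscribe(System.out::println);

    // Alternatively, wrap the same logic in a custom operator and apply it with lift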
   public class ErrorResumeOperator<D, U> implements FlowableOperator<D, U>
    {
        private final Function<U, D> function;
        private final D defaultValue;

        public ErrorResumeOperator(Function<U, D> function, D defaultValue)
        {
            this.function = function;
            this.defaultValue = defaultValue;
        }

        @Override
        public Subscriber<? super U> apply(Subscriber<? super D> observer) throws Exception
        {
            Subscriber<U> subscriber = new Subscriber<U>()
            {
                @Override
                public void onSubscribe(Subscription s)
                {
                    observer.onSubscribe(s);
                }

                @Override
                public void onNext(U onNext)
                {
                    try {
                        observer.onNext(function.apply(onNext));
                    }
                    catch (Exception e) {
                        observer.onNext(defaultValue);
                    }
                }

                @Override
                public void onError(Throwable t)
                {
                    observer.onError(t);
                }

                @Override
                public void onComplete()
                {
                    observer.onComplete();
                }
            };
            return subscriber;
        }
    }
    Disposable d = f1.lift(new ErrorResumeOperator<>(exceptionMap, -1L)).take(5)
            .subscribe(System.out::println);

    while (!d.isDisposed()) {
    }

    Function<Long, Long> exceptionMap = x -> {
        if (new Random().nextInt(5) > 3) {
            throw new IOException(x + "");
        }
        if (new Random().nextInt(6) < 1) {
            throw new SQLException(x + "");
        }
        return x;
    };
    Flowable<Long> f1 = Flowable.interval(500, TimeUnit.MILLISECONDS);
    // Retry at most 3 times, and only for IOException; any other exception is printed immediately
    Disposable d = f1.map(exceptionMap).retry(3, e -> e instanceof IOException)
            .subscribe(System.out::println, Throwable::printStackTrace);
    while (!d.isDisposed()) {
    }

Hot and Cold Data Streams

ConnectableFlowable & publish & connect

[Figure: publishConnect]

    // generate allows at most one onNext per invocation, so read a single line each time
    ConnectableFlowable<String> f1 = Flowable.generate(() -> new BufferedReader(new InputStreamReader(System.in))
            , (reader, e) -> {
                String line = reader.readLine();
                if (line == null || line.equalsIgnoreCase("exit")) {
                    e.onComplete();
                }
                else {
                    e.onNext(line);
                }
            }).ofType(String.class).subscribeOn(Schedulers.io()).publish();

    TimeUnit.SECONDS.sleep(5);
    f1.connect(System.out::println);
    TimeUnit.SECONDS.sleep(5);
    f1.observeOn(Schedulers.newThread()).map(x -> "s0- " + x).subscribe(System.out::println);
    TimeUnit.SECONDS.sleep(5);
    f1.map(x -> "s1- " + x).subscribe(System.out::println);
    TimeUnit.SECONDS.sleep(50);

replay

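replay turns a Flowable into a ConnectableFlowable. After connect, it guarantees that every subscriber receives the same data, including subscribers that arrive after emission has started.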

    java.util.function.Function<String, Consumer<Object>> m = s -> v -> System.out
            .println("[" + System.currentTimeMillis() / 100 + "] " + s + "-" + v);
    ConnectableFlowable<Long> f1 = Flowable.intervalRange(1, 100, 0, 1, TimeUnit.SECONDS)
            .onBackpressureBuffer().replay();
    m.apply("").accept("start");
    TimeUnit.SECONDS.sleep(5);
    f1.connect();
    TimeUnit.SECONDS.sleep(5);
    f1.subscribe(m.apply("o1"));

    TimeUnit.SECONDS.sleep(5);
    f1.subscribe(m.apply("o2"));
    TimeUnit.SECONDS.sleep(20);

cache

cache subscribes to the source Flowable once, records its items, and replays them to every later subscriber.

    java.util.function.Function<String, Consumer<Object>> m = s -> v -> System.out
            .println("[" + System.currentTimeMillis() / 100 + "] " + s + "-" + v);

    Flowable<Path> f1 = Flowable.create((FlowableEmitter<Path> e) -> {
        Path dir = Paths.get("/home/clouder/berk/workspaces/cattle").toRealPath();
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(dir)) {
            Iterator<Path> iter = dirStream.iterator();
            while (iter.hasNext() && !e.isCancelled()) {
                Path path = iter.next();
                m.apply("-----create").accept(path);
                e.onNext(path);
            }
            e.onComplete();
        }
    }, BackpressureStrategy.BUFFER).cache();

    f1.count().subscribe(m.apply("count"));
    f1.filter(Files::isDirectory).subscribe(m.apply("filter"));

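The Backpressure Problem

Problem description: in RxJava it is common for the observed source to emit items faster than its operators or subscribers can process them. The question is then what to do with the items that have not yet been processed.

In the following example, f1 emits twice as fast as f2. Because zip pairs items in emission order, f1 produces items faster than they are consumed, which causes a backpressure problem (an exception is thrown once enough items have been emitted).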

    Consumer<Object> consumer = v -> System.out.println("[" + System.currentTimeMillis() / 100 + "] " + v);
    Flowable<Long> f1 = Flowable.interval(100, TimeUnit.MILLISECONDS);
    Flowable<Long> f2 = Flowable.interval(200, TimeUnit.MILLISECONDS);

    Flowable<Long> f3 = Flowable.zip(f1, f2, (x, y) -> x * 10000 + y);
    f3.subscribe(consumer);

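To deal with backpressure:
- Flowable's default buffer size is 128, and the specification requires every operator to support backpressure.
- Throttling operators (sample, throttleLast, throttleFirst, throttleWithTimeout, debounce, and so on) reduce the rate at which a Flowable emits.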
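
Backpressure support in Reactive Streams rests on the subscriber stating its demand with request(n). The sketch below illustrates that mechanism with the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces, rather than with RxJava itself. DemandSketch and OneAtATime are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Plain-JDK sketch (Java 9+); DemandSketch and OneAtATime are hypothetical names.
final class DemandSketch {
    // A subscriber that asks for one item at a time, so it is never sent more than it requested.
    static final class OneAtATime implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);               // initial demand
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1);    // signal readiness for the next item
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> run(int count) {
        OneAtATime subscriber = new OneAtATime();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);    // blocks when the subscriber's buffer is full
            }
        }                               // close() signals onComplete after pending items
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

Here a producer that outruns the subscriber is made to wait in submit. A timer-driven source such as interval cannot wait, which is why it has to signal an error or drop items when demand runs out.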

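Testing RxJava

RxJava2 provides the test() operator, which subscribes a TestSubscriber to the Flowable and returns it, so that a variety of assertions can be made on the result.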

    List<String> list = Arrays.asList(
            "orange", "blue", "red", "green", "yellow", "cyan", "purple");

    // subscribeOn makes the source asynchronous, so wait for termination before asserting
    Flowable.fromIterable(list).subscribeOn(Schedulers.newThread()).sorted().test()
            .awaitDone(5, TimeUnit.SECONDS)
            .assertValues(list.stream().sorted().toArray(String[]::new));
    Flowable.fromIterable(list).count().test().assertValue((long) list.size());
    List<String> out1 = Flowable.fromIterable(list).sorted().test().values();

References

  1. The Reactive Manifesto (Chinese translation). https://github.com/reactivemanifesto/reactivemanifesto/blob/master/README.zh-cn.md

  2. RxJava 2.0 Released with Support for Reactive Streams Specification. https://www.infoq.com/news/2016/11/rxjava-2-with-reactive-streams

  3. https://www.lightbend.com/blog/7-ways-washing-dishes-and-message-driven-reactive-systems

  4. Use reactive streams API to combine akka-streams with rxJava. http://www.smartjava.org/content/use-reactive-streams-api-combine-akka-streams-rxjava

  5. What's different in 2.0. https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0

  6. Learning Reactive Programming with Java8. https://github.com/zouzhberk/rxjava-study/raw/master/docs/LearningReactiveProgramming.pdf

Sample code: https://github.com/zouzhberk/rxjava-study
