AndroidAndroid知识Android技术知识

RxJava(2.0)-你可能需要知道这些

2017-09-22  本文已影响0人  24K男

作为一个Android开发从业者,当你处理异步任务时,如果还在使用着Handler+Thread,那么你可能需要了解下RxJava这个优秀的开源框架;当然如果你正在跳槽面试,RxJava也是经常被问到的框架。
关于介绍RxJava的文章也非常多,但是很多文章基于的版本还是1.0.X,而本博文就基于2.0版本对RxJava进行一个简单的介绍和分析,也算是抛砖引玉吧。
本博文基于RxJava 2.0.0版本进行分析讲解。
参考:抛物线大神《给 Android 开发者的 RxJava 详解》

RxJava是什么?

简单的归纳为两个字:异步

归纳毕竟是归纳,不能完全表明RxJava的概念,那么我们来看GitHub上给出的解释:
a library for composing asynchronous and event-based programs by using observable sequences.

我用我蹩脚的CET-6水平给大家翻译下,大概就是这个意思:
一个使用可观测序列来组成异步的、基于事件的程序的库。

这对于刚接触的童鞋们可能不太容易理解,RxJava的核心还是异步,其他的定语都是基于其之上,有了这个思维和认识,再去学习RxJava也能更容易接受和理解其设计。


为什么要使用RxJava?

我写溜溜的[AsyncTask / Handler / Thread/ ... ],干嘛要使用这个奇怪的RxJava啊?

还能为什么?简洁呗

异步操作的很重要的一点就是保持程序和代码的简洁性,Android内部提供的AsyncTask以及Handlder+Thread都是为了解决异步代码编写繁琐问题,从而使编写异步代码更加简洁。在保持代码和程序简洁这个目的上,RxJava倒是更加的努力和方便,它的优点是随着程序逻辑变得越来越复杂,它仍然可以保持简洁、优雅

口说无凭,我们来分析下面这样一个例子。

图片展示可能是我们每个Android开发者都要面对的问题,假设在我们的Activity上存在一个ListView,并且我们提供了一个addImage方法来任意添加待显示的图片。现在需要将某个目录下所有的png图片都加载并显示在ListView中,由于读取和解析图片是一个耗时过程,因此我们需要将这个过程放在后台执行;而图片的显示则必须放在主线程(UI线程)中。

那么在没有使用RxJava时,我们怎么编写这段代码呢?

    new Thread() {
            @Override
            public void run() {
                super.run();
                for (File folder : folders) {
                    File[] files = folder.listFiles();
                    for (File file : files) {
                        if (file.getName().endsWith(".png")) {
                            final Bitmap bitmap = getBitmapFromFile(file);
                            ((MainActivity) context).runOnUiThread(new Runnable() {
                                @Override
                                public void run() {
                                    imageList.add(bitmap);
                                    imageListAdatper.notifyDataSetChanged();
                                }

                            });

                        }
                    }
                }
            }
        }.start();

没有对比,就没有伤害,如果我们使用RxJava的话,是如何实现的呢?


Observable.fromArray(folders)
                .flatMap(new Function<File, ObservableSource<File>>() {
                    @Override
                    public ObservableSource<File> apply(File file) throws Exception {
                        return Observable.fromArray(file.listFiles());
                    }
                })
                .filter(new Predicate<File>() {
                    @Override
                    public boolean test(File file) throws Exception {
                        return file.getName().endsWith(".png");
                    }
                })
                .map(new Function<File, Bitmap>() {

                    @Override
                    public Bitmap apply(File file) throws Exception {
                        return getBitmapFromFile(file);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        imageList.add(bitmap);
                        imageListAdatper.notifyDataSetChanged();
                    }
                });

这代码变简洁了吗?这代码量也没减少啊,而且这一大堆代码都是什么意思啊?完全看不懂啊。

各位看官,你先消消气,我们讲的简洁是:逻辑上的简洁,并不是单纯的代码减少(说实话,我们其实更关注这个)。

仔细看下这段代码,之前的if..else呢?之前的那么多循环呢?好像都不见了,完全是从上到下的一条链式调用,而且没有嵌套(你是不是也讨厌好多层的嵌套,反正我是),现在看起来是不是逻辑更加清楚了呢。

此时RxJava的优势还不能完全体现出来,而且看到这么多陌生的函数,你也一定有点不知其解,那么我们就带着疑惑接着往下看。

API

虽然我知道你有很强的理解和学习能力,但是我还是决定要对RxJava的一些常用的API进行介绍和说明,以便你能更顺畅的阅读全文。

1.观察者模式

RxJava的异步实现,是通过一种扩展的观察者模式来实现的。

我们来看下什么是观察者模式?

观察者模式(有时又被称为发布(publish )-订阅(Subscribe)模式、模型-视图(View)模式、源-收听者(Listener)模式或从属者模式)是软件设计模式的一种。在此种模式中,一个目标物件管理所有相依于它的观察者物件,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实现事件处理系统。

这是百度给出的解释,我们在日常编码中使用的点击事件的处理就采用了观察者模式。

clkBtn.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Toast.makeText(MainActivity.this, "The button was clicked", Toast.LENGTH_LONG).show();

            }
        });

在典型的Click事件处理中,Button就是被观察者,而我们设置的OnClickListener就是观察者,在我们点击Button时,OnClickListener的onClick方法就会被回调。

2. RxJava的观察者模式

2.1 几个对象

我们先来了解下RxJava给我们提供的几个常用的对象。

在2.0版本中被观察者新的实现叫做Flowable, 同时旧的Observable也保留了。因为在 RxJava1.x 中,有很多事件不被能正确的背压,从而抛出MissingBackpressureException。

举个简单的例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException, 初学者很难明白为什么会这样,使得学习曲线异常得陡峭。

而在 2.0 中,Observable 不再支持背压,而Flowable 支持非阻塞式的背压。并且规范要求,所有的操作符强制支持背压。

幸运的是,Flowable 中的操作符大多与旧有的 Observable 类似。

Observer就是我们前面提到的观察者,与Observable组合使用。

Subscriber也被成为订阅者,一般与Flowable组合使用。

因为Observable不再支持背压,因此如果我们使用RxJava2.0版本,Flowable可能是你的不二人选。

基于以上的分析,本文以下的示例将采用Flowable进行说明和讲解。

2.2 回调

为什么称RxJava采用了扩展的观察者模式呢?我们知道传统的观察者回调接口中只有一个update方法,那么RxJava呢?它可不止一个,让我们来看下Subscriber的定义。

public interface Subscriber<T> {
    /**
     * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
     * <p>
     * No data will start flowing until {@link Subscription#request(long)} is invoked.
     * <p>
     * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
     * <p>
     * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
     * 
     * @param s
     *            {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
     */
    public void onSubscribe(Subscription s);

    /**
     * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(long)}.
     * 
     * @param t the element signaled
     */
    public void onNext(T t);

    /**
     * Failed terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     *
     * @param t the throwable signaled
     */
    public void onError(Throwable t);

    /**
     * Successful terminal state.
     * <p>
     * No further events will be sent even if {@link Subscription#request(long)} is invoked again.
     */
    public void onComplete();
}

RxJava的观察者接口中提供了onSubscribe、onNext、onError、onComplete四个回调方法,而传统的观察者模式中只有update一个回调方法,这也是称之为扩展的观察者模式的一部分原因。
下面我们来分析下Subscriber接口中几个方法:

  1. onSubscribe
    这个方法是2.0之后才有的方法,主要是给观察者提供了一个终止事件接收的机会(当然我们也可以做一些预处理),它也会首先被调用。
    要终止接收事件,可以调用Subscription的cancel方法。

  2. onNext
    我们可以将其理解为传统观察者模式回调接口中的update方法,它可能会被调用多次。它的调用顺序在onSubscribe之后。

  3. onError
    在事件处理过程中出异常时,onError会被触发,同时事件队列自动终止,不会再有事件发出。

  4. onComplete
    在事件队列传递完毕后,该方法会被调用。
    在一个正确运行的事件序列中, onComplete()和onError()有且只有一个,并且是事件序列中的最后一个。
    需要注意的是,onComplete() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

在一个正确的事件序列中,onError与onComplete互斥且唯一。

相比于传统的观察者模式,RxJava使用的扩展观察者模式好像变得复杂了,但是从另一方面来讲它也更加的丰富了,把更多的主动权和机会交给了使用者。

3. 实战

看了那么多的概念,是不是觉得有点枯燥和乏味呢,那我们就开始动手使用RxJava来体验一下吧。

3.1 引用

怎么在我们的项目中使用RxJava和RxAndroid呢?

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxjava:2.1.3'

3.2 实例

  1. 1.0 方式
        //定义被观察者
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("Hello");
                e.onNext("World");
                e.onNext("!");
                //注意在此调用onComplete方法结束事件的处理
                e.onComplete();
            }
        });


        // 定义观察者
        Observer<String> observer = new Observer<String>() {

            // 该方法会在onNext方法之前调用
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe->11111");

                // d.dispose();
            }

            @Override
            public void onNext(String value) {
                System.out.println(value);

            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();

            }

            @Override
            public void onComplete() {
                System.out.println("onComplete->222222");
            }
        };

        // 订阅
        observable.subscribe(observer);
        

  1. 2.0方式
        //创建Flowable对象
        Flowable flowable = Flowable.create(new FlowableOnSubscribe() {
            @Override
            public void subscribe(@NonNull FlowableEmitter e) throws Exception {
                e.onNext("Hello");
                e.onNext("World");
                e.onNext("!");
                //注意在此调用onComplete方法结束事件的处理
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER);

        // 定义观察者
        Subscriber subsrciber= new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                System.out.println("onSubscribe->11111");
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete->222222");
            }
        };

        // 订阅
        flowable.subscribe(subsrciber);


  1. 订阅

订阅这句代码看起来好奇怪,主要是subscribe()这个方法有点怪:它看起来是『observalbe 订阅了 observer / subscriber』而不是『observer / subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成 observer.subscribe(observable) / subscriber.subscribe(observable) ,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。

  1. 运行结果

分别运行上面的两段代码,运行效果相同,如下所示:

onSubscribe->11111
Hello
World
!
onComplete->222222

这可能是最简单的RxJava使用示例了。

3.3 创建被观察者

在上面的示例中,我们采用了Observable.create方法来创建被观察者,并且在subscribe方法中完成了事件的传递。
RxJava 还提供了一些方法用来快捷创建事件队列,我们一起来看一下。

  1. just(T...)

将传递的参数,依次发送出去。

Flowable.just("Hello", "World", "!")
// 将会依次调用:
// onNext("Hello");
// onNext("World");
// onNext("!");
// onComplete();

这句代码的效果与上面示例中的效果相同。

  1. from(T[]) / from(Iterable<? extends T>)
    将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
String[] values = new String[]{"Hello", "Wrold", "!"};

Flowable observable = Flowable.fromArray(values);
// 将会依次调用:
// onNext("Hello");
// onNext("World");
// onNext("!");
// onComplete();

上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create() 的例子是等价的。

3.4 灵活的事件回调定义

RxJava支持定义不完整的事件回调定义,就是我们可以抛弃Subscriber的定义,而只选择定义其中的一部分回调。
看下代码可能会更明了。

        String[] values = new String[]{"Hello", "Wrold", "!"};
        Consumer onNext = new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                System.out.println("onNext:" + s);
            }
        };

        Consumer<? super Throwable> onError = new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                throwable.printStackTrace();
            }
        };

        Action onComplete = new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("onComplete");
            }
        };

        // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
        Flowable.fromArray(values)
                .subscribe(onNext);

        // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
        Flowable.fromArray(values)
                .subscribe(onNext, onError);

        // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
        Flowable.fromArray(values)
                .subscribe(onNext, onError, onComplete);

是不是很灵活?嗯,是的。

3.5 Schedulers

在 RxJava的默认规则中,事件的发出和消费都是在同一个线程的,在哪个线程调用subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。
观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于RxJava 是至关重要的。
而要实现异步,则需要用到 RxJava 的另一个概念: Schedulers(调度器) 。

  1. API

在RxJava中,Scheduler相当于线程控制器,RxJava通过它来指定每一段代码应该运行在什么样的线程。

RxJava已经内置了一些调度器,主要有以下几个:

有了这几个Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。

上面这段代码中,由于subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3 将会在 IO线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber数字的打印将发生在主线程。

事实上,这种在 subscribe() 之前写上两句subscribeOn(Scheduler.io()) 和observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

4.变换

RxJava提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

在开发中我们经常碰到这样的场景:从本地读取并加载图片。也就是说我们通常的入参是一个文件路径,而我们想要得到的是一个BitMap对象,那么如果使用RxJava我们该如何优雅的实现呢?

    final String filePath = "/images/logo.png";

        Flowble.just(filePath)
                .map(new Function<String, Bitmap>() {

                    @Override
                    public Bitmap apply(@NonNull String s) throws Exception {
                        return getBitmapFromFile(new File(filePath));
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        showBitmap(bitmap);
                    }
                });

就问你优雅不优雅?牛逼不牛逼?

可以看到,map()方法将参数中的String对象转换成一个Bitmap对象后返回,而在经过map()方法后,事件的参数类型也由 String转为了Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。

那么常用的事件变换有那些呢?

1. map

事件对象的直接变换,具体功能上面已经介绍过,它是RxJava 最常用的变换。
在上面的例子中我们可以看到,map方法将参数中的 String对象变换为一个 Bitmap对象后返回,而在经过 map方法后,事件的参数类型也由String变为了 Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。

2. flatMap

flatMap和map有共同点,都是将一个对象转换为另一个对象,不同的是map只是一对一的转换,而flatMap可以是一对多的转换,并且是转换为另外一个Flowable对象!
示例如下:

        ArrayList<String[]> list = new ArrayList<>();
        String[] words1 = {"Hello,", "I am", "China!"};
        String[] words2 = {"Hello,", "I am", "Beijing!"};
        list.add(words1);
        list.add(words2);
        Flowable.fromIterable(list)
                .flatMap(new Function<String[], Publisher<String>>() {
                    @Override
                    public Publisher<String> apply(@NonNull String[] strings) throws Exception {
                        return Flowable.fromArray(strings);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("Consumer->accept:"+s);

                    }
                });

运行结果如下所示:

Consumer->accept:Hello,
Consumer->accept:I am
Consumer->accept:China!
Consumer->accept:Hello,
Consumer->accept:I am
Consumer->accept:Beijing!

flatMap的转换可以分解为三个过程:

  1. 根据传入的事件生成一个Publisher对象(其实也可以理解为Flowable)。
  2. 激活该Flowable对象发送事件,而不是直接发送该Flowable对象。
  3. 同一个Flowable对象发送的事件都会汇总到Flowable后,Flowable负责将事件统一传递给subsrciber。

3. lift

我们可以将该方法视为map与flatMap的底层调用实现,其目的就是定义我们自己的Operator来完成变换。
lift方法接收一个FlowableOperator的参数,这个FlowableOperator就是定义我们自己的转换操作。
这样解释起来可能有些不太明了,下面我们举两个简单的例子来看下怎么使用lift实现map和flatMap的效果。

    Flowable.just(filePath)
                .lift(new FlowableOperator<Bitmap, String>() {
                    @Override
                    public Subscriber<? super String> apply(@NonNull final Subscriber<? super Bitmap> observer) throws Exception {
                        return new Subscriber<String>() {
                            @Override
                            public void onSubscribe(Subscription s) {
                                observer.onSubscribe(s);
                            }

                            @Override
                            public void onNext(String s) {
                                observer.onNext(getBitmapFromFile(new File(s)));

                            }

                            @Override
                            public void onError(Throwable t) {
                                observer.onError(t);
                            }

                            @Override
                            public void onComplete() {
                                observer.onComplete();

                            }
                        };
                    }
                })
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        showBitmap(bitmap);
                    }
                });
Flowable.fromIterable(list)
                .lift(new FlowableOperator<String, String[]>() {

                    @Override
                    public Subscriber<? super String[]> apply(@NonNull final Subscriber<? super String> observer) throws Exception {
                        return new Subscriber<String[]>() {
                            @Override
                            public void onSubscribe(Subscription s) {
                                observer.onSubscribe(s);
                            }

                            @Override
                            public void onNext(String[] strings) {
                                Flowable.fromArray(strings)
                                        .subscribe(new Consumer<String>() {
                                            @Override
                                            public void accept(String s) throws Exception {
                                                observer.onNext(s);
                                            }
                                        });
                            }

                            @Override
                            public void onError(Throwable t) {
                                observer.onError(t);

                            }

                            @Override
                            public void onComplete() {
                                observer.onComplete();
                            }
                        };
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println("accept ->"+s);
                    }
                });

4. range

该方法比较简单,用于产生int和long型数字。


    Flowable.range(1,5)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                })

输出1 2 3 4 5五个数字。

5. merge

主要用户合并对象,示例如下:

ArrayList<String> list1 = new ArrayList<>();
        list1.add("1");
        list1.add("2");
        list1.add("3");
        ArrayList<String> list2 = new ArrayList<>();
        list2.add("4");
        list2.add("5");
        list2.add("6");

        Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });

输出1 2 3 4 5 6。

6. compose

调解转换的作用,示例如下:

    Flowable.merge(Flowable.fromIterable(list1), Flowable.fromIterable(list2))

                .compose(new FlowableTransformer<String, Integer>() {

                    @Override
                    public Publisher<Integer> apply(@NonNull Flowable<String> upstream) {

                        return upstream.map(new Function<String, Integer>() {
                            @Override
                            public Integer apply(@NonNull String s) throws Exception {
                                return Integer.parseInt(s);
                            }
                        });
                    }
                })
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer s) throws Exception {
                        System.out.println(s);
                    }
                });


输出1 2 3 4 5 6 六个数字。

7. compose与lift的区别

两者都实现了变换的功能,但是变换的内容和对象却不相同。

5 总结

至此,我们对RxJava的使用分析告一段落,作为一个牛逼的异步框架,如果能正确的引入到我们的项目中来一定能提高我们效率,降低后期我们的维护成本。
祝各位工作愉快。

上一篇下一篇

猜你喜欢

热点阅读