Android Developer

Android Develop——RxJava2(一)

2017-09-11  本文已影响73人  So_ProbuING

重要的话写在前面

RxJava2的原理

Season大神讲解的并不是各种技术的术语与概念而是从本质的模式开始阐述,从事件流角度来说明RxJava的基本工作原理
有两根水管,一根为事件产生的水管,上游。一根为事件接收的水管,下游。两根水管通过一定的方式连接起来,使得上游每产生一个事件,下游就能接收到该事件。
使用RxjAVA 需要在gradle中配置

    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
事件流向

这样的模式在RxJava中,上游就是被观察者Observable(可观察者),下游就是观察者(Observer),对应到代码层面来说就是:

  public void startRxjava2Lesson() {
        // TODO: 创建一个被观察者 上游对象
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
            }
        });
        // TODO: 创建观察被观察者的对象——观察者 下游
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: ");
            }


            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        };
        //建立连接
        observable.subscribe(observer);
    }

只有当上游和下游建立连接之后,上游才会开始发送事件,也就是Observable调用了subscribe()方法之后才开始发送事件

 public void startRxJava2Lesson() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: ");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
    }

Observable Observable的创建方式

Observable

Observable采用工厂创建模式(?)

 Observable<T> create(@NotNull io.reactivex.ObservableOnSubscribe<T> source)

Defer:直到有订阅者订阅了才创建Observable 对象,才通过Observable工厂创建Observable并执行,这样确保Observable 每次状态为最新**

<T> Observable<T> defer(Func0<Observable<T>> observableFactory)

Empty:返回Obervable 不执行任何其他操作,直接执行订阅者的onComplete()方法

<T>Observable<T> empty()

Never:产生一个不会执行任何参数永远不会结束的Observable,起初只用于测试。
Throw:创建一个不发射数据并且以错误结束的Observable。
From:将数组/列表用来创建Observable对象,将里面的对象一一当参数执行,或者可以用Future来创建Observable对象,将Future.get()的值作为参数执行,我们可以指定一个超时的值。Observable将等待来自Future的结果;如果在超时之前仍然没有结果返回,Observable将会触发onError()方法通知观察者有错误发生了。

<T> Observable<T> from(Future<? super T> future)
<T> Observable<T> from(Future<? super T> future,long timeout, TimeUnit unit)
<T> Observable<T> from(T[] array)

Interval:每隔一段时间就产生一个数字,这些数字从0开始,一次递增1直至无穷大

Observable<long> interval(long initialDelay,long period, TimeUnit unit)

Just:把其他类型的对象和数据类型转化成Observable,just将单个数据转换为发射那个数据的Observable,Just类似与From,但是From会将数组或Iterable的元素取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当作单个数据,如果传递一个null给Just,它会返回一个发射null值的Observable

<T>Observable<T> just(T t1,T t2,T t3)

Range:创建一组在从n开始,个数为m的连续数字的Observable,比如range(3,10),就是创建3、4、5…12的一组数字。

Observable<Integer> range(int start,int count)

Repeat:对某一个Observable,重复产生多次结果

Observable<T> repeat(final long count, Scheduler scheduler)

Timer:创建一连串数字,间隔固定时间

Observable<Long> timer(long initialDelay,long period, TimeUnit unit, Scheduler scheduler)

Observer

Observer采用构造方法创建

Observer<Integer> observer = new Observer<Integer>

ObservableEmitter

ObservableEmitter:Emitter是发射器的意思,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable t) 就可以发出对应的事件

Disposable

Disposable的意思是一次性用品,用完即可丢弃的,在RxJava中对应于上面的水管的粒子,可以理解为两根管道之间的一个机关,当调用它的dispose()方法时,就会将两根管道切断。切断后上游发送的事件下游将不会收到事件。
如果在开发中有多个Disposable我们该如何管理呢?在RxJava中已经内置了一个容器CompositeDisposable,每当我们得到一个Disposable时就调用CompositeDisposable.add()将该Disposable添加到容器中,在需要切断事件时调用CompositeDisposable.clear()即可切断所有的水管。

subscribe()的重载方法

    public final Disposable subscribe() {}
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}

Rxjava2线程调度

在RxJava2中,当我们在主线程中去创建一个上游Observable来发送事件,则这个上游默认在主线程中发送事件。
当我们在主线程中创建一个下游Observer来接收事件,则这个下游默认就在主线程中接收事件

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
                //emit 1
                e.onNext(1);

            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + Thread.currentThread().getName());
                Log.d(TAG, "accept: " + integer);
            }
        });
09-11 01:27:13.550 19622-19622/com.wx.rxjavastu D/Rxjava2Lesson: subscribe: main
09-11 01:27:13.550 19622-19622/com.wx.rxjavastu D/Rxjava2Lesson: accept: main
09-11 01:27:13.550 19622-19622/com.wx.rxjavastu D/Rxjava2Lesson: accept: 1

如果我们需要改变执行的线程,例如我们希望让上游的Observable在子线程中发送事件,然后希望让下游的Cusmer在主线程接收事件,我们可以通过RxJava内置的线程调度器来调整。

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
                //emit 1
                e.onNext(1);

            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + Thread.currentThread().getName());
                        Log.d(TAG, "accept: " + integer);
                    }
                });
09-11 01:30:37.675 22693-22722/com.wx.rxjavastu D/Rxjava2Lesson: subscribe: RxCachedThreadScheduler-1
09-11 01:30:37.704 22693-22693/com.wx.rxjavastu D/Rxjava2Lesson: accept: main
09-11 01:30:37.704 22693-22693/com.wx.rxjavastu D/Rxjava2Lesson: accept: 1

我们改变事件的发送接收线程,需要使用

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
                //emit 1
                e.onNext(1);

            }
        }).subscribeOn(Schedulers.io())
                // change thread firest
                .observeOn(AndroidSchedulers.mainThread())
                //2rd
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + Thread.currentThread().getName());
                        Log.d(TAG, "accept: " + integer);
                    }
                });
09-11 01:34:49.456 26491-26514/com.wx.rxjavastu D/Rxjava2Lesson: subscribe: RxCachedThreadScheduler-1
09-11 01:34:49.503 26491-26522/com.wx.rxjavastu D/Rxjava2Lesson: accept: RxCachedThreadScheduler-2
09-11 01:34:49.503 26491-26522/com.wx.rxjavastu D/Rxjava2Lesson: accept: 1

RxJava中,内置的线程选项

上一篇 下一篇

猜你喜欢

热点阅读