Rxjava2- 一、基本使用

2019-12-11  本文已影响0人  吐必南波丸
一、配置

要在Android中使用RxJava2, 在app的build.gradle中添加依赖:

implementation 'io.reactivex.rxjava2:rxjava:2.2.1'

implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
二、框架结构

Rxjava整体是一条链其中:
1.链的上游是生产者:Observable
2.链的下游是观察者:Observer
3.链的中间:各个中介节点,既是下游的Observable,又是上游的Observer

三、基本使用

第一步:创建被观察者 Observable

      Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onComplete();// 发送完onComplete后,下游将不会再接收消息
            }
        });

第二步:创建观察者Observer

   Observer observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            //  开始订阅调用的方法
            }

            @Override
            public void onNext(Integer integer) {
               
            }

            @Override
            public void onError(Throwable e) {
                Toast.makeText(TestAcitivity.this, "Error!", Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onComplete() {
           
            }
        };

第三步:建立订阅关系,被观察者需要去订阅观察者

   observable
                .subscribeOn(Schedulers.io()) //  在线程中执行上游操作
                .observeOn(AndroidSchedulers.mainThread())// 在主线程中执行下游操作
                .subscribe(observer); 

<meta charset="utf-8">

3.2 线程的类型

subscribeOn/observeOn都要求传入一个Schedulers的子类,它就代表了运行线程类型,下面我们来看一下都有哪些选择:

以上是在io.reactivex.schedulers包中,提供的Schedulers,而如果我们导入了下面的依赖,那么在io.reactivex.android.schedulers下,还有额外的两个Schedulers可选:

四、关键词和方法解释

一.Emitter
就是发射器的,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
这里需要注意的是:
1、上游可以发送无限个onNext, 下游也可以接收无限个onNext。当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件。
2、当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件。上游可以不发送onComplete或onError。
3、发送多个onComplete是可以正常运行的, 但是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃。

二.subscribeOn()
1.在Scheduler制定的线程启动subscribe()
2.切换起源的Observable的线程
3.当多次调用subscribeOn() 的时候,只有最上面的会对起源Observable起作用
三:ObserveOn
1.在内部创建Observer的onNext() onError()
onSuccess()等回调方法里,通过Scheduler指定的线程来调用下级Observer的对应回调方法
2.切换observeOn()下面的Oberver的回调所在的线程
3.当多次调用observerOn()的时候,每个都会进行一次线程的切换,影响范围是它下面的每个Oberver(除非又遇到新的observeOn())

四.Scheduler的原理
1.Schedulers.newThread() ޾ Schedulers.io();
· 当scheduleDirect()被调用时候,会创建一个 Worker,Worker的内部会有一个Excutor,由Executor来完成实际的线程切换;
· scheduleDirect() 还会创建出一个Disposable对象,交给外层的Observer,让它执行dispose()操作,取消订阅链;
· newThread()和io()的区别在于,io()可能会对Excutor进行重用.
2.AndroidSechedulers.mainThread();
通过内部的Handler把任务post到主线程去做

上一篇 下一篇

猜你喜欢

热点阅读