Rxjava2

rxjava

2017-12-11  本文已影响21人  Android_冯星

Rxjava到底是什么

一个词:异步
一个可以在java VM上使用可观测的序列来组成异步的、基本事件的程序库
一个实现异步操作的库

RXJava优缺点

简洁
随着程序的逻辑变得越来越复杂,它依然能够保持简洁。

API介绍和原理解析

1.概念:扩展的观察者模式

RXjava的异步实现,是通过一种扩展的观察者模式

观察者模式

观察者模式的面向需求是:对象A(观察者)对对象B(被观察者)的某种变化,高度敏感,需要在B变化的一瞬间做出反应。
观察者模式采用注册(register)或者成为订阅(subscrible)的方式,告诉被观察者,我需要你的某某状态,你要在它变化的时候告诉我。
Android开发中典型的例子就是view的点击监听器OnClickLinstener()。对设置onClickListener来说,view是被观察者,OnClickListener是观察者,二者通过setOnClickListener完成订阅关系。订阅完成之后,用户点击view的瞬间,Android Framework就会将点击事件交给已经注册的onClickListener采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。

OnClickListener 的模式大致如下图:

[图片上传失败...(image-2917dc-1512961567324)]

如图所示,通过setOnClickListener方法Button持有OnClickListener的引用,当用户点击Button,自动调用OnclickListener里的onClick方法。
把图片抽象出来 Button->被观察者 OnClickListener->观察者 setOnClickListener->订阅 onClick->事件。就由专用的观察者模式(例如只用于监听控件点击)转变成了通用的观察者模式。

[图片上传失败...(image-ed19bb-1512961567324)]

而 RxJava 作为一个工具库,使用的就是通用形式的观察者模式。

RXJava的观察者模式

RXjava有4个感念:

Observable和Observer通过Subscribe方法实现订阅,从而Observable可以在需要的时候发出事件通知Observer。

与传统的观察者模式不同,除了普通事件onNext() (相当于onClick/OnEvent),还定义两个特殊的事件onCompleted(),onError | completed 完成 完整的|

RxJava 的观察者模式大致如下图:

[图片上传失败...(image-f381dc-1512961567324)]

基本实现

基于以上的概念, RxJava 的基本实现主要有三点:

创建Observer 观察者

决定着事件触发将有怎样的行为

 Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

不仅基本使用方式一样,实质上,在RxJava的subscribe过程中,Observer也总是会先被转换成Subscriber在使用。所以使用基本功能,选择Observer或者subscriber都是一样的。他们的区别有两点。

  1. onStart() 这是subscriber新增方法,他会在subscribe刚开始,但是事件还没有发送之前被调用,可以用于做一些准备工作,例如数据清零或者重置,这是一个可选的方法。默认实现为空。但是如果对工作线程有要求的话(例如弹出一个对话框,需要在Ui线程执行),就不能使用onStart(),因为他总是调用在subscribe所发生的线程调用,而不能指定线程。如果指定线程来做准备工作,可以使用doOnSubscribe()方法。
  2. unSubscribe 这是Subscriber所实现的另一个接口Subscription()方法,用于取消订阅,在这个方法调用后Subscriber将不接受任何事件。一般在调用之前先使用isUnSubscribed先判断一下状态,unSubscribe()这个方法很重要,因为在subscribe之后,Observable会持有Subscriber的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以要保持良好的原则,要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

创建Observeable

Observable是被观察者,决定在什么时候被触发,和触发什么事件。
RXJava使用create()方法来创建一个Observable,并为他设置事件触发规则。

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

这里传入OnSubScribe对象作为参数,OnSubscribe会被存入染回的Observable对象中,相当于一个计划表,当OnSubscribeObservable被订阅时,OnSubscriable的call方法会被自动调用,事件序列会按照设定依次调用onNext方法和OnCompleted方法,这样,由被观察者调用了观察者的回调方法,就实现了被观察者向观察者的事件传递,即观察者模式

create方法是RXJava中最基本的创造事件序列的方法。RXJava还提供了一些方法用来快捷创建事件队列,例如:

Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等价的。

Subcsribe订阅

创建了Observable和Observe之后,用subscribe方法将他们链接起来,整条链子就可以工作了。

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

subscribe做了三件事

  1. 调用的Subscriber的onStart方法,可选的准备方法。
  2. 调用observable中的Call方法。在这里,事件发送的逻辑开始运行。在RXjava中Observable不是在创建时候就立即发送事件,而是在他订阅的时候,即放subscribe执行的时候。
  3. 将传入的subscribe作为Subscription返回,为了方便unSubscribe。

整个关系如下

[图片上传失败...(image-17e6e0-1512961567324

或者

[图片上传失败...(image-7845b8-1512961567324)]

除了subscribe(Observer) 或者 subscribe(subscriable),subscribe还支持不完整定义的回调,RXJava会自动创建出Subscriber

Action1<String> onNextAction = new Action1<String>() {
    // onNext()
    @Override
    public void call(String s) {
        Log.d(tag, s);
    }
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
    // onError()
    @Override
    public void call(Throwable throwable) {
        // Error handling
    }
};
Action0 onCompletedAction = new Action0() {
    // onCompleted()
    @Override
    public void call() {
        Log.d(tag, "completed");
    }
};

// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

Action0是RxJava中的一个接口,它只有一个方法Call,这个方法是一个无参无返回值的方法,由于onCompleted也是无参无返回值的,因此action可以当成一个包装对象,将onCompleted内容打包起来将自己作为一个参数传入subscribe中,以实现不完整定义的回调。也可以看做将onCompleted方法传递进了subscribe,相当于某些语言中的闭包。

Action1也是一个接口,他同样也只有一个方法Call(T param),这个方法无返回值,但是有一个参数,与Action0同理,由于onNext onError也是只有一个单参数,且没有返回值,因此Action1可以将OnNext(obj)和onError(error)打包起来传入subscribe中,以实现不完整定义的回调,事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

场景事例

打印字符数组

将字符串数组 names 中的所有字符串依次打印出来:

 String[] names = {"冯星","曹操","赵云","马超"};
        rx.Observable.from(names).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG, "call: " +s);
            }
        });

由 id 取得图片并显示

由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错:

Observable<Drawable> observable = Observable.create(new Observable.OnSubscribe<Drawable>() {
            @Override
            public void call(Subscriber<? super Drawable> subscriber) {
                subscriber.onNext(getResources().getDrawable(R.mipmap.water));
                subscriber.onCompleted();
            }
        });
        observable.subscribe(new Subscriber<Drawable>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                Toast.makeText(MainActivity.this,e.getMessage(),Toast.LENGTH_SHORT).show();
            }

            @Override
            public void onNext(Drawable drawable) {
                iv_demo.setImageDrawable(drawable);
            }
        });

正如上面两个例子这样,创建出 Observable 和 Subscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了。非常简单。

[图片上传失败...(image-9074f5-1512961567324)]

在RXjava默认规则里,事件的发出和消费都在同一个线程里。也就是说上面是一个同步的观察者模式。

而观察者模式本身的目的在于 后台处理,前台调用 的异步机制。因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。

线程控制 -- Scheduler |si gan diu le| 调度

在不指定线程的情况下,RXjava遵循的是线程不变得原则,

即,在那个线程调用Subscribe,就在哪个线程生产事件;在哪个线程生产的事件,就在那个线程消费事件,

如果需要切换线程,就需要用到 Scheduler (调度器)。

Schedule的API

在RXjava中,schedule--调度器,相当于线程控制器,RXJava通过它来指定每一段代码应该运行哪一个线程。

subscribeOn() 指定subscribe()所发生的线程,即Observable.OnSubscribe()被激活时所发生的线程。或者叫做事件产生的线程。

ObserveOn()指定subscriber所发生的线程或者叫做事件消费的线程。

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

由于subscribeOn(Schedulers.io())的指定,被创建的事件1,2,3,4,将会在在Io线程发出。
由于observeOn(AndroidSchedulers.mainThread())的指定,因此subscriber的数字打印将发生在主线程。

事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

Schedule的原理

下面呢

变换

RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一

所谓变换,就是将事件序列中的对象或者整个序列进行加工处理,转换成不同的事件或者事件序列。

API

map()

Observable.just(R.mipmap.water)
                .map(new Func1<Integer, Bitmap>() {
                    @Override
                    public Bitmap call(Integer s) {
                        return BitmapFactory.decodeResource(getResources(),s);
                    }
                })
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Bitmap>() {
                    @Override
                    public void call(Bitmap bitmap) {
                        iv_demo.setImageBitmap(bitmap);
                    }
                });

Func1和Action1非常相似。也是RXJava中的一个接口。用于包装有一个参数的方法。Func1和Action1的区别在于,Func1是由返回值得。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。

map()方法将参数中的string对象转换成BitMap对象后返回,经过map方法后,事件的参数也随之变成的bitmap

[图片上传失败...(image-2a65d2-1512961567324)]

Observable.from(students)
                .map(new Func1<Student, String>() {
                    @Override
                    public String call(Student student) {
                        return student.getName();
                    }
                })
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d(TAG, "call: "+s);
                    }
                });

很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:

 Observable.from(students)
                .subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        List<Course> courses = student.getCourses();
                        for(Course course : courses){
                            Log.d(TAG, "onNext: "+ course.getName() + "    Student : " +student.getName());
                        }
                    }
                });

依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢? 这时候就需要flatMap了

Observable.from(students)
                .flatMap(new Func1<Student, Observable<Course>>() {
                    @Override
                    public Observable<Course> call(Student student) {
                        return Observable.from(student.getCourses());
                    }
                }).subscribe(new Action1<Course>() {
            @Override
            public void call(Course course) {
                Log.d(TAG, "call: " + course.getName());
            }
        });

flatMap和map有个相同点:也是把传入的参数转化之后返回另一个对象。但是需要注意的是flatMap返回的对象是Observable对象。并且这个Observable对象不是直接发送到了Subscriber的回调方法中。

flatMap的原理是这样的

  1. 使用传入的事件对象创建一个Observable对象。
  2. 并不发送这个Observable对象,而是将它激活,于是它开始发送事件。
  3. 每一个创建出来的Observable发送的事件,都汇入同一个Observable对象,而这个observable对象负责将这些事件传入Subscriber对象。

这三个步骤吧事件分成了两级,通过一组新创建的Observable将初始的对象铺平,之后通过统一路径分发下去,而这个『铺平』就是 flatMap() 所谓的 flat

flatMap()示意图

[图片上传失败...(image-a79136-1512961567324)]

扩展

由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 处理显示消息列表
            showMessages(messages);
        }
    });

传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。

变换的原理 lift()

这些变化虽然功能不一样,但实质上都是针对事件的处理在发送。而在RXjava的内部,他们都是基于同一个基础的方法变化,lift(Operator)。

// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber); // 这个onSubscribe是原始的OnSubScribe对象!!
        }
    });
}

这段代码,它生成了一个新的Observable并且返回,新创建的Observable中的参数OnSubscribe的回调方法call()的实现,和Observable.Subscribe()基本一样,但是是由区别的。

不一样的地方在与OnSubscrvable中call(subscribe)所指代了对象不同

  1. 假设有一个Observable<T>调用了lift()并创建Observable后,一共有个Observable。
  2. 同样的Observable的参数OnSubscribe,加上之前原始的Observable里面的原始OnSubscribe,也就有了两个 OnSubscribe;
  3. 然后调用Observable.create传入Observable<R>,触发onSubscribe的Call方法,也是就override的方法,
  4. 在该方法中 调用了OnSubscribe.call()方法,注意:这个OnSubscribe方法是原始的Observable<T>的onSubscribe<T>对象。他需要传入一个Subscriber对象,这个对象是通过Subscriber newSubscriber = operator.call(subscriber);operator.call()方法生成的新的Subscribe。正是这个operator对象将两个Subscriber对象联系起来的。OnSubscribe<T>在执行Subscriber<R>.onNext(R r),而这里从T变成R,正好用到了传到Operator中的参数Func1<T, R>。

这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

也可以这个说:在Observable执行了lift(Operator)方法后,会返回一个新的Observable,这个新的Observable会象一个代理一样,负责接受原始的Observable发出的事件,并在处理后发送给Subscriber

[图片上传失败...(image-8fc12c-1512961567324)]

[图片上传失败...(image-925ff1-1512961567324)]

多次调用

[图片上传失败...(image-eb30f8-151296156732

举个例子

Observable.just(1.34f, 8.3453f, -534.34f, 392.99f)
        .map(new Func1<Float, Integer>() {
            @Override
            public Integer call(Float aFloat) {
                return Math.round(aFloat);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return Integer.toBinaryString(integer);
            }
        })
        .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                log("2 map onNext->" + s);
            }
        });

// outputs
// 2 map onNext->1
// 2 map onNext->1000
// 2 map onNext->11111111111111111111110111101010
// 2 map onNext->110001001

该例子是一个Float->Integer->String的转换。我们按上面的流程来分析。

  1. 生成一个Observable<Float>
  2. 调用map生成Observable<Integer>
  3. 调用map生成Observable<String>
  4. subscribe()传入一个Subscribe(String),至此流的前半部分全部完成。
  5. 执行开始,Subscribe<String>发送事件,先生成一个Subscrver<Integer>传给Observable<Integer>(Observable<Integer>.onSubscribe.call())。
  6. Observable<Interger>开始发送事件,同样生成一个Subscriber<Float>传给Observable<Float>(Observable<Float>.onSubscribe.call())。
  7. 真正的发送事件开始,Observable<Float>调用Subscriber<Float>.onNext(Float)等方法,同时Subscriber<Integer>.onNext(Integer)被调用,同时Subscriber<String>.onNext(String)被调用,事件发送完成。

compose对Observable整体的变换 |com pou si| 构成 组成

除了lift方法外,Observable还有一个变换方法叫 compose(Transformer)它和lift的区别在于lift是针对事件项和事件序列,而compose是针对observable自身进行变换。

假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。

public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
    @Override
    public Observable<String> call(Observable<Integer> observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。

Scheduler的API(二)

利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。

能不能可以多次切换线程
答案是能。因为ObserveOn指定的是

操作符分类

面我按类别把常用操作符分别介绍,其实很多内容都是来自于ReactiveX的官方网站,英文比较好的朋友可以参考(http://reactivex.io/)。
按照官方的分类,操作符大致分为以下几种:

Creating Observables(Observable的创建操作符),比如:Observable.create()、Observable.just()、Observable.from()等等;
Transforming Observables(Observable的转换操作符),比如:observable.map()、observable.flatMap()、observable.buffer()等等;
Filtering Observables(Observable的过滤操作符),比如:observable.filter()、observable.sample()、observable.take()等等;
Combining Observables(Observable的组合操作符),比如:observable.join()、observable.merge()、observable.combineLatest()等等;
Error Handling Operators(Observable的错误处理操作符),比如:observable.onErrorResumeNext()、observable.retry()等等;
Observable Utility Operators(Observable的功能性操作符),比如:observable.subscribeOn()、observable.observeOn()、observable.delay()等等;
Conditional and Boolean Operators(Observable的条件操作符),比如:observable.amb()、observable.contains()、observable.skipUntil()等等;
Mathematical and Aggregate Operators(Observable数学运算及聚合操作符),比如:observable.count()、observable.reduce()、observable.concat()等等;
其他如observable.toList()、observable.connect()、observable.publish()等等;

上一篇下一篇

猜你喜欢

热点阅读