RxJava(三、Basic realization)
目录
一、Is what 是什么
二、Concept 概念
三、Basic realization 基本实现
四、Scheduler 线程控制(上)
五、Scheduler 线程控制(下)
六、变换
因个人学习需要,故文章内容均为网上摘抄整理,感谢创作者的辛勤,源文章地址请看文末。
基本实现
主要分为三点:
1. 创建Observer(观察者)
2. 创建Observable(被观察者)
3. Subscribe(订阅)
一、创建Observer(观察者)
职责:决定了事件触发时将有怎样的行为
/**
* 1. observer接口实现方式
*/
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
/**
* 2. subscriber抽象类实现方式
*/
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
注意:
-
subscriber
是实现了observer
接口的抽象类, 扩展了部分方法。 - 在
RxJava
的subscribe
过程中,Observer
也总是先被转换成一个Subscriber
再使用。所以若使用基本功能,选择Observer
和Subscriber
是一样的。
Observer 和 Subscriber 的区别
- **onStart(): **
Subscriber
增加的方法。
在subscribe
刚开始,事件尚未发送之前被调用,可用于做一些准备工作,如数据的清零或重置。可选方法,默认实现为空。
注意:
onStart()
是在subscribe
所发生的线程被调用,不能指定线程(在指定线程做准备工作,用 doOnSubscribe()),故不应做对线程有要求的操作(如 涉及主线程操作)。
- **unsubscribe(): **是
Subscriber
实现的Subscription
接口的方法,用于取消订阅。调用该方法后,Subscriber
将不再接收事件。可用isUnsubscribed()
判断状态。
注意:在
subscribe()
之后,Observable
会持有Subscriber
的引用,该引用如果不能及时释放,会有内存泄露的风险。所以,不再使用时及时取消订阅(可在onPause()
、onStop()
中)。
二、创建Observable(被观察者)
职责:决定什么时候触发事件及触发怎样的事件。
涉及方法:
- create()、
- just(T...)、
- from(T[]) / from(Iterable<? extends T>)
/**
* 使用 create() 方法创建Observable,并定义事件触发规则
*/
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
其中,传入一个 OnSubscribe
对象作为参数。OnSubscribe
会被存储在返回的 Observable
对象中,作用相当于一个计划表,当 Observable
被订阅的时候,OnSubscribe
的 call()
方法会自动被调用,事件序列就会依照设定依次触发(即:观察者Subscriber
调用三次 onNext()
、一次 onCompleted()
)。
由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
/**
* just(T...): 将传入的参数依次发送出来。
*/
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
/**
* from(T[]) / from(Iterable<? extends T>):
* 将传入的数组或 Iterable拆分成具体对象后,依次发送出来。
*/
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 将会依次调用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面 just(T...)
和 from(T[])
的例子,都和 create(OnSubscribe)
的例子是等价的。
三、Subscribe(订阅)
创建了 Observable
和 Observer
后,再用 subscribe()
方法将它们联结起来,就可以工作了。如下:
observable.subscribe(observer);
// 或
observable.subscribe(subscriber);
Observable.subscribe(Subscriber)
的内部实现(仅核心代码):
// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 看源码到 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;}
其中,subscriber()
做了3件事:
- 调用
Subscriber.onStart()
。该方法在前面介绍过,是可选的准备方法。 - 调用
Observable
中的OnSubscribe.call(Subscriber)
。这里,事件发送的逻辑开始运行。
注意:
Observable
并不是在创建的时就立即开始发送事件,而是在它被订阅的时候,即当subscribe()
方法执行的时候。
- 将传入的
Subscriber
作为Subscription
返回。为了方便unsubscribe()
。
除了 subscribe(Observer)
和 subscribe(Subscriber)
,subscribe()
还支持不完整定义的回调,RxJava
会自动根据定义创建出 Subscriber
。如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自动创建 Subscriber,使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber,使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber,使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
解释 Action1 和 Action0。
Action0:
是RxJava
的一个接口,只有一个方法 call()
,该方法无参无返回值;由于 onCompleted()
方法也是无参无返回值,因此 Action0
可当成一个包装对象,将 onCompleted()
的内容打包起来将自己作为一个参数传入 subscribe()
以实现不完整定义的回调。这样其实也可以看做将 onCompleted()
方法作为参数传进了 subscribe()
,相当于其他某些语言中的『闭包』。
Action1:
是 RxJava
的一个接口,只有一个方法 call(T param)
,该方法有一个参数,无返回值;与 Action0
同理,由于 onNext(T obj)
和 onError(Throwable error)
也是单参数无返回值,因此 Action1
可以将 onNext(obj)
和 onError(error)
打包起来传入 subscribe()
以实现不完整定义的回调。
注意:虽然
Action0
和Action1
在API
中使用最广泛,但RxJava
提供了多个ActionX
形式的接口 (如Action2、Action3
) ,它们可以被用以包装不同的无返回值的方法。
举两个栗子
a. 打印字符串数组
/**
* 将字符串数组 names中的所有字符串依次打印出来
*/
String[] names = ...;
Observable.from(names).subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
b. 由 id 取得图片并显示
/**
* 由指定的一个 drawable 文件 id drawableRes 取得图片,
* 并显示在 ImageView 中,在出现异常的时候打印 Toast 报错
*/
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
注意:上面的两个例子均是同步的观察者模式。