RxJava 知识梳理(2) - RxJava2 操作符实战
一、概述
本文中的所有示例均来自RxJava2
这个大神的仓库,这两篇文章主要是对示例当中的用法进行一个记录,方便以后进行查询。
二、详细示例
2.1 SimpleExampleActivity
这是一个最简单的例子,被订阅者发射了两个数据(Cricket
、Football
),并且有一个订阅者,发生订阅时,会先回调onSubscribe(Disposable d)
,发射数据后,回调onNext(String value)
方法,最后回调onComplete()
。
private void doSomeWork() {
getObservable()
//被订阅者在子线程进行操作.
.subscribeOn(Schedulers.io())
//订阅者在主线程处理结果.
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable<String> getObservable() {
return Observable.just("Cricket", "Football");
}
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
SimpleExampleActivity.png
2.2 MapExampleActivity
map
操作符可以将一个类型的被订阅者的数据传给接收另一个类型的订阅者,即Obervable<T>
发射的数据可以传给Obersever<U>
,在转换的中间,我们需要自己定义转换的规则,即map(new Function<T, U>())
。
private void doSomeWork() {
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<List<ApiUser>, List<User>>() {
@Override
public List<User> apply(List<ApiUser> apiUsers) throws Exception {
return Utils.convertApiUserListToUserList(apiUsers);
}
})
.subscribe(getObserver());
}
private Observable<List<ApiUser>> getObservable() {
return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
@Override
public void subscribe(ObservableEmitter<List<ApiUser>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getApiUserList());
e.onComplete();
}
}
});
}
private Observer<List<User>> getObserver() {
return new Observer<List<User>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(List<User> userList) {
textView.append(" onNext");
textView.append(AppConstant.LINE_SEPARATOR);
for (User user : userList) {
textView.append(" firstName : " + user.firstName);
textView.append(AppConstant.LINE_SEPARATOR);
}
Log.d(TAG, " onNext : " + userList.size());
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
MapExampleActivity.png
2.3 ZipExampleActivity
zip
是Observable
的静态方法,我们通过它传入n
个可以是不同类型的订阅者,以及一个声明将这n
个被订阅者组合成一个新的被订阅者的函数,这个函数的返回值要和订阅者的类型的Observer<T>
的T
一致。
private void doSomeWork() {
Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
new BiFunction<List<User>, List<User>, List<User>>() {
@Override
public List<User> apply(List<User> cricketFans, List<User> footballFans) throws Exception {
return Utils.filterUserWhoLovesBoth(cricketFans, footballFans);
}
})
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable<List<User>> getCricketFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesCricket());
e.onComplete();
}
}
});
}
private Observable<List<User>> getFootballFansObservable() {
return Observable.create(new ObservableOnSubscribe<List<User>>() {
@Override
public void subscribe(ObservableEmitter<List<User>> e) throws Exception {
if (!e.isDisposed()) {
e.onNext(Utils.getUserListWhoLovesFootball());
e.onComplete();
}
}
});
}
private Observer<List<User>> getObserver() {
return new Observer<List<User>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(List<User> userList) {
textView.append(" onNext");
textView.append(AppConstant.LINE_SEPARATOR);
for (User user : userList) {
textView.append(" firstName : " + user.firstName);
textView.append(AppConstant.LINE_SEPARATOR);
}
Log.d(TAG, " onNext : " + userList.size());
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
ZipExampleActivity.png
2.4 DisposableExampleActivity
private final CompositeDisposable disposables = new CompositeDisposable();
void doSomeWork() {
disposables.add(sampleObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<String>() {
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
}));
}
static Observable<String> sampleObservable() {
return Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
SystemClock.sleep(2000);
return Observable.just("one", "two", "three", "four", "five");
}
});
}
DisposableExampleActivity.png
2.5 TakeExampleActivity
take(n)
操作,仅发射前n
个数据。
private void doSomeWork() {
getObservable()
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.take(3)
.subscribe(getObserver());
}
private Observable<Integer> getObservable() {
return Observable.just(1, 2, 3, 4, 5);
}
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
TakeExampleActivity.png
2.6 TimerExampleActivity
Observable
的静态方法,它会创建Observable
,并在等待指定的时间之后发射唯一的事件。
private void doSomeWork() {
getObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable<? extends Long> getObservable() {
return Observable.timer(2, TimeUnit.SECONDS);
}
private Observer<Long> getObserver() {
return new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Long value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果,可以发现即使我们没有发射任何数据,也会有一个默认值回调到onNext
方法当中:
2.7 IntervalExampleActivity
interval
和timer
类似,也是Observable
的静态创建方法,它和timer
的区别是:它并不是在计时结束后就发射唯一的事件,而是每隔一段时间,就发射一次事件,并且Observer
会收到递增的值。
private void doSomeWork() {
disposables.add(getObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(getObserver()));
}
private Observable<? extends Long> getObservable() {
return Observable.interval(0, 2, TimeUnit.SECONDS);
}
private DisposableObserver<Long> getObserver() {
return new DisposableObserver<Long>() {
@Override
public void onNext(Long value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
IntervalExampleActivity.png
2.8 SingleObserverExampleActivity
SingleObserver
属于Observer
的一种,它和普通Observer
的不同,在于其回调方法,它没有onNext(T t)
方法,而是只有onSuccess(T t)
方法。
private void doSomeWork() {
Single.just("Amit").subscribe(getSingleObserver());
}
private SingleObserver<String> getSingleObserver() {
return new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onSuccess(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
};
}
运行结果:
SingleObserverExampleActivity.png
2.9 CompletableObserverExampleActivity
CompletableObserver
也是属于Observer
的一种,它只有onComplete
回调,而没有类似与onNext
之间的参数回调。
private void doSomeWork() {
Completable completable = Completable.timer(1000, TimeUnit.MILLISECONDS);
completable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getCompletableObserver());
}
private CompletableObserver getCompletableObserver() {
return new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
};
}
运行结果:
CompletableObserverExampleActivity.png
2.10 FlowableExampleActivity
private void doSomeWork() {
Flowable<Integer> observable = Flowable.just(1, 2, 3, 4);
observable.reduce(50, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
}).subscribe(getObserver());
}
private SingleObserver<Integer> getObserver() {
return new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onSuccess(Integer value) {
textView.append(" onSuccess : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onSuccess : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
};
}
运行结果:
FlowableExampleActivity.png
2.11 ReduceExampleActivity
reduce
是把发射序列内值进行两两比较,直到比较出最值,如果序列的长度小于2,那么不会被回调。
private void doSomeWork() {
getObservable().reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
}).subscribe(getObserver());
}
private Observable<Integer> getObservable() {
return Observable.just(1, 2, 3, 4);
}
private MaybeObserver<Integer> getObserver() {
return new MaybeObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onSuccess(Integer value) {
textView.append(" onSuccess : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onSuccess : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
2.12 BufferExampleActivity
buffer(a, b)
,a
表示数组的最大长度,b
表示步长。
private void doSomeWork() {
Observable<List<String>> buffered = getObservable().buffer(3, 1);
buffered.subscribe(getObserver());
}
private Observable<String> getObservable() {
return Observable.just("one", "two", "three", "four", "five");
}
private Observer<List<String>> getObserver() {
return new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(List<String> stringList) {
textView.append(" onNext size : " + stringList.size());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : size :" + stringList.size());
for (String value : stringList) {
textView.append(" value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " : value :" + value);
}
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
BufferExampleActivity.png
2.13 FilterExampleActivity
filter
操作符需要传入一个条件函数,仅当发射的数据满足该条件时,订阅者才会收到数据。
private void doSomeWork() {
Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer % 2 == 0;
}
})
.subscribe(getObserver());
}
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" onNext : ");
textView.append(AppConstant.LINE_SEPARATOR);
textView.append(" value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext ");
Log.d(TAG, " value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
FilterExampleActivity.png
2.14 SkipExampleActivity
skip(n)
操作符,会跳过前n
个结果。
private void doSomeWork() {
getObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.skip(2)
.subscribe(getObserver());
}
private Observable<Integer> getObservable() {
return Observable.just(1, 2, 3, 4, 5);
}
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
SkipExampleActivity.png
2.15 ScanExampleActivity
scan
操作符通过遍历被订阅者产生的结果,依次对每一个结果项按照指定规则进行运算,计算后的结果作为下一个迭代项参数,每一次迭代项都会把计算结果输出给订阅者。
private void doSomeWork() {
getObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer int1, Integer int2) throws Exception {
return int1 + int2;
}
})
.subscribe(getObserver());
}
private Observable<Integer> getObservable() {
return Observable.just(1, 2, 3, 4, 5);
}
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
ScanExampleActivity.png
2.16 ReplayExampleActivity
relay(n)
,使得即使在未订阅时,被订阅者已经发射了数据,订阅者也可以收到被订阅者在订阅之前最多n
个数据。
private void doSomeWork() {
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(3);
connectableObservable.connect(); // connecting the connectableObservable
connectableObservable.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
connectableObservable.subscribe(getSecondObserver());
}
private Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " First onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" First onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" First onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" First onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onComplete");
}
};
}
private Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed());
Log.d(TAG, " Second onSubscribe : " + d.isDisposed());
textView.append(AppConstant.LINE_SEPARATOR);
}
@Override
public void onNext(Integer value) {
textView.append(" Second onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" Second onError : " + e.getMessage());
Log.d(TAG, " Second onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" Second onComplete");
Log.d(TAG, " Second onComplete");
}
};
}
运行结果:
ReplayExampleActivity.png
2.17 ConcatExampleActivity
Observable.concat(Observable a, Observable b)
,连接两个被订阅者,订阅者将会按照a->b
的顺序收到两个被订阅者所发射的消息。
private void doSomeWork() {
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable<String> aObservable = Observable.fromArray(aStrings);
final Observable<String> bObservable = Observable.fromArray(bStrings);
Observable.concat(aObservable, bObservable)
.subscribe(getObserver());
}
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
ConcatExampleActivity.png
2.18 MergeExampleActivity
merge
和concat
类似,也是用来连接两个被订阅者,但是它不保证两个被订阅发射数据的顺序。
private void doSomeWork() {
final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable<String> aObservable = Observable.fromArray(aStrings);
final Observable<String> bObservable = Observable.fromArray(bStrings);
Observable.merge(aObservable, bObservable)
.subscribe(getObserver());
}
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
MergeExampleActivity.png
2.19 DeferExampleActivity
使用defer
可以延迟被订阅者的生成,也就是使得被订阅者是在订阅发生时才生成。
private void doSomeWork() {
Car car = new Car();
Observable<String> brandDeferObservable = car.brandDeferObservable();
car.setBrand("BMW");
brandDeferObservable.subscribe(getObserver());
}
private Observer<String> getObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
DeferExampleActivity.png
2.20 DistinctExampleActivity
distinct
会过滤掉源Observable
已经发射过的数据,只有判断该数据没有发射过,才会递交给下游的Observer
。
private void doSomeWork() {
getObservable()
.distinct()
.subscribe(getObserver());
}
private Observable<Integer> getObservable() {
return Observable.just(1, 2, 1, 1, 2, 3, 4, 6, 4);
}
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, " onComplete");
}
};
}
运行结果:
DistinctExampleActivity.png
2.21 LastOperatorExampleActivity
private void doSomeWork() {
getObservable().last("A1") .subscribe(getObserver());
}
private Observable<String> getObservable() {
return Observable.just("A1", "A2", "A3", "A4", "A5", "A6");
}
private SingleObserver<String> getObserver() {
return new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onSuccess(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
};
}
运行结果:
LastOperatorExampleActivity.png
2.22 ReplaySubjectExampleActivity
ReplaySubject
使得无论订阅者在何时订阅,它们都能收到被订阅者发射序列中的所有数据。
private void doSomeWork() {
ReplaySubject<Integer> source = ReplaySubject.create();
source.subscribe(getFirstObserver());
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
source.subscribe(getSecondObserver());
}
private Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " First onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" First onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" First onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" First onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onComplete");
}
};
}
private Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed());
Log.d(TAG, " Second onSubscribe : " + d.isDisposed());
textView.append(AppConstant.LINE_SEPARATOR);
}
@Override
public void onNext(Integer value) {
textView.append(" Second onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" Second onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" Second onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onComplete");
}
};
}
运行结果:
ReplaySubjectExampleActivity.png
2.23 PublishSubjectExampleActivity
如果使用了PublishSubject
,那么对于订阅者来说,它们只会收到被订阅者在订阅之后发射的序列。
private void doSomeWork() {
PublishSubject<Integer> source = PublishSubject.create();
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.subscribe(getSecondObserver());
source.onNext(4);
source.onComplete();
}
private Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " First onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" First onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" First onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" First onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onComplete");
}
};
}
private Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed());
Log.d(TAG, " Second onSubscribe : " + d.isDisposed());
textView.append(AppConstant.LINE_SEPARATOR);
}
@Override
public void onNext(Integer value) {
textView.append(" Second onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" Second onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" Second onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onComplete");
}
};
}
运行结果:
PublishSubjectExampleActivity.png
2.24 BehaviorSubjectExampleActivity
如果使用了BehaviorSubject
这个被订阅者,那么被订阅者收到的序列和它订阅的时间点有关,被订阅者会收到发生订阅之前最近一次的发射的值(如果之前没有发射任何值,那么有可能是一个种子/默认值),以及从订阅时刻到整个序列完成的序列。
private void doSomeWork() {
BehaviorSubject<Integer> source = BehaviorSubject.create();
source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4 and onComplete
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.subscribe(getSecondObserver());
source.onNext(4);
source.onComplete();
}
private Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " First onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" First onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" First onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" First onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onComplete");
}
};
}
private Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed());
Log.d(TAG, " Second onSubscribe : " + d.isDisposed());
textView.append(AppConstant.LINE_SEPARATOR);
}
@Override
public void onNext(Integer value) {
textView.append(" Second onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" Second onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" Second onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onComplete");
}
};
}
运行结果:
BehaviorSubjectExampleActivity.png
2.25 AsyncSubjectExampleActivity
如果使用了AsyncSubject
这个被订阅者,那么它所有的订阅者只会在被订阅者整个序列发射完成之后才能收到值,并且仅仅会收到整个发射序列的最后一个值。如果订阅者没有发射任何一个值,那么所有的被订阅者都不会收到除完成之外的其它消息。
private void doSomeWork() {
AsyncSubject<Integer> source = AsyncSubject.create();
source.subscribe(getFirstObserver()); //第一个订阅者.
source.onNext(1);
source.onNext(2);
source.onNext(3);
source.subscribe(getSecondObserver()); //第二个订阅者.
source.onNext(4);
source.onComplete();
}
private Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " First onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" First onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" First onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" First onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " First onComplete");
}
};
}
private Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed());
Log.d(TAG, " Second onSubscribe : " + d.isDisposed());
textView.append(AppConstant.LINE_SEPARATOR);
}
@Override
public void onNext(Integer value) {
textView.append(" Second onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onNext value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" Second onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" Second onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " Second onComplete");
}
};
}
运行结果:
AsyncSubjectExampleActivity.png
2.26 ThrottleFirstExampleActivity
throttleFirst
用来解决抖动的问题,我们可以设置一段时间,之后它会发射固定时间长度之内的第一个事件,而屏蔽其它的事件。
private void doSomeWork() {
getObservable()
.throttleFirst(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // deliver
emitter.onNext(2); // skip
Thread.sleep(505);
emitter.onNext(3); // deliver
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // skip
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" onNext : ");
textView.append(AppConstant.LINE_SEPARATOR);
textView.append(" value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext ");
Log.d(TAG, " value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
ThrottleFirstExampleActivity.png
2.27 ThrottleLastExampleActivity
throttleLast
和throttleFirst
类似,都是设置固定的时间长度,但是它发射的是这个时间段内的最后一个事件,而不是第一个事件。
private void doSomeWork() {
getObservable()
.throttleLast(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
Thread.sleep(0);
emitter.onNext(1); // skip
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(99);
emitter.onNext(4); // skip
Thread.sleep(100);
emitter.onNext(5); // skip
emitter.onNext(6); // deliver
Thread.sleep(305);
emitter.onNext(7); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" onNext : ");
textView.append(AppConstant.LINE_SEPARATOR);
textView.append(" value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext ");
Log.d(TAG, " value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
运行结果:
ThrottleLastExampleActivity.png
2.28 DebounceExampleActivity
Observable.debounce(float value, Unit unit)
,dounce
,被订阅者在收到要发射消息的指令后,会等待一段时间,如果在这段时间内没有新的消息发射指令,那么它会发射这条消息,否则它会丢弃掉它,从这个新收到的值开始重新等待设置的时间长度。
private void doSomeWork() {
getObservable()
.debounce(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
}
private Observable<Integer> getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
});
}
private Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(Integer value) {
textView.append(" onNext : ");
textView.append(AppConstant.LINE_SEPARATOR);
textView.append(" value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext ");
Log.d(TAG, " value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
}
2.29 WindowExampleActivity
每隔ns
集中发射这段时间内的数据,而不是一有数据就发射。
protected void doSomeWork() {
Observable.interval(1, TimeUnit.SECONDS).take(12)
.window(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getConsumer());
}
public Consumer<Observable<Long>> getConsumer() {
return new Consumer<Observable<Long>>() {
@Override
public void accept(Observable<Long> observable) throws Exception {
Log.d(TAG, "Sub Divide begin....");
textView.append("Sub Divide begin ....");
textView.append(AppConstant.LINE_SEPARATOR);
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long value) {
Log.d(TAG, "Next:" + value);
textView.append("Next:" + value);
textView.append(AppConstant.LINE_SEPARATOR);
}
});
}
};
}
更多文章,欢迎访问我的 Android 知识梳理系列:
- Android 知识梳理目录:http://www.jianshu.com/p/fd82d18994ce
- 个人主页:http://lizejun.cn
- 个人知识总结目录:http://lizejun.cn/categories/