

2019-05-10  Kael_祈求者

(1) The five base classes of RxJava2 and the learning roadmap for this chapter


(1) Flowable --> 0..N flows, supporting Reactive-Streams and backpressure
(2) Observable --> 0..N flows, no backpressure
(3) Single --> a flow of exactly 1 item or an error
(4) Completable --> a flow without items but only a completion or error signal
(5) Maybe --> a flow with no items, exactly one item or an error

Type                     Class        Interface          Consumer
0..N backpressured       Flowable     Publisher          Subscriber
0..N unbounded           Observable   ObservableSource   Observer
1 element or error       Single       SingleSource       SingleObserver
0..1 element or error    Maybe        MaybeSource        MaybeObserver
0 element or error       Completable  CompletableSource  CompletableObserver



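Flowable.create(new FlowableOnSubscribe<Integer>() { ... }, BackpressureStrategy.BUFFER)
BUFFER: items are held in a buffer queue until the downstream consumes them; if the downstream is too slow, the queue keeps growing and can cause an OOM.
MISSING: items are neither dropped nor buffered; the downstream has to deal with the overflow itself, e.g. with sample, throttleFirst, buffer(1, TimeUnit.SECONDS)...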


Handling backpressure requires extra processing logic, so Flowable runs somewhat less efficiently than Observable.
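To make the idea of a backpressure strategy more concrete, here is a minimal sketch of what dropping means. It deliberately does not use RxJava: it relies on the JDK's `java.util.concurrent.Flow` interfaces (JDK 9+), which mirror the Reactive Streams `Publisher`/`Subscriber`/`Subscription` contract. The class `DropSketch` and its methods are hypothetical names, and the logic is a simplified model of `BackpressureStrategy.DROP`, not RxJava's actual implementation: an item emitted while the downstream has no outstanding demand is simply discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical model of BackpressureStrategy.DROP; not RxJava's implementation.
class DropSketch {

    /** Emits 0..count-1 synchronously and drops every item that has no demand. */
    static Flow.Publisher<Integer> dropping(int count) {
        return subscriber -> {
            long[] demand = {0};              // items the downstream has requested so far
            boolean[] cancelled = {false};
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { demand[0] += n; }
                @Override public void cancel() { cancelled[0] = true; }
            });
            for (int i = 0; i < count && !cancelled[0]; i++) {
                if (demand[0] > 0) {
                    demand[0]--;
                    subscriber.onNext(i);
                }
                // otherwise there is no demand and the item is silently dropped
            }
            if (!cancelled[0]) {
                subscriber.onComplete();
            }
        };
    }

    /** Subscribes, requests initialRequest items once, and returns what arrived. */
    static List<Integer> collect(int count, long initialRequest) {
        List<Integer> received = new ArrayList<>();
        dropping(count).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(initialRequest); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(10, 3));                      // prints [0, 1, 2]
        System.out.println(collect(10, Long.MAX_VALUE).size());  // prints 10
    }
}
```

With a request of 3, only the first three items are delivered and the other seven are lost. A BUFFER-style strategy would instead queue those seven items until more demand arrives, which is where the memory cost comes from.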


    public void ClickOne(View view) {
        Observable.interval(1, TimeUnit.MICROSECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Long aLong) {
                        try {
                            Thread.sleep(1000); // slow consumer; the original call was lost, so the duration is assumed
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.w("TAG", "---->" + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }

    // The method header was lost in the original; the name ClickTwo is inferred from its siblings.
    public void ClickTwo(View view) {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 10000; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP)  // specify the backpressure strategy
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("TAG", integer.toString());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d("TAG", throwable.toString());
                    }
                });

        Flowable.range(1, 10)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.w("TAG", "onSubscribe start");
                        s.request(Long.MAX_VALUE); // the original request call was lost; the amount is assumed
                        Log.w("TAG", "onSubscribe end");
                    }

                    @Override
                    public void onNext(Integer aLong) {
                        Log.w("TAG", "---->" + aLong);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w("TAG", "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.w("TAG", "onComplete");
                    }
                });
    }

    // Single: emits exactly one item or an error
    public void ClickThree(View view) {
        Single.create(new SingleOnSubscribe<String>() {
            @Override
            public void subscribe(SingleEmitter<String> emitter) throws Exception {
                emitter.onSuccess("success"); // the original emitter call was lost; this value is assumed
            }
        }).subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "Disposable");
            }

            @Override
            public void onSuccess(String s) {
                Log.e("TAG", s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG", e.toString());
            }
        });
    }

    public void ClickFour(View view) {
        Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter emitter) throws Exception {
                emitter.onComplete(); // the original emitter call was lost; completing here is assumed
            }
        }).subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "onSubscribe");
            }

            @Override
            public void onComplete() {
                Log.e("TAG", "onComplete");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG", "onError" + e.getMessage());
            }
        });
    }

    public void ClickFive(View view) {
        Maybe.create(new MaybeOnSubscribe<String>() {
            @Override
            public void subscribe(MaybeEmitter<String> emitter) throws Exception {
                emitter.onError(new Exception("exception test"));
            }
        }).subscribe(new MaybeObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "onSubscribe");
            }

            @Override
            public void onSuccess(String s) {
                Log.e("TAG", "onSuccess" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("TAG", "onError" + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e("TAG", "onComplete");
            }
        });
    }

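    .subscribeOn(Schedulers.newThread())  makes the source emit its data on a new thread

    .observeOn(Schedulers.newThread())  makes the observer receive the data on a new thread

Schedulers.computation()
(Running on the current thread: no scheduler needs to be specified)
Schedulers.newThread()
Schedulers.single()
Schedulers.trampoline()

Put simply, subscribeOn() specifies the thread on which events are emitted, and observeOn() specifies the thread on which the subscriber receives them.
If the emitting thread is specified more than once, only the first one takes effect: of several subscribeOn() calls, only the first counts and the rest are ignored.
Specifying the receiving thread more than once does work, however: every call to observeOn() switches the thread for everything downstream of it.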
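The rule about repeated subscribeOn() and observeOn() calls can be illustrated with a small model. The sketch below uses only JDK executors, not RxJava; `SchedulerSketch`, `Source`, and the two helper methods are hypothetical names, and RxJava's real operators are considerably more involved. The assumption it encodes is that a subscribeOn-style wrapper moves the subscribe call of everything upstream onto its thread, so the wrapper closest to the source has the last word, while an observeOn-style wrapper hops threads for each item travelling downstream.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical, simplified model of subscribeOn/observeOn; not RxJava's implementation.
class SchedulerSketch {

    /** A source that pushes values to the consumer when subscribed to. */
    interface Source<T> { void subscribe(Consumer<T> downstream); }

    /** Runs the upstream's subscribe call, and hence its emission, on ex. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService ex) {
        return downstream -> ex.execute(() -> upstream.subscribe(downstream));
    }

    /** Re-delivers every item to the downstream on ex. */
    static <T> Source<T> observeOn(Source<T> upstream, ExecutorService ex) {
        return downstream -> upstream.subscribe(item -> ex.execute(() -> downstream.accept(item)));
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Returns [name of the emitting thread, name of the receiving thread]. */
    static List<String> run() {
        ExecutorService a = named("A"), b = named("B"), c = named("C"), d = named("D");
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Source<Integer> source = downstream -> {
            log.add(Thread.currentThread().getName());   // thread that emits
            downstream.accept(1);
        };
        // Models source.subscribeOn(A).subscribeOn(B).observeOn(C).observeOn(D)
        Source<Integer> chain =
                observeOn(observeOn(subscribeOn(subscribeOn(source, a), b), c), d);
        chain.subscribe(item -> {
            log.add(Thread.currentThread().getName());   // thread that receives
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            a.shutdown(); b.shutdown(); c.shutdown(); d.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [A, D]
    }
}
```

In this model the second subscribeOn (B) only decides where the first one is invoked, so the source still emits on A. Both observeOn hops do take place, and the consumer ends up on the thread of the last one, D.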


Creating Observables
Transforming Observables
Filtering Observables
Combining Observables
Error Handling Operators
Observable Utility Operators
Conditional and Boolean Operators
Mathematical and Aggregate Operators
Backpressure Operators
Connectable Observable Operators
Operators to Convert Observables

In short: creating, transforming, filtering, combining, aggregating, error handling, backpressure handling, connecting
1. Creating Observables

Create — create an Observable from scratch by calling observer methods programmatically
Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
Empty/Never/Throw — create Observables that have very precise and limited behavior
From — convert some other object or data structure into an Observable
Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
Just — convert an object or a set of objects into an Observable that emits that or those objects
Range — create an Observable that emits a range of sequential integers
Repeat — create an Observable that emits a particular item or sequence of items repeatedly
Start — create an Observable that emits the return value of a function
Timer — create an Observable that emits a single item after a given delay


Observable.range(1, 10).subscribe(new Consumer<Integer>() {
    public void accept(Integer t) throws Exception { // called with 1, 2, ..., 10
    }
});
Flowable.range(1, 10).subscribe(new Consumer<Integer>() {
    public void accept(Integer t) throws Exception { // called with 1, 2, ..., 10
    }
});

2. Transforming Observables

Buffer — periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time
FlatMap — transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
GroupBy — divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key
Map — transform the items emitted by an Observable by applying a function to each item
Scan — apply a function to each item emitted by an Observable, sequentially, and emit each successive value
Window — periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time


File folder = new File(FOLDER_PATH_P);
List<String> namesList = new ArrayList<>();
// Expand each sub-folder into its files, then map every file to its name.
Observable.fromArray(folder.listFiles()).flatMap(new Function<File, ObservableSource<File>>() {
    public ObservableSource<File> apply(File t) throws Exception {
        return Observable.fromArray(t.listFiles());
    }
}).map(new Function<File, String>() {
    public String apply(File t) throws Exception {
        return t.getName();
    }
}).subscribe(new Consumer<String>() {
    public void accept(String t) throws Exception {
        System.out.println("accept:" + t);
    }
});

3. Filtering Observables

Debounce — only emit an item from an Observable if a particular timespan has passed without it emitting another item
Distinct — suppress duplicate items emitted by an Observable
ElementAt — emit only item n emitted by an Observable
Filter — emit only those items from an Observable that pass a predicate test
First — emit only the first item, or the first item that meets a condition, from an Observable
IgnoreElements — do not emit any items from an Observable but mirror its termination notification
Last — emit only the last item emitted by an Observable
Sample — emit the most recent item emitted by an Observable within periodic time intervals
Skip — suppress the first n items emitted by an Observable
SkipLast — suppress the last n items emitted by an Observable
Take — emit only the first n items emitted by an Observable
TakeLast — emit only the last n items emitted by an Observable


.filter(new Predicate<String>() {
    public boolean test(String t) throws Exception {
        return t.endsWith("png");
    }
})

Observable.just(1, 2, 3, 4).take(2).subscribe(new Consumer<Integer>() {
    public void accept(Integer t) throws Exception { // called with 1 and 2 only
    }
});

4. Combining Observables

And/Then/When — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries
CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
Merge — combine multiple Observables into one by merging their emissions
StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function


Observable<File> o1 = Observable.fromArray(folder1.listFiles());
Observable<File> o2 = Observable.fromArray(folder2.listFiles());
o1.mergeWith(o2).map(new Function<File, String>() {
    public String apply(File t) throws Exception {
        return t.getName();
    }
}).subscribe(new Consumer<String>() {
    public void accept(String t) throws Exception { // receives file names from both folders
    }
});

5. Error Handling Operators

Catch — recover from an onError notification by continuing the sequence without error
Retry — if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error


.subscribe(new Consumer<String>() {
    public void accept(String t) throws Exception {
        System.out.println("accept:" + t);
    }
}, new Consumer<Throwable>() {
    public void accept(Throwable t) throws Exception {
        System.out.println("accept: error");
    }
});
.retry(n): when an error occurs, resubscribes up to n times, so the source is actually executed up to n + 1 times.
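The n + 1 arithmetic in the retry(n) note can be checked with a small stand-alone model. This is a plain-JDK sketch under the assumption that the source fails on every attempt; `RetrySketch` and its methods are hypothetical names, and the loop only imitates the counting behaviour of retry(n), not how RxJava implements resubscription.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of retry(n) counting; not RxJava's implementation.
class RetrySketch {

    /** Calls source once, then up to n more times while it keeps failing. */
    static <T> T retry(Callable<T> source, int n) throws Exception {
        if (n < 0) {
            throw new IllegalArgumentException("n must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= n; attempt++) {   // 1 initial try + n retries
            try {
                return source.call();
            } catch (Exception e) {
                last = e;   // remember the failure and try again
            }
        }
        throw last;   // all n + 1 attempts failed, so the error is passed on
    }

    /** Counts how often an always-failing source is invoked under retry(n). */
    static int attemptsWhenAlwaysFailing(int n) {
        AtomicInteger attempts = new AtomicInteger();
        Callable<String> failing = () -> {
            attempts.incrementAndGet();
            throw new IllegalStateException("boom");
        };
        try {
            retry(failing, n);
        } catch (Exception expected) {
            // the final error arrives only after the last retry
        }
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsWhenAlwaysFailing(3));   // prints 4
    }
}
```

If the source succeeds on an earlier attempt, the loop returns immediately, so n + 1 is an upper bound rather than a fixed count.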

6. Observable Utility Operators

Delay — shift the emissions from an Observable forward in time by a particular amount
Do — register an action to take upon a variety of Observable lifecycle events
Materialize/Dematerialize — represent both the items emitted and the notifications sent as emitted items, or reverse this process
ObserveOn — specify the scheduler on which an observer will observe this Observable
Serialize — force an Observable to make serialized calls and to be well-behaved
Subscribe — operate upon the emissions and notifications from an Observable
SubscribeOn — specify the scheduler an Observable should use when it is subscribed to
TimeInterval — convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
Timeout — mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
Timestamp — attach a timestamp to each item emitted by an Observable
Using — create a disposable resource that has the same lifespan as the Observable


int total = 6;
// Emits 0..5: start at 0, total items, no initial delay, one item per second.
Observable<Long> observable = Observable.intervalRange(0, total, 0, 1, TimeUnit.SECONDS);

7. Conditional and Boolean Operators

All — determine whether all items emitted by an Observable meet some criteria
Amb — given two or more source Observables, emit all of the items from only the first of these Observables to emit an item
Contains — determine whether an Observable emits a particular item or not
DefaultIfEmpty — emit items from the source Observable, or a default item if the source Observable emits nothing
SequenceEqual — determine whether two Observables emit the same sequence of items
SkipUntil — discard items emitted by an Observable until a second Observable emits an item
SkipWhile — discard items emitted by an Observable until a specified condition becomes false
TakeUntil — discard items emitted by an Observable after a second Observable emits an item or terminates
TakeWhile — discard items emitted by an Observable after a specified condition becomes false


Observable<Integer> observable = Observable.just(1, 2, 3, 4);
observable.contains(2).subscribe(new BiConsumer<Boolean, Throwable>() {
    public void accept(Boolean t1, Throwable t2) throws Exception { // t1 is the result, t2 is null on success
    }
});
Result: true

Observable<Long> observable1 = Observable.interval(1, TimeUnit.SECONDS).take(4);
Observable<Integer> observable2 = Observable.just(1).delay(3, TimeUnit.SECONDS);
observable1.skipUntil(observable2).subscribe(new Consumer<Long>() {
    public void accept(Long t) throws Exception { // called only for items emitted after observable2 fires
    }
});
Result: 2  3

8. Mathematical and Aggregate Operators

Average — calculates the average of numbers emitted by an Observable and emits this average
Concat — emit the emissions from two or more Observables without interleaving them
Count — count the number of items emitted by the source Observable and emit only this value
Max — determine, and emit, the maximum-valued item emitted by an Observable
Min — determine, and emit, the minimum-valued item emitted by an Observable
Reduce — apply a function to each item emitted by an Observable, sequentially, and emit the final value
Sum — calculate the sum of numbers emitted by an Observable and emit this sum


Observable.just(folder).flatMap(new Function<File, Observable<File>>() {
    public Observable<File> apply(File t) throws Exception {
        return Observable.fromArray(t.listFiles());
    }
}).count().subscribe(new Consumer<Long>() {
    public void accept(Long t) throws Exception {
        System.out.println("count:" + t);
    }
});

9. Connectable Observable Operators

Connect — instruct a connectable Observable to begin emitting items to its subscribers
Publish — convert an ordinary Observable into a connectable Observable
RefCount — make a Connectable Observable behave like an ordinary Observable
Replay — ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items


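ConnectableObservable<Long> c1 = Observable.interval(1, TimeUnit.SECONDS).publish();
c1.subscribe(new Consumer<Long>() {
    public void accept(Long t) throws Exception { // not called until connect()
    }
});
c1.connect(); // the original call was lost; a ConnectableObservable emits nothing before connect()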
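The difference between subscribing and connecting can be sketched without RxJava. The model below is a hypothetical, synchronous simplification (`ConnectableSketch` and `Connectable` are made-up names): subscribers are only registered when they subscribe, and the items start flowing to all of them once connect() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of publish()/connect(); not RxJava's implementation.
class ConnectableSketch {

    /** Registers subscribers and emits to all of them only when connect() is called. */
    static class Connectable<T> {
        private final List<T> items;
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        Connectable(List<T> items) {
            this.items = items;
        }

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);   // nothing is delivered yet
        }

        void connect() {
            for (T item : items) {
                for (Consumer<T> s : subscribers) {
                    s.accept(item);        // every subscriber sees the same item
                }
            }
        }
    }

    /** Records the order of events for two subscribers and one connect() call. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Connectable<Integer> c1 = new Connectable<>(List.of(0, 1));
        c1.subscribe(t -> log.add("first:" + t));
        c1.subscribe(t -> log.add("second:" + t));
        log.add("connect");                // no item has been logged before this point
        c1.connect();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [connect, first:0, second:0, first:1, second:1]
    }
}
```

Both subscribers share a single run of the source, which is the behaviour the Publish and Connect entries describe; an ordinary Observable would instead start a separate run for each subscriber.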

10. Operators to Convert Observables

To — convert an Observable into another object or data structure


Observable.just(1, 2).toList().subscribe(new Consumer<List<Integer>>() {
    public void accept(List<Integer> t) throws Exception { // t is the list [1, 2]
    }
});

