组合Observable

2018-09-28  本文已影响60人  CyrusChan

这节说明你可以组合多个Obserable的操作符

目录:

startWith

Available in: Flowable,Observable
ReactiveX doumentation: http://reactivex.io/documentation/operators/startwith.html

发射一个特定的序列在开始从Observable发射item之前。

startWith例子:

Observable<String> names = Observable.just("Spock", "McCoy");

names.startWith("Kirk").subscribe(item -> System.out.println(item));

merge:

把多个Observable组合成一个

merge

Available in: Flowable,Observable,Maybe,Single,Completable

ReactiveX doumentation: http://reactivex.io/documentation/operators/merge.html

把多个Observable组合成一个。任何onError通知从任何原Observable传入的会立即被传给观察者且将会终止 合并的Observable。

Merge例子:

Observable.just(1, 2, 3)
    .mergeWith(Observable.just(4, 5, 6))
    .subscribe(item -> System.out.println(item));

// prints 1, 2, 3, 4, 5, 6

mergeDelayError

Available in: Flowable,Observable,Maybe,Single,Completable

ReactiveX doumentation: http://reactivex.io/documentation/operators/merge.html

组合多个Observable 成一个。任何从任一原Observable传入的onError通知会被保留直到所有合成的Observable 完成,且直到那时他们会被传给观察者。

mergeDelayError 例子

Observable<String> observable1 = Observable.error(new IllegalArgumentException(""));
Observable<String> observable2 = Observable.just("Four", "Five", "Six");
Observable.mergeDelayError(observable1, observable2)
        .subscribe(item -> System.out.println(item));

// emits 4, 5, 6 and then the IllegalArgumentException (in this specific
// example, this throws an `OnErrorNotImplementedException`).

zip

Available in:Flowable,Observable,Maybe,Single

ReactiveX doumentation: http://reactivex.io/documentation/operators/zip.html

通过一个特定的函数来组合一系列被2个以上Observable一起发射的item 并且发射基于这个函数的结果的item

zip Example

Observable<String> firstNames = Observable.just("James", "Jean-Luc", "Benjamin");
Observable<String> lastNames = Observable.just("Kirk", "Picard", "Sisko");
firstNames.zipWith(lastNames, (first, last) -> first + " " + last)
    .subscribe(item -> System.out.println(item));

// prints James Kirk, Jean-Luc Picard, Benjamin Sisko

combineLatest

Available in: Flowable,Observable

ReactiveX doumentation: http://reactivex.io/documentation/operators/combinelatest.html

当一个item被两者之一的Observable发射,通过一个特定的函数组合最后被每个Observable发射的item 并且发射基于该函数的结果的item

combineLatest Example

Observable<Long> newsRefreshes = Observable.interval(100, TimeUnit.MILLISECONDS);
Observable<Long> weatherRefreshes = Observable.interval(50, TimeUnit.MILLISECONDS);
Observable.combineLatest(newsRefreshes, weatherRefreshes,
    (newsRefreshTimes, weatherRefreshTimes) ->
        "Refreshed news " + newsRefreshTimes + " times and weather " + weatherRefreshTimes)
    .subscribe(item -> System.out.println(item));

// prints:
// Refreshed news 0 times and weather 0
// Refreshed news 0 times and weather 1
// Refreshed news 0 times and weather 2
// Refreshed news 1 times and weather 2
// Refreshed news 1 times and weather 3
// Refreshed news 1 times and weather 4
// Refreshed news 2 times and weather 4
// Refreshed news 2 times and weather 5
// ...

switchOnNext

Available in: Flowable,Observable

ReactiveX doumentation: http://reactivex.io/documentation/operators/switch.html

把一个发射 多个Observable的Observable转化成一个单一、发射被最近这些Observable发射的item的Observable

switchOnNext Example

Observable<Observable<String>> timeIntervals =
    Observable.interval(1, TimeUnit.SECONDS)
      .map(ticks -> Observable.interval(100, TimeUnit.MILLISECONDS)
                              .map(innerInterval -> "outer: " + ticks + " - inner: " + innerInterval));
Observable.switchOnNext(timeIntervals)
    .subscribe(item -> System.out.println(item));
            
// prints:
// outer: 0 - inner: 0
// outer: 0 - inner: 1
// outer: 0 - inner: 2
// outer: 0 - inner: 3
// outer: 0 - inner: 4
// outer: 0 - inner: 5
// outer: 0 - inner: 6
// outer: 0 - inner: 7
// outer: 0 - inner: 8
// outer: 1 - inner: 0
// outer: 1 - inner: 1
// outer: 1 - inner: 2
// outer: 1 - inner: 3
// ...

joins

join( )** and **groupJoin( ) 不论何时一个Observable的item落入到另一个Observable的item的特定的窗口期,组合被两个Observbale发射的item。

rxjava-joins

(rxjava-joins) — 说明这个Operator 目前是可选的rxjava-joins的一部分在rxjava-contrib 且不包括在标准的Rxjava操作符系列。

上一篇 下一篇

猜你喜欢

热点阅读