RxJava操作符-组合

2016-04-07  本文已影响692人  木月摇江

startWith

public final Observable<T> startWith(Observable<T> values)
public final Observable<T> startWith(java.lang.Iterable<T> values)
public final Observable<T> startWith(T t1)
......
public final Observable<T> startWith(T t1, T t2, T t3,  T t4,  T t5,  T t6,  T t7,  T t8,  T t9)

接受一个Observable、Iterable实例或者1到9个值作为参数。在释放原始的Observable释放的序列前先释放传入的Observable实例释放的序列、Iterable实例的元素或者按传入顺序释放传入的1到9个值。

merge,mergeWith

public static <T> Observable<T> merge(java.lang.Iterable<? extends Observable<? extends T>> sequences)
public static <T> Observable<T> merge(java.lang.Iterable<? extends Observable<? extends T>> sequences,  int maxConcurrent)
public static <T> Observable<T> merge(Observable<? extends T>[] sequences)
public static <T> Observable<T> merge(Observable<? extends T>[] sequences,  int maxConcurrent)
public static <T> Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2)
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source)
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent)
......
public static <T> Observable<T> merge(Observable<? extends T> t1,
                      Observable<? extends T> t2,
                      Observable<? extends T> t3,
                      Observable<? extends T> t4,
                      Observable<? extends T> t5,
                      Observable<? extends T> t6,
                      Observable<? extends T> t7,
                      Observable<? extends T> t8,
                      Observable<? extends T> t9)

简单来说就是把一大堆的Observable释放的序列合并到一个Observerble来释放。所以可以看到,ta接受的参数可以是2到9个的Observable实例、泛型为Observable的Iterable实例、类型为Observable的数组或者释放的序列类型为Observable的Observable实例。注意,ta还可以接受一个int类型的参数,用来限制每次订阅合并的Observable的最大个数,当要合并的Observable数大于这个值,多出来的Observable暂时不会合并,直到已经合并的Observable中有终止了的(发出onComplted通知),再继续合并未合并的Observable。注意,当这些要合并的Observable中有发onError通知终止的,那么最终合并得来的那个Observable也将立即发出onError通知终止。还有一个小点就是Observable实例有个方法mergeWith用来将这个Observable实例和其taObservable实例合并。

mergeDelayError

public static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source)
public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Observable<? extends T> t2)
......
public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1,
                                Observable<? extends T> t2,
                                Observable<? extends T> t3,
                                Observable<? extends T> t4,
                                Observable<? extends T> t5,
                                Observable<? extends T> t6,
                                Observable<? extends T> t7,
                                Observable<? extends T> t8,
                                Observable<? extends T> t9)

基本和上面merge一样,不过对于错误处理上稍强一些。当被合并的一堆Observable中有发出onError终止的,最终那个合并得来的Observable不会立即终止,而是继续释放其ta没有终止的被合并的Observable发出的序列,最后才发出onError终止。因为可能不止一个被合并的Observable会发出onError,因此会有可能会有多种错误类型,最终在订阅的Observer的onError方法中会收到CompositeException类型的异常。注意,最终合并得来的Observable最多只会调用一次onError方法。

zip,zipWith

public static <R> Observable<R> zip(java.lang.Iterable<? extends Observable<?>> ws,  FuncN<? extends R> zipFunction)
public static <R> Observable<R> zip(Observable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
public static <T1,T2,R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1,? super T2,? extends R> zipFunction)
......
public static <T1,T2,T3,T4,T5,T6,T7,T8,T9,R> Observable<R> zip(Observable<? extends T1> o1,
                                               Observable<? extends T2> o2,
                                               Observable<? extends T3> o3,
                                               Observable<? extends T4> o4,
                                               Observable<? extends T5> o5,
                                               Observable<? extends T6> o6,
                                               Observable<? extends T7> o7,
                                               Observable<? extends T8> o8,
                                               Observable<? extends T9> o9,
                                               Func9<? super T1,? super T2,? super T3,? super T4,? super T5,? super T6,? super T7,? super T8,? super T9,? extends R> zipFunction)

#Observable实例的方法(zipWith):
public final <T2,R> Observable<R> zipWith(Observable<? extends T2> other, Func2<? super T,? super T2,? extends R> zipFunction)
public final <T2,R> Observable<R> zipWith(java.lang.Iterable<? extends T2> other, Func2<? super T,? super T2,? extends R> zipFunction)

接受两个参数,第一个提供一堆Observable,第二个是一个FuncN实现。与上面的操作符不同,这个操作符通过第二个参数FuncN实现来合并一堆Observable发出的序列的每个item。并且这样的合并有着严格的次序,具体说来就是,最终合并得来的Observable发出序列的第一个item是每个要合并的Observable发出序列的第一个item传入FuncN实现得到的结果,然后依次类推。所以最终合并得来的Observable能发出的item个数和那一堆被合并的Observable中能发出item个数最少的items数量相等。zipWith不过是Observable实例的方法,与zip同理,注意一下zipWith(Iterable,Func2)的第一个参数,不是Observable而是Iterable,但是不用说也能心领神会我想说什么了吧。

combineLatest

public static <T,R> Observable<R> combineLatest(java.util.List<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction)
public static <T1,T2,R> Observable<R> combineLatest(Observable<? extends T1> o1,  Observable<? extends T2> o2, Func2<? super T1,? super T2,? extends R> combineFunction)
......
public static <T1,T2,T3,T4,T5,T6,T7,T8,T9,R> Observable<R> combineLatest(Observable<? extends T1> o1,
                                                         Observable<? extends T2> o2,
                                                         Observable<? extends T3> o3,
                                                         Observable<? extends T4> o4,
                                                         Observable<? extends T5> o5,
                                                         Observable<? extends T6> o6,
                                                         Observable<? extends T7> o7,
                                                         Observable<? extends T8> o8,
                                                         Observable<? extends T9> o9,
                                                         Func9<? super T1,? super T2,? super T3,? super T4,? super T5,? super T6,? super T7,? super T8,? super T9,? extends R> combineFunction)

和zip操作符很像,但和zip不同的是,zip操作符是按严格次序来合并一堆源Observable释放序列的每个item而作为创建的新Observable释放序列的item的,也就是说新的Observale释放序列的第一个item是所有源Observable释放序列的第一个item通过zip的第二个参数FuncN实现合并而成的,之后的第二、第三等等以此类推。combineLatest通过FuncN实现来合并每个源Observable最近释放的item来构成新建的Observable释放序列的item的。

join、groupJoin

public final <TRight,TLeftDuration,TRightDuration,R> Observable<R> join(Observable<TRight> right, Func1<T,Observable<TLeftDuration>> leftDurationSelector, Func1<TRight,Observable<TRightDuration>> rightDurationSelector, Func2<T,TRight,R> resultSelector)

public final <T2,D1,D2,R> Observable<R> groupJoin(Observable<T2> right, Func1<? super T,? extends Observable<D1>> leftDuration, Func1<? super T2,? extends Observable<D2>> rightDuration,  Func2<? super T,? super Observable<T2>,? extends R> resultSelector)

注意这两个家伙是Observable实例的方法。理解这两个方法的参数很关键,前三个参数都是一样的,第四个参数有些区别。第一个参数是要和源Observable合并的Observable;第二个参数是一个Func1实现接受一个参数就是源Observable释放的item,返回一个Observable管理一个时间窗口;第三个参数也是一个Func1实现接受一个参数就是要合并的Observable释放的item,同样返回一个Observable管理着一个时间窗口;如果上面两个Observable管理的时间窗口有交集,那么第四个参数也就是一个Func2实现就会起来发挥合并的作用了,Func2实现接受两个参数,对于join方法,前两个参数是处在时间窗口交集中源Observable和要合并的Observable释放的item,而对于groupJoin第一个参数是处在时间窗口交集中源Observable释放的item,第二个参数是处在时间窗口交集中由要合并的Observable释放的item经groupJoin的第三个参数Func1实现返回的管理时间窗口的Observable。这两个方法新建的Observable释放的item就是上面Func2实现的返回值了。

switchOnNext

public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? extends T>> sequenceOfSequences)

这个简单一点。接受一个释放Observable的Observable作为参数。switchOnNext观察这个释放Observable的Observable,当有Observable释放出来的时候switchOnNext新建的Observable就释放这个Observable释放的item,当有新的Observable释放出来的时候switchOnNext停止释放上一个Observable释放的item转而释放这个新释放的Observable释放的item,依次类推。(-:这句子,实在前无古人后无来者,太起伏了,太跌宕了-:)

上一篇下一篇

猜你喜欢

热点阅读