RxSwift - 操作符四

2022-07-07  本文已影响0人  aven_kang

sample

不定期的对 Observable 取样


smple

sample 操作符将不定期的对源 Observable 进行取样操作。 通过第二个 Observable 来控制取样时机。一旦第二个
Observable 发出一个元素,就从源 Observable 中取出最 后产生的元素。

        let source = PublishSubject<Int>()
        let notifier = PublishSubject<String>()
        source
        .sample(notifier)
        .subscribe(onNext: { print($0) })
        .disposed(by: bag)

        source.onNext(1)

        //让源序列接收接收消息
        notifier.onNext("A")

        source.onNext(2)

        //让源序列接收接收消息
        notifier.onNext("B")
        notifier.onNext("C")

        source.onNext(3)
        source.onNext(4)

        //让源序列接收接收消息
        notifier.onNext("D")

        source.onNext(5)

        //让源序列接收接收消息
        notifier.onCompleted()

输出结果

1
2
4
5

分析:从输出结果来看,我们会发现3不见了,为什么不见了呢,我们回到代码notifier.onNext("D"),在这里,发出D之后呢,我们回到source源上,发现最新的是4,并不是3,也就是3会被丢弃了,也就是说,哪怕中间的再多的事件,但是我也是根据监听事件取源的最新一个事件

scan

持续的将 Observable 的每一个元素应用一个函数,然后发 出每一次函数返回的结果


scan

scan 操作符将对第一个元素应用一个函数,将结果作为第一 个元素发出。然后,将结果作为参数填入到第二个元素的应 用函数中,创建第二个元素。以此类推,直到遍历完全部的 元素。 这种操作符在其他地方有时候被称作是 accumulator 。

        /// scan
       Observable.of(1,2,3,4).scan(0) { a, b in
            return a+b
        }.subscribe { element in
            print(element)
        }.disposed(by: bag)
        // 输出结果
        // next(1)
        // next(3)
        // next(6)
        // next(10)
        // completed

        /// reduce
        Observable.of(1,2,3,4).reduce(0) { a, b in
            return a+b
        }.subscribe { element in
            print(element)
        }.disposed(by: bag)
        // 输出结果
        // next(10)
        // completed

shareReplay

使观察者共享 Observable ,观察者会立即收到最新的元 素,即使这些元素是在订阅前产生的
shar eReplay 操作符将使得观察者共享源 Observable ,并且 缓存最新的 n 个元素,将这些元素直接发送给新的观察者。

        let seq = PublishSubject<Int>()
           
        let ob = seq.map { (i) -> Int in
            print("map 被调用 :---\(i)")
            return i * 2
        }

        let _ = ob.subscribe(onNext: { (num) in
            print("--第一次订阅--\(num)")
        }, onError: nil, onCompleted: nil, onDisposed: nil)


        let _ = ob.subscribe(onNext: { (num) in
            print("--第二次订阅--\(num)")
        }, onError: nil, onCompleted: nil, onDisposed: nil)

        seq.onNext(1)

输出结果

map 被调用 :---1
--第一次订阅--2
map 被调用 :---1
--第二次订阅--2

这里可以看出来map被调用了两次,我们再看看加了share的情况

 let seq = PublishSubject<Int>()
           
        let ob = seq.map { (i) -> Int in
            print("map 被调用 :---\(i)")
            return i * 2
        }
            .share(replay: 0, scope: .forever)

        let _ = ob.subscribe(onNext: { (num) in
            print("--第一次订阅--\(num)")
        }, onError: nil, onCompleted: nil, onDisposed: nil)


        let _ = ob.subscribe(onNext: { (num) in
            print("--第二次订阅--\(num)")
        }, onError: nil, onCompleted: nil, onDisposed: nil)

        seq.onNext(1)
输出结果
map 被调用 :---1
--第一次订阅--2
--第二次订阅--2

总结:
从上面例子可以知道:
发送一次onNext事件,就对应一个Observable,而所有的观察者都只对应这个Observable,Observable共享给所有的观察者
share(replay:scope:)操作符使得观察者共享Observable,并且缓存最新的n个元素,将这些元素直接发送给新的观察者

用例2

无share

        let net = Observable<String>.create { observe in
            
            print("开始网络请求")
            observe.onNext("Response__Result")
            observe.onCompleted()
            return Disposables.create()
            
        }
        
        net.subscribe { element in
            print("第一次订阅\(element)",Thread.current)
        }.disposed(by: bag)
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
            net.subscribe { element in
                print("第二次订阅\(element)",Thread.current)
            }.disposed(by: self.bag)
        }

输出结果

开始网络请求
第一次订阅next(Response__Result) <_NSMainThread: 0x6000028dc700>{number = 1, name = main}
第一次订阅completed <_NSMainThread: 0x6000028dc700>{number = 1, name = main}
开始网络请求
第二次订阅next(Response__Result) <_NSMainThread: 0x6000028dc700>{number = 1, name = main}
第二次订阅completed <_NSMainThread: 0x6000028dc700>{number = 1, name = main}

分析:从打印结果来看,没用share的,订阅了两次,就触发了两次next事件

有share

       let net = Observable<String>.create { observe in
           
           print("开始网络请求")
           observe.onNext("Response__Result")
           observe.onCompleted()
           return Disposables.create()
           
       }.share(replay: 0, scope: .forever)
       
       net.subscribe { element in
           print("第一次订阅\(element)",Thread.current)
       }.disposed(by: bag)
       
       DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
           net.subscribe { element in
               print("第二次订阅\(element)",Thread.current)
           }.disposed(by: self.bag)
       }

输出结果

开始网络请求
第一次订阅next(Response__Result) <_NSMainThread: 0x600001dd4740>{number = 1, name = main}
第一次订阅completed <_NSMainThread: 0x600001dd4740>{number = 1, name = main}
第二次订阅completed <_NSMainThread: 0x600001dd4740>{number = 1, name = main}

分析:从打印结果来看,只执行了一次Next事件,相当于多个观察者共享一个Observable,也只响应这个Observable

single

限制 Observable 只有一个元素,否出发出一个 error事件
single 操作符将限制 Observable 只产生一个元素。如果Observable 只有一个元素,它将镜像这个 Observable 。 如果 Observable 没有元素或者元素数量大于一,它将产生 一个 error 事件。

       let ob = Observable<String>.create { observe in
            
            observe.onNext("1")
            observe.onNext("2")
            
            return Disposables.create()
        }.single()
            
    
        ob.subscribe { element in
            print(element)
        }.disposed(by: bag)
            
        ob.subscribe { element in
            print(element)
        }.disposed(by: bag)

输出结果

next(1)
error(Sequence contains more than one element.)
next(1)
error(Sequence contains more than one element.)

分析:从上述代码可以看出来,我们发出了两个onNext事件,但是后面的订阅者中可以看出打印结果,第一次是正常发出事件,当第二次还存在事件的时候,就强制发出了error,结束了该序列

skip

跳过 Observable 中头 n 个元素

            Observable.of(1,2,3,4,5,6,7)
            .skip(2)
            .subscribe { element in
                switch element {
                case .next(let index):
                    print(index)
                default:
                    break
                }
            }
            .disposed(by: bag)

输出结果

3
4
5
6
7

skipUntil

        let sourceSequence = PublishSubject<String>()
        let referenceSequence = PublishSubject<String>()
        
        sourceSequence.skip(until: referenceSequence)
            .subscribe { element in
                print(element)
            }.disposed(by: bag)
        
        sourceSequence.onNext("1")
        sourceSequence.onNext("2")
        sourceSequence.onNext("3")
        referenceSequence.onNext("4")
//
        sourceSequence.onNext("5")
        sourceSequence.onNext("6")
        sourceSequence.onNext("7")

输出结果

next(5)
next(6)
next(7)

总结:这里有一个注意点,如果你不执行referenceSequence.onNext("4"),那么sourceSequence所有的onNext事件将都不会响应,也就是不会有任何输出结果

skipWhile

            Observable<Int>.of(1,2,5,3,2,4,3,2,1)
            .skip { element in
                return element < 4
            }
            .subscribe { element in
                print(element)
            }
            .disposed(by: bag)

输出结果

next(5)
next(3)
next(2)
next(4)
next(3)
next(2)
next(1)
completed

分析:程序中,我们设定了element < 4的条件,第一个元素是1,小于4,成立,第二个元素是2,小于4程序,第三个是5,并不小于4,返回false,而skipWhile的功能就是,直到遇到第一个false的条件,那么前面所有的元素都将被丢弃,从打印结果来看,开头的1,2是没有打印的,便可印证

startWith

将一些元素插入到序列的头部
startWith操作符会在 Observable 头部插入一些元素。 (如果你想在尾部加入一些元素可以用concat

        Observable<Int>.of(1,2,3,4)
            .startWith(6,5)
            .startWith(7)
            .startWith(8)
            .subscribe { element in
                print(element)
            }
            .disposed(by: bag)

输出结果

next(8)
next(7)
next(6)
next(5)
next(1)
next(2)
next(3)
next(4)
completed

subscribeOn

指定 Observable 在那个 Scheduler 执行
ReactiveX 使用 Scheduler 来让 Observable 支持多线程。你可以使用 subscribeOn操作符,来指示 Observable 在哪个Scheduler 执行。
observeOn操作符非常相似。它指示 Observable 在哪个Scheduler 发出通知。

 PublishSubject<Int>.of(1,2,3).subscribe(on: MainScheduler.instance)
            .subscribe { element in
                print(Thread.current)
                print(element)
            }.disposed(by: bag)

输出结果

<_NSMainThread: 0x6000030a4700>{number = 1, name = main}
next(1)
<_NSMainThread: 0x6000030a4700>{number = 1, name = main}
next(2)
<_NSMainThread: 0x6000030a4700>{number = 1, name = main}
next(3)
<_NSMainThread: 0x6000030a4700>{number = 1, name = main}
completed

take

仅仅从 Observable 中发出头 n 个元素
通过 take 操作符你可以只发出头 n 个元素。并且忽略掉后面的元素,直接结束序列。

        PublishSubject<Int>.of(1,2,3,4,5).subscribe(on: MainScheduler.instance)
            .take(2)
            .subscribe { element in
                print(Thread.current)
                print(element)
            }.disposed(by: bag)

输出结果

<_NSMainThread: 0x6000027d0900>{number = 1, name = main}
next(1)
<_NSMainThread: 0x6000027d0900>{number = 1, name = main}
next(2)
<_NSMainThread: 0x6000027d0900>{number = 1, name = main}
completed

takeLast

仅仅从 Observable 中发出尾部 n 个元素
通过 takeLast 操作符你可以只发出尾部 n 个元素。并且忽略 掉前面的元素。

 PublishSubject<Int>.of(1,2,3,4,5).subscribe(on: MainScheduler.instance)
            .takeLast(2)
            .subscribe { element in
                print(Thread.current)
                print(element)
            }.disposed(by: bag)

输出结果

<_NSMainThread: 0x6000025dc440>{number = 1, name = main}
next(4)
<_NSMainThread: 0x6000025dc440>{number = 1, name = main}
next(5)
<_NSMainThread: 0x6000025dc440>{number = 1, name = main}
completed

takeUntil

忽略掉在第二个 Observable 产生事件后发出的那部分元素
takeUntil 操作符将镜像源 Observable ,它同时观测第二个
Observable 。一旦第二个 Observable 发出一个元素或者 产生一个终止事件,那个镜像的 Observable 将立即终止。

        let soucecSequence = PublishSubject<Int>()
        let referenceSequence = PublishSubject<Int>()
        
        soucecSequence.take(until: referenceSequence)
            .subscribe { element in
                print(element)
            }
        
        soucecSequence.onNext(1)
        soucecSequence.onNext(2)
        soucecSequence.onNext(3)
        
        referenceSequence.onNext(4)
        
        soucecSequence.onNext(5)
        soucecSequence.onNext(6)
        soucecSequence.onNext(7)

输出结果

next(1)
next(2)
next(3)
completed

takeWhile

镜像一个 Observable 直到某个元素的判定为 false
takeWhile 操作符将镜像源 Observable 直到某个元素的判 定为 false。此时,这个镜像的 Observable 将立即终止。

        PublishSubject<Int>.of(1,2,3,4,5,6,7)
            .take(while: { index in
                return index < 4
            })
            .subscribe {
                print($0)
            }.disposed(by: bag)

输出结果

next(1)
next(2)
next(3)
completed

timeout

设置超时时间,超过规定时间的事件将发送 error

        let pub = PublishSubject<Int>()
        pub.timeout(RxTimeInterval.seconds(3), scheduler: MainScheduler.instance).subscribe { element in
            print(element)
        }.disposed(by: bag)
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 4) {
            pub.onNext(3)
        }

timer

创建一个 Observable 在一段延时后,产生唯一的一个元素
timer 操作符将创建一个 Observable ,它在经过设定的一段 时间后,产生唯一的一个元素。

public static func timer( _ dueTime: RxTimeInterval, // 初始延时
period: RxTimeInterval?, // 时间间隔
scheduler: SchedulerType
) -> Observable<E>

 PublishSubject<Int>
            .timer(RxTimeInterval.seconds(2), period: RxTimeInterval.seconds(3), scheduler: MainScheduler.instance)
            .subscribe { element in
            print(element)
        }.disposed(by: bag)
上一篇下一篇

猜你喜欢

热点阅读