RxSwift

RxSwift - 操作符三

2022-07-08  本文已影响0人  aven_kang

just

创建 Observable 发出唯一的一个元素


截屏2022-07-07 下午2.31.15.png

just 操作符将某一个元素转换为 Observable 。

用例

        let ob1 = Observable.just(1)
            ob1.subscribe { index in
            print(index)
         }.disposed(by: bag)
        
    /// ob1与ob2是同等效果的

        let ob2 = Observable<Int>.create { obser in
            
            obser.onNext(1)
            obser.onCompleted()
            return Disposables.create()
        }
        
        ob2.subscribe { element in
           
            switch element {
            case .next(let index):
                print(index)
            case .completed:
                print("completed")
            default:
                break
            }
        }.disposed(by: bag)

map

通过一个转换函数,将 Observable 的每个元素转换一遍


截屏2022-07-07 下午2.43.25.png

map 操作符将源 Observable 的每个元素应用你提供的转换 方法,然后返回含有转换结果的 Observable 。

            Observable.of(1,2,3)
            .map { index in
                return index * 10
            }
            .subscribe { index in
                print(index)
            }
            .disposed(by: bag)

merge

将多个 Observables 合并成一个


merge

通过使用 merge 操作符你可以将多个 Observables 合并成 一个,当某一个Observable 发出一个元素时,他就将这个 元素发出。 如果,某一个 Observable 发出一个 onError 事件,那么 被合并的 Observable 也会将它发出,并且立即终止序列。

        let sub1 = PublishSubject<Int>()
        let sub2 = PublishSubject<Int>()
        
        Observable.of(sub1,sub2)
            .merge()
            .subscribe { index in
                print(index)
            }
            .disposed(by: bag)
        
        sub1.onNext(1)
        sub1.onNext(3)
        sub2.onNext(5)
        sub1.onNext(7)
        sub2.onNext(9)
        sub2.onError(RxError.error1)
        sub1.onNext(11)

输出结果

next(1)
next(3)
next(5)
next(7)
next(9)
error(error1)

materialize

将序列产生的事件,转换成元素


截屏2022-07-07 下午2.54.19.png

通常,一个有限的 Observable 将产生零个或者多个onNext 事件,然后产生一个 onCompleted 或者 onError事件。
materialize 操作符将 Observable 产生的这些事件全部转换 成元素,然后发送出来。

用例

        Observable.of(1,2,3)
            .materialize()
            .subscribe { element in
                print(element)
            }
            .disposed(by: bag)

输出结果

next(next(1))
next(next(2))
next(next(3))
next(completed)
completed

可以看出来next(1)被包裹在next里面了,把事件当成元素发出来,那么如何取到里面的元素呢,看下面的代码

        Observable.of(1,2,3)
            .materialize()
            .subscribe { element in
                switch event {
                case .next(let element):
                    print(element)
                default:
                    break
                }
            }
            .disposed(by: bag)

输出结果

next(1)
next(2)
next(3)
completed

never

创建一个永远不会发出元素的 Observable


never

never 操作符将创建一个 Observable ,这个 Observable不会产生任何事件。

        let ob = Observable<Int>.never()
        /// ob与ob1的效果是一样
        let ob1 = Observable<Int>.create { obser in
            return Disposables.create()
        }

observeOn

指定 Observable 在那个 Scheduler 发出通知


observeOn

ReactiveX 使用 Scheduler 来让 Observable 支持多线程。你 可以使用 observeOn 操作符,来指示 Observable 在哪个Scheduler 发出通知。

let ob1 = Observable<Int>.create { obser in
            obser.onNext(1)
            return Disposables.create()
        }
            .observe(on: ConcurrentDispatchQueueScheduler(qos: .background))
            
        ob1.subscribe { element in
            print(element)
            print("current thread:",Thread.current)
        }.disposed(by: bag)
截屏2022-07-07 下午3.31.41.png

注意⚠ :一旦产生了 onError 事件, observeOn 操作符将 立即转发。他不会等待 onError 之前的事件全部被收到。 这意味着 onError 事件可能会跳过一些元素提前发送出 去,如上图所示。

publish

将 Observable 转换为可被连接的 Observable
publish 会将 Observable 转换为可被连接的Observable 。可被连接的 Observable 和普通的Observable 十分相似,不过在被订阅后不会发出元素,直 到 connect 操作符被应用为止。这样一来你可以控制Observable 在什么时候开始发出元素。

let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
            .publish()
        
        _ = intSequence.subscribe(onNext: { element in
            print("subscribe1:",element)
        })
        
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            _ = intSequence.connect()
        }
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 4) {
            _ = intSequence.subscribe({ element in
                print("subscribe2:",element)
            })
        }
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 6) {
            _ = intSequence.subscribe({ element in
                print("subscribe3:",element)
            })
        }
subscribe1: 0
subscribe1: 1
subscribe2: next(1)
subscribe1: 2
subscribe2: next(2)
subscribe1: 3
subscribe2: next(3)
subscribe3: next(3)
subscribe1: 4
subscribe2: next(4)
subscribe3: next(4)

reduce

持续的将 Observable 的每一个元素应用一个函数,然后发 出最终结果


reduce

reduce 操作符将对第一个元素应用一个函数。然后,将结果 作为参数填入到第二个元素的应用函数中。以此类推,直到 遍历完全部的元素后发出最终结果。 这种操作符在其他地方有时候被称作是 accumulator ,aggregate,compress,fold 或者 inject 。

Observable.of(1,2,3)
            .reduce(0) { a, b in
                return a+b
            }.subscribe { index in
                print(index)
            }
            .disposed(by: bag)

输出结果

next(6)
completed

refCount

将可被连接的 Observable 转换为普通 Observable refCount
        let intSequence = Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance)
                            .publish().refCount()
        
        let firstSubscribe = intSequence.subscribe { element in
            print(element)
        }
    
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 3) {
            print("延迟3秒后")
            firstSubscribe.dispose()
        }
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 6) {
            print("延迟6秒后")
            intSequence.subscribe { element in
                print("Subscribe2:",element)
            }.disposed(by: self.bag)
        }

输出结果

next(0)
next(1)
next(2)
延迟3秒后
延迟6秒后
Subscribe2: next(0)
Subscribe2: next(1)
Subscribe2: next(2)
Subscribe2: next(3)
Subscribe2: next(4)
Subscribe2: next(5)

当第一个订阅者订阅这个观察者序列,refcount()会自动调用connect()方法,不需要我们手动调用。RefCount会跟踪有多少订阅者订阅了,并不会断开连接一直到最后一个观察者序列处理完成。
分析:从上面代码中可以看到,我们在延迟三秒后执行firstSubscribe的dispose()方法,然后6秒后,又重新订阅了,那么可以看到打印,是从0开始的,是不会衔接上面的继续打印,如果不加 refCount()的话,会接着打印next(6)....next(x),可以对比如下代码

let intSequence = Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance)
                            .publish()
        _ = intSequence.connect()
        
        let firstSubscribe = intSequence.subscribe { element in
            print(element)
        }
    
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 3) {
            print("延迟3秒后")
            firstSubscribe.dispose()
        }
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 6) {
            print("延迟6秒后")
            intSequence.subscribe { element in
                print("Subscribe2:",element)
            }.disposed(by: self.bag)
        }

输出结果

next(0)
next(1)
next(2)
延迟3秒后
延迟6秒后
Subscribe2: next(6)
Subscribe2: next(7)
Subscribe2: next(8)
Subscribe2: next(9)
Subscribe2: next(10)

repeatElement

创建重复发出某个元素的 Observable


repeatElement

repeatElement 操作符将创建一个 Observable ,这个Observable 将无止尽地发出同一个元素。

       Observable<Int>.repeatElement(30)
            .subscribe { element in
                print(element)
            }
            .disposed(by: bag)
        // 这两个写法是类似的
        let intSequence = Observable<Int>.create { observe in
            
            observe.onNext(1)
            observe.onNext(1)
            // .. 无数次
            return Disposables.create()
        }

replay

确保观察者接收到同样的序列,即使是在 Observable 发出 元素后才订阅

let IntSequence = Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance)
            .replay(3)
        
        IntSequence.subscribe { element in
            print("Subscribe1:",element)
        }.disposed(by: bag)
        
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
            _ = IntSequence.connect()
        }
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 4) {
            IntSequence.subscribe { element in
                print("Subscribe2:",element)
            }.disposed(by: self.bag)
        }
        
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 10) {
            IntSequence.subscribe { element in
                print("Subscribe3:",element)
            }.disposed(by: self.bag)
        }

输出结果

Subscribe1: next(0)
Subscribe2: next(0)
Subscribe1: next(1)
Subscribe2: next(1)
Subscribe1: next(2)
Subscribe2: next(2)
Subscribe1: next(3)
Subscribe2: next(3)
Subscribe1: next(4)
Subscribe2: next(4)
Subscribe1: next(5)
Subscribe2: next(5)
Subscribe1: next(6)
Subscribe2: next(6)
Subscribe3: next(4)
Subscribe3: next(5)
Subscribe3: next(6)
Subscribe1: next(7)
Subscribe2: next(7)
Subscribe3: next(7)

retry

如果源 Observable 产生一个错误事件,重新对它进行订 阅,希望它不会再次产生错误
retry 操作符将不会将 error 事件,传递给观察者,然而, 它会从新订阅源 Observable ,给这个 Observable 一个重 试的机会,让它有机会不产生 error 事件。retry 总是对观 察者发出 next 事件,即便源序列产生了一个 error 事 件,所以这样可能会产生重复的元素

        var count = 1
        let IntSequence = Observable<Int>.create { observe in
            
            observe.onNext(1)
            observe.onNext(2)
            observe.onNext(3)
            if count == 1 {
                observe.onError(RxError.error1)
                print("发出错误")
                count += 1
            }
            observe.onNext(4)
            observe.onNext(5)
            observe.onNext(6)
            
            return Disposables.create()
        }
        
        IntSequence.retry()
            .subscribe { element in
                print(element)
            }.disposed(by: bag)

输出结果

next(1)
next(2)
next(3)
发出错误
next(1)
next(2)
next(3)
next(4)
next(5)
next(6)

注意:如果你不把count的值加1或者进行其他操作,否则会循环打印

next(1)
next(2)
next(3)
发出错误
上一篇下一篇

猜你喜欢

热点阅读