Swift swift学习RxSwift

RxSwift常用基础知识

2022-03-14  本文已影响0人  MrDarren

目的

gitHub地址: https://github.com/ReactiveX/RxSwift

RxSwift的目的是让数据/事件流和异步任务能够更方便的序列化处理, 能够使用Swift进行响应式编程。

RxSwift做了什么

RxSwift把我们程序中每一个操作都看成一个事件,比如一个TextField中的文本改变,一个按钮被点击,或者一个网络请求结束等,每一个事件源就可以看成一个管道,也就是sequence,比如TextField,当我们改变里面的文本的时候,这个TextField就会不断的发出事件,从他的这个sequence中不断的流出,我们只需要监听这个sequence,每流出一个事件就做相应的处理。同理,Button也是一个sequence,每点击一次就流出一个事件。

RxSwift的核心思想是 Observable

sequence,observable表示可监听或者可观察,也就是说RxSwift的核心思想是可监听的序列。并且Observable sequence可以接受异步信号,也就是说,信号是可以异步给监听者的,Observable(ObservableType) 和 SequenceType类似,ObservableType.subscribe 和 SequenceType.generate类似,由于RxSwift支持异步获得信号,所以用ObservableType.subscribe,这和indexGenerator.next()类似其中SequenceType是Swift(2.3以前版本,之后的版本没有该协议)中的一个协议,比如Swift中的Array就遵循这个协议,通过这个协议,你可以这样的去操作一个Array

let array = [1, 2, 3, 4, 5]

let array2 = array.filter({$0 > 1}).map({$0 * 2})//4 6 8 10

var indexGenerator = array2.generate()

let fisrt = indexGenerator.next() // 4

let seoncd = indexGenerator.next() //6

RxSwift中,ObservableType.subscribe的回调(新的信号到来)一共有三个

enum Event<Element> {
    
    case Next(Element) // 新的信号到来
    
    case Error(ErrorType) // 信号发生错误,序列不会再产生信号
    
    case Completed // 序列发送信号完成,不会再产生新的信号
    
}

Observable分为两种

1 在有限的时间内会自动结束(Completed/Error),比如一个网络请求当作一个序列,当网络请求完成的时候,Observable自动结束,资源会被释放。
2 信号不会自己结束,最简单的比如一个Timer,每隔一段时间发送一个新的信号过来,这时候需要手动取消监听,来释放相应的资源,比如一个label.rx.text是一个Obserable,通常需要这样调用addDisposableTo(disposeBag)来让其在deinit,也就是所有者要释放的时候,自动取消监听。

class Observable<Element> {
    func subscribe(observer: Observer<Element>) -> Disposable //调用Disposable的方法来取消
}

当然,除了手动释放,RxSwift提供了一些操作符,比如 takeUntil来根据条件取消

sequence .takeUntil(self.rx__deallocated) //当对象要释放的时候,取消监听 .subscribe { print($0) }

Operator运算符相关

never

never就是创建一个sequence,但是不发出任何事件信号。

Observable<String>.never().subscribe { _ in
     print("This will never be printed")
}.disposed(by: disposeBag)
empty

empty就是创建一个空的sequence,只能发出一个complected事件。

Observable<Int>.empty().subscribe { event in
     let text = "operator: empty__\(event)"
     self.showText(text: text)
     print(text)
}.disposed(by: disposeBag)
create

我们也可以自定义可观察的sequence,那就是使用create。
create操作符传入一个观察者observer,然后调用observer的onNext,onCompleted和onError方法。返回一个可观察的obserable序列。
无参创建create

Observable<Any>.create { (observal:AnyObserver<Any>) -> Disposable in
    observal.onNext("abc")
    observal.onNext("12")
    observal.onCompleted()
    return Disposables.create()
}.subscribe(onNext: { str in
    let text = "operator: create__\(str)"
    self.showText(text: text)
    print(text)
}).disposed(by: disposeBag)

添加参数创建create

func createObservable(element:String) -> Observable<String> {
    return Observable.create { (observal:AnyObserver<String>) -> Disposable in
                observal.onNext(element)
                observal.onCompleted()
           return Disposables.create()
   }
}

createObservable(element: "element").subscribe(onNext: { str in
       let text = "operator: create with element__\(str)"
       self.showText(text: text)
       print(text)
}).disposed(by: disposeBag)
deferred

延时创建Observable对象,当subscribe的时候才去创建,它为每一个Observer创建一个新的Observable,也就是说每个订阅者订阅的对象都是内容相同但是完全独立的序列。
deferr采用一个Factory函数型作为参数,Factory函数返回的是Observable类型。这也是其延时创建Observable的主要实现。

let defObservable = Observable<String>.deferred { () -> Observable<String> in
       return Observable.create { observer -> Disposable in
                   observer.onNext("defObservable create")
                   observer.onCompleted()
              return Disposables.create()
       }
}
defObservable.subscribe { event in
       let text = "operator: deferred one__\(event)"
       self.showText(text: text)
       print(text)
}.disposed(by: disposeBag)
defObservable.subscribe { event in
       let text = "operator: deferred two__\(event)"
       self.showText(text: text)
       print(text)
}.disposed(by: disposeBag)

/// deferred 延迟创建的例子
var value1:String?
let observable1 = Observable<String>.from(optional: value1)
value1 = "Darren1"
observable1.subscribe { event in
                       
     /// 只打印出 completed
     /// 并没有像我们想象中的那样也会打印出 onNext事件,这个是为什么呢?因为在我们订阅的时候,数据未必已经初始化完成
     let text = "operator: deferred__\(event)"
     self.showText(text: text)
     print(text)
}.disposed(by: disposeBag)

/// 把这个例子使用defer重新测试一下
var value2:String?
let observable2 = Observable<String>.deferred { () -> Observable<String> in
      return Observable<String>.from(optional: value2)
}
value2 = "Darren2"
observable2.subscribe { event in
     let text = "operator: deferred__\(event)"
     self.showText(text: text)
                       
     /// next(Darren2) completed
     print(text)
}.disposed(by: disposeBag)
of/from

这两个方法都是把一个对象或者数据转换为可观察序列,这在使用Swift中的SequenceType时很有用。

Observable<String>.of("hello", "RxSwift").subscribe { event in
     let text = "operator: of__\(event)"
     self.showText(text: text)
                                                     
     /// next(hello) next(RxSwift) completed
     print(text)
}.disposed(by: disposeBag)
Observable<String>.from(["hello", "RxSwift"]).subscribe { event in
     let text = "operator: from__\(event)"
     self.showText(text: text)
                                                         
     /// next(hello) next(RxSwift) completed
     print(text)
}.disposed(by: disposeBag)
just

将一个对象或者一个Sequence转换为 一个可观察序列,请注意这里与From是完全不相同的:from是转换为一个或者多个可观察序列(这取决于你是要将一个还是一个序列进行转换)。也就是说just只能包含一个观察序列,请注意与上面例子结果进行对比。

Observable<Array<String>>.just(["hello", "RxSwift"]).subscribe { event in
     let text = "operator: just__\(event)"
     self.showText(text: text)
                                                                
     /// next(["hello", "RxSwift"]) completed
     print(text)
}.disposed(by: disposeBag)
range

给定范围,依次显示,就是创建一个sequence,他会发出这个范围中的从开始到结束的所有事件,Observable必须指定数据类型。

Observable<Int>.range(start: 1, count: 4).subscribe(onNext: { value in
     let text = "operator: range__\(value)"
     self.showText(text: text)
     print(text)
}).disposed(by: disposeBag)
interval

创建一个可观察序列,以特定的时间间隔释放一系列整数(E -> Int/NSInteger)。

Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).take(3).subscribe { event in
     let text = "operator: interval__\(event)"
     self.showText(text: text)
                                                                                                          
     /// operator: interval__next(0)
     /// operator: interval__next(1)
     /// operator: interval__next(2)
     /// operator: interval__completed
     print(text)
}.disposed(by: disposeBag)
timer

在指定的时间后,发送一个特定的item (E -> Int/NSInteger),请注意这里与interval的区别(interval是发送一系列特定item,而timer只会发送一个)。

Observable<Int>.timer(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
     let text = "operator: timer__\(event)"
     self.showText(text: text)
                                                                                               
     /// operator: timer__next(0)
     /// operator: timer__completed
     print(text)
}.disposed(by: disposeBag)
repeatElement

创建一个sequence,发出特定的事件n次。

/// 如果没有.take(3), 就会一直执行下去
Observable.repeatElement("darren").take(3).subscribe(onNext: { value in
     let text = "operator: repeat__\(value)"
     self.showText(text: text)
     print(text)
}).disposed(by: disposeBag)
generate

类似于for循环,创建一个可观察sequence,当初始化的条件为true的时候,他就会发出所对应的事件。

Observable.generate(initialState: 0) {
    $0 < 5
} iterate: {
    $0 + 2
}.subscribe(onNext: { value in
     let text = "operator: generate__\(value)"
     self.showText(text: text)
     print(text)
}).disposed(by: disposeBag)
error

发出错误信号,创建一个可观察序列,但不发出任何正常的事件,只发出error事件并结束。

let error = NSError(domain: "error", code: 10, userInfo: ["This is error" : "xxxxxx"]) as Error
Observable<Any>.error(error).subscribe(onNext: { value in
     let text = "operator: error__\(value)"
     self.showText(text: text)
     print(text)
}).disposed(by: disposeBag)

Transform 变换相关

buffer

定期的将需要发射的items随机到一个buffer的包中,分批次的发射这些包,而不是一次发射一个item:例如你有[1, 2, 3, 4] ,你可以一次发射一个,也可以一次发射两个item或者三个。

/// 一次发射1个Item事件
Observable<Int>.of(1, 2, 3, 4).buffer(timeSpan: RxTimeInterval.seconds(1), count: 1, scheduler: MainScheduler.instance).subscribe { event in
     let text = "transform: buffer__\(event)"
     self.showText(text: text)
                                     
     /// transform: buffer__next([1])
     /// transform: buffer__next([2])
     /// transform: buffer__next([3])
     /// transform: buffer__next([4])
     /// transform: buffer__next([])
     /// transform: buffer__completed
     print(text)
}.disposed(by: disposeBag)

/// 一次发射3个Item事件
Observable<Int>.of(1, 2, 3, 4).buffer(timeSpan: RxTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe { event in
     let text = "transform: buffer__\(event)"
     self.showText(text: text)
     
     /// transform: buffer__next([1, 2, 3])
     /// transform: buffer__next([4])
     /// transform: buffer__completed
     print(text)
}.disposed(by: disposeBag)
window

与buffer类似,但是每次发射的不是item,而是Observables序列(请注意与buffer的结果比较)。

Observable<Int>.of(1, 2, 3, 4).window(timeSpan: RxTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe { event in
     let text = "transform: window__\(event)"
     self.showText(text: text)
                                                                                                                                   
     /// transform: window__next(RxSwift.AddRef<Swift.Int>)
     /// transform: window__next(RxSwift.AddRef<Swift.Int>)
     /// transform: window__completed
     print(text)
}.disposed(by: disposeBag)       
flatMap

将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
这个方法是很有用的,例如当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的Observable发射这些次级Observable发射的数据的完整集合。

/// 我需要在每一个Item后跟一个新的Item叫做RxSwift
Observable<Int>.of(0, 1, 2).flatMap { (element:Int) -> Observable<String> in
     return Observable<String>.of("\(element)", "RxSwift")
}.subscribe { event in
     let text = "transform: flatMap__\(event)"
     self.showText(text: text)
                                                 
     /// transform: flatMap__next(0)
     /// transform: flatMap__next(RxSwift)
     /// transform: flatMap__next(1)
     /// transform: flatMap__next(RxSwift)
     /// transform: flatMap__next(2)
     /// transform: flatMap__next(RxSwift)
     /// transform: flatMap__completed
     print(text)
}.disposed(by: disposeBag)
groupBy

将一个Observable分拆为一些Observables集合,它们中的每一个发射原始Observable的一个子序列。

/// 我需要将奇数偶数分成两组
Observable<Int>.of(0, 1, 2, 3, 4, 5).groupBy { (element) -> String in
     return element % 2 == 0 ? "偶数" : "奇数"
}.subscribe { event in
     switch event {
            case .next(let group):
                 group.asObservable().subscribe { event in
                        let text = "transform: groupBy__key:  \(group.key)  \(event)"
                        self.showText(text: text)
                                                                                              
                        /// transform: groupBy__ key:  偶数  next(0)
                        /// transform: groupBy__ key:  奇数  next(1)
                        /// transform: groupBy__ key:  偶数  next(2)
                        /// transform: groupBy__ key:  奇数  next(3)
                        /// transform: groupBy__ key:  偶数  next(4)
                        /// transform: groupBy__ key:  奇数  next(5)
                        /// transform: groupBy__ key:  偶数  completed
                        /// transform: groupBy__ key:  奇数  completed
                        print(text)
                 }.disposed(by: self.disposeBag)
                                                              
            default:break
     }
}.disposed(by: disposeBag)
map

通过一个闭包将原来的序列转换为一个新序列的操作。

Observable<Int>.of(1, 2, 3).map { return "hello " + "\($0)" }.subscribe { event in
     let text = "transform: map__\(event)"
     self.showText(text: text)
                                                                         
     /// transform: map__next(hello 1)
     /// transform: map__next(hello 2)
     /// transform: map__next(hello 3)
     /// transform: map__completed
     print(text)
}.disposed(by: disposeBag)
scan

从字面意思可以看出是扫描,也就是说该方法会给出一个初始值(seed),每次通过一个函数将上一次的结果与序列中的Item进行处理,每处理完成都会发射.next事件。

Observable<String>.of("Rx", "Swift").scan("hello ") { acum, element in
     return acum + element
}.subscribe { event in
     let text = "transform: scan__\(event)"
     self.showText(text: text)
                                                                 
     /// transform: scan__next(hello Rx)
     /// transform: scan__next(hello RxSwift)
     /// transform: scan__completed
     print(text)
}.disposed(by: disposeBag)
reduce

与上述scan类似,都是初始一个seed,每次通过函数将上一次的结果与序列中的item进行处理,但是唯一不同的一点是,只会在最后发射一次.next事件,将其拿来作数学计算很有用,这个我们将会在后面讲到 (请注意与上述scan的结果比较)。

Observable<String>.of("Rx", "Swift").reduce("hello ") { acum, element in
     return acum + element
}.subscribe { event in
    let text = "transform: reduce__\(event)"
    self.showText(text: text)
                                                                   
    /// transform: reduce__next(hello RxSwift)
    /// transform: reduce__completed
    print(text)
}.disposed(by: disposeBag)

Filter过滤器相关

debounce

在规定的时间内过滤item。

Observable<Int>.of(1, 2, 3, 4).debounce(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
     let text = "filter: debounce__\(event)"
     self.showText(text: text)
                                                                                                                 
     /// filter: debounce__next(4)
     /// filter: debounce__completed
     print(text)
}.disposed(by: disposeBag)
distinctUntilChanged

过滤掉可观察到的重复item,表示如果发射的事件与上一次不相同那么才会发射此次事件。

Observable<Int>.of(1, 2, 2, 2, 3).distinctUntilChanged().subscribe { event in
     let text = "filter: distinctUntilChanged__\(event)"
     self.showText(text: text)
                                                                    
     /// filter: distinctUntilChanged__next(1)
     /// filter: distinctUntilChanged__next(2)
     /// filter: distinctUntilChanged__next(3)
     /// filter: distinctUntilChanged__completed
     print(text)
}.disposed(by: disposeBag)
elementAt

发射第 n个item。

Observable<Int>.of(1, 2, 3, 4, 5).element(at: 3).subscribe { event in
     let text = "filter: elementAt__\(event)"
     self.showText(text: text)
                                                            
     /// filter: elementAt__next(4)
     /// filter: elementAt__completed
     print(text)
}.disposed(by: disposeBag)
filter

仅发射谓词测试通过的Items。

Observable<Int>.of(9, 10, 11, 12).filter { element -> Bool in
     return element > 10
}.subscribe { event in
     let text = "filter: filter__\(event)"
     self.showText(text: text)
                                                      
     /// filter: filter__next(11)
     /// filter: filter__next(12)
     /// filter: filter__completed
     print(text)
}.disposed(by: disposeBag)
skip

发射第n(包含n)之后的items。

Observable<Int>.of(9, 10, 11, 12).skip(2).subscribe { event in
     let text = "filter: skip__\(event)"
     self.showText(text: text)
                                                     
     /// filter: skip__next(11)
     /// filter: skip__next(12)
     /// filter: skip__completed
     print(text)
}.disposed(by: disposeBag)
take

发射第n(不包含n)之前的items,与skip相反效果。

Observable<Int>.of(9, 10, 11, 12).take(2).subscribe { event in
     let text = "filter: take__\(event)"
     self.showText(text: text)
                                                     
     /// filter: take__next(9)
     /// filter: take__next(10)
     /// filter: take__completed
     print(text)
}.disposed(by: disposeBag)
takeLast

发射第n(包含n)之后的items,与skip相同效果。

Observable<Int>.of(9, 10, 11, 12).takeLast(2).subscribe { event in
     let text = "filter: takeLast__\(event)"
     self.showText(text: text)
                                                         
     /// filter: takeLast__next(11)
     /// filter: takeLast__next(12)
     /// filter: takeLast__completed
     print(text)
}.disposed(by: disposeBag)

Combine结合相关

merge

将多个序列的items合并为一个序列的items。

let observable1 = Observable<Int>.of(1, 3, 5, 7, 9)
let observable2 = Observable<Int>.of(2, 4, 6)
Observable<Int>.merge(observable1, observable2).subscribe { event in
     let text = "combine: merge__\(event)"
     self.showText(text: text)
                                                           
     /// combine: merge__next(1)
     /// combine: merge__next(2)
     /// combine: merge__next(3)
     /// combine: merge__next(4)
     /// combine: merge__next(5)
     /// combine: merge__next(6)
     /// combine: merge__next(7)
     /// combine: merge__next(9)
     /// combine: merge__completed
     print(text)
}.disposed(by: disposeBag)
startWith

在发射序列items前新增一个item。

Observable<String>.of("  ", "RxSwift", "!").startWith("hello").reduce("") { (accum, element) -> String in
     return accum + element
}.subscribe { event in
     let text = "combine: startWith__\(event)"
     self.showText(text: text)
                                                                                       
     /// combine: startWith__next(hello  RxSwift!)
     /// combine: startWith__completed
     print(text)
}.disposed(by: disposeBag)
zip

将多个序列的items进行一一合并,但是需要注意的是,它会等到item对其后合并,未对齐的会舍弃。

let observable3 = Observable<Int>.of(1, 2, 3, 4, 5, 6, 7)
let observable4 = Observable<String>.of("A", "B", "C", "D")
Observable<String>.zip(observable3, observable4) { (e1:Int, e2:String) -> String in
     return "\(e1)\(e2)"
}.subscribe { event in
     let text = "combine: zip__\(event)"
     self.showText(text: text)
                                                              
     /// combine: zip__next(1A)
     /// combine: zip__next(2B)
     /// combine: zip__next(3C)
     /// combine: zip__next(4D)
     /// combine: zip__completed
     print(text)
}.disposed(by: disposeBag)
combineLatest

如果存在两条事件队列,需要同时监听,那么每当有新的事件发生的时候,combineLatest 会将每个队列的最新的一个元素进行合并。
类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。
combineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,combineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

let observable5 = Observable<Int>.of(1, 2, 3, 4, 5)
let observable6 = Observable<String>.of("A", "B", "C", "D")
Observable<String>.combineLatest(observable5, observable6) { (e1:Int, e2:String) -> String in
     return "\(e1)\(e2)"
}.subscribe { event in
     let text = "combine: combineLatest__\(event)"
     self.showText(text: text)
                                                                        
     /// combine: combineLatest__next(1A)
     /// combine: combineLatest__next(2A)
     /// combine: combineLatest__next(2B)
     /// combine: combineLatest__next(3B)
     /// combine: combineLatest__next(3C)
     /// combine: combineLatest__next(4C)
     /// combine: combineLatest__next(4D)
     /// combine: combineLatest__next(5D)
     /// combine: combineLatest__completed
     print(text)
}.disposed(by: disposeBag)

Observable<String>.combineLatest(observable6, observable5) { (e1:String, e2:Int) -> String in
     return "\(e1)\(e2)"
}.subscribe { event in
     let text = "combine: combineLatest__\(event)"
     self.showText(text: text)
                                                                        
     /// combine: combineLatest__next(A1)
     /// combine: combineLatest__next(B1)
     /// combine: combineLatest__next(B2)
     /// combine: combineLatest__next(C2)
     /// combine: combineLatest__next(C3)
     /// combine: combineLatest__next(D3)
     /// combine: combineLatest__next(D4)
     /// combine: combineLatest__next(D5)
     /// combine: combineLatest__completed
     print(text)
}.disposed(by: disposeBag)

Error错误处理相关

catch

在收到序列的异常事件时,通过返回另一个序列来持续发送非error事件。

Observable<UInt8>.create { observer in
     observer.onNext(0)
     observer.onError(NSError(domain: "error", code: 110, userInfo: nil))
     return Disposables.create()
}.catch { error -> Observable<UInt8> in
     let text = "combine: catch error__\(error)"
     self.showText(text: text)
                                  
     /// combine: catch error__Error Domain=error Code=110 "(null)"
     print(text)
     return Observable<UInt8>.of(1, 2)
}.subscribe { event in
     let text = "combine: catch__\(event)"
     self.showText(text: text)
                                              
     /// combine: catch__next(0)
     /// combine: catch__next(1)
     /// combine: catch__next(2)
     print(text)
}.disposed(by: disposeBag)
retry

出现错误事件后,重新发送所有事件信息。

Observable<UInt8>.create { observer in
      observer.onNext(0)
      observer.onError(NSError(domain: "error", code: 110, userInfo: nil))
      return Disposables.create()
}.retry(3).subscribe { event in
      let text = "combine: retry__\(event)"
      self.showText(text: text)
                                               
      /// combine: retry__next(0)
      /// combine: retry__next(0)
      /// combine: retry__next(0)
      /// combine: retry__error(Error Domain=error Code=110 "(null)")
      print(text)
}.disposed(by: disposeBag)

PracticalOperation实用操作相关

delay

延迟发射事件。

let text = "practicalOperation: delay__start time: \(Date())"
showText(text: text)

/// practicalOperation: delay__start time: 2021-03-14 09:38:36 +0000
print(text)
Observable<Int>.of(1, 2).delay(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
     if event.isCompleted {
         let text = "practicalOperation: delay__end time: \(Date())"
         self.showText(text: text)
                                                                                                            
         /// practicalOperation: delay__end time: 2021-03-14 09:38:37 +0000
         print(text)
      }
      let text = "practicalOperation: delay__\(event)"
      self.showText(text: text)
                                                                                                        
      /// practicalOperation: delay__next(1)
      /// practicalOperation: delay__next(2)
      /// practicalOperation: delay__completed
      print(text)
}.disposed(by: disposeBag)
do

在一个序列的每个事件执行之前添加一个执行动作。

Observable<Int>.of(1, 2, 3).do { _ in
     let text = "practicalOperation: do__previous next"
     self.showText(text: text)
     print(text)
} onError: { _ in
     let text = "practicalOperation: do__previous error"
     self.showText(text: text)
     print(text)
} onCompleted: {
    let text = "practicalOperation: do__previous complete"
    self.showText(text: text)
    print(text)
}.subscribe { event in
    let text = "practicalOperation: do__\(event)"
    self.showText(text: text)
             
    /// practicalOperation: do__previous next
    /// practicalOperation: do__next(1)
    /// practicalOperation: do__previous next
    /// practicalOperation: do__next(2)
    /// practicalOperation: do__previous next
    /// practicalOperation: do__next(3)
    /// practicalOperation: do__previous complete
    /// practicalOperation: do__completed
    print(text)
}.disposed(by: disposeBag)
observeOn

observer在指定scheduler中观察序列事件。

Observable<Int>.of(1).observe(on: ConcurrentDispatchQueueScheduler(queue: DispatchQueue(label: "test"))).subscribe { event in
     let text = "practicalOperation: observeOn__isMainThread: \(Thread.current.isMainThread)  \(event)"
     DispatchQueue.main.async {
          self.showText(text: text)
     }
                                                                                                                    
     /// practicalOperation: observeOn__isMainThread: false  next(1)
     /// practicalOperation: observeOn__isMainThread: false  completed
     print(text)
}.disposed(by: disposeBag)
subscribeOn

在指定的scheduler中操作,参考observeOn。

Observable<Int>.of(1).subscribe(on: MainScheduler.instance).subscribe { event in
     let text = "practicalOperation: subscribeOn__isMainThread: \(Thread.current.isMainThread)  \(event)"
     self.showText(text: text)
                                                                       
     /// practicalOperation: subscribeOn__isMainThread: true  next(1)
     /// practicalOperation: subscribeOn__isMainThread: true  completed
     print(text)
}.disposed(by: disposeBag)
timeout

一个序列在指定时间内未发射完成所有事件,那么将会进入.onError。

Observable<Int>.of(1).delay(RxTimeInterval.seconds(2), scheduler: MainScheduler.instance).timeout(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
     let text = "practicalOperation: timeout__\(event)"
     self.showText(text: text)

     /// practicalOperation: timeout__error(Sequence timeout.)
     print(text)
}.disposed(by: disposeBag)
ifEmpty

如果是序列中没有任何item,那么给定一个default。

Observable<Int>.empty().ifEmpty(default: 0).subscribe { event in
     let text = "practicalOperation: defaultIfEmpty__\(event)"
     self.showText(text: text)
                                                       
     /// practicalOperation: defaultIfEmpty__next(0)
     /// practicalOperation: defaultIfEmpty__completed
     print(text)
}.disposed(by: disposeBag)
skipUntil

丢弃掉第一个序列的所有items,直到第二个序列的item出现。

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence.skip(until: referenceSequence).subscribe { event in
    let text = "practicalOperation: skipUntil__\(event)"
    self.showText(text: text)
                                                         
    /// practicalOperation: skipUntil__next(4)
    /// practicalOperation: skipUntil__next(5)
    /// practicalOperation: skipUntil__completed
    print(text)
}.disposed(by: disposeBag)
sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
referenceSequence.onNext("1")
sourceSequence.onNext("4")
sourceSequence.onNext("5")

/// 必须调用onCompleted, 不然返回, 所在控制器, 不会被释放
sourceSequence.onCompleted()
skipWhile

丢弃掉所有的items,直到满足某个不满足条件的item出现。

Observable<String>.of("AD", "BD", "CD").skip { element -> Bool in
     return element.contains("A")
}.subscribe { event in
     let text = "practicalOperation: skipWhile__\(event)"
     self.showText(text: text)
                                                          
     /// practicalOperation: skipWhile__next(BD)
     /// practicalOperation: skipWhile__next(CD)
     /// practicalOperation: skipWhile__completed
     print(text)
}.disposed(by: disposeBag)
takeUntil

取得第一个序列所有items,直到第二个序列发射item或者终止。

Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9).take(until: { element -> Bool in
     return element > 4
}).subscribe { event in
     let text = "practicalOperation: takeUntil__\(event)"
     self.showText(text: text)
                                                                        
     /// practicalOperation: takeUntil__next(1)
     /// practicalOperation: takeUntil__next(2)
     /// practicalOperation: takeUntil__next(3)
     /// practicalOperation: takeUntil__next(4)
     /// practicalOperation: takeUntil__completed
     print(text)
}.disposed(by: disposeBag)
takeWhile

取得第一个序列的所有items,直到出现不满足条件的item (请仔细体会与skipWhile的不同之处)。

Observable<Int>.of(1, 2, 3, 4, 5, 6, 7, 8, 9).take(while: { element -> Bool in
     return element < 5
}).subscribe { event in
     let text = "practicalOperation: takeWhile__\(event)"
     self.showText(text: text)
                                                                        
     /// practicalOperation: takeWhile__next(1)
     /// practicalOperation: takeWhile__next(2)
     /// practicalOperation: takeWhile__next(3)
     /// practicalOperation: takeWhile__next(4)
     /// practicalOperation: takeWhile__completed
     print(text)
}.disposed(by: disposeBag)

Observer观察者&Observable被观察者相关

AnyObserver
let observer1 = AnyObserver<Int> { event in
                     let text = "observer: anyObserver__\(event)"
                     self.showText(text: text)
                                  
                     /// observer: anyObserver__next(1)
                     /// observer: anyObserver__next(2)
                     /// observer: anyObserver__next(3)
                     /// observer: anyObserver__completed
                     print(text)
                 }
Observable<Int>.of(1, 2, 3).subscribe(observer1).disposed(by: disposeBag)
AsyncSubject

Subject在ReactiveX的一些实现中扮演了一种桥梁或者代理的角色,它既可以作为Observer也可以作为Observable来使用。
作为观察者来说它可以订阅一个或者多个可观察序列,作为可观察者来说它可以通过items的reemitting来观察,并且还可以发射新的items事件,我们将从如下四个Subject进行学习:
AsyncSubject仅仅只发送订阅之后的最后一个item以及.onCompleted,如果出现错误,那么仅仅将只发送.onError。

let asyncSubject = AsyncSubject<Int>()
asyncSubject.onNext(1)
asyncSubject.onNext(2)
asyncSubject.subscribe { event in
     let text = "observer: asyncSubject__\(event)"
     self.showText(text: text)
                        
     /// observer: asyncSubject__next(4)
     /// observer: asyncSubject__completed
     print(text)
}.disposed(by: disposeBag)
asyncSubject.onNext(3)
asyncSubject.onNext(4)

/// 没有调用onCompleted, 则收不到subscribe, 因为不知道哪个是最后一个
asyncSubject.onCompleted()
ReplaySubject

订阅ReplaySubject的时候,可以接收到订阅他之后的事件,但也可以接受订阅他之前发出的事件,接受几个事件取决与bufferSize的大小。
createUnbounded()表示接受所有事件。
create(bufferSize: 4) 表示可接受到的订阅他之前的事件的个数,但是订阅他之后的事件一定会触发。

let replaySubject = ReplaySubject<Int>.createUnbounded()
replaySubject.onNext(1)
replaySubject.onNext(2)
replaySubject.subscribe { event in
      let text = "observer: replaySubject__\(event)"
      self.showText(text: text)
                         
      /// observer: replaySubject__next(1)
      /// observer: replaySubject__next(2)
      /// observer: replaySubject__next(3)
      /// observer: replaySubject__completed
      print(text)
}.disposed(by: disposeBag)
replaySubject.onNext(3)
replaySubject.onCompleted()

let replaySubject1 = ReplaySubject<Int>.create(bufferSize: 1)
replaySubject1.onNext(1)
replaySubject1.onNext(2)
replaySubject1.subscribe { event in
      let text = "observer: replaySubject__\(event)"
      self.showText(text: text)
                          
      /// observer: replaySubject__next(2)
      /// observer: replaySubject__next(3)
      /// observer: replaySubject__completed
      print(text)
}.disposed(by: disposeBag)
replaySubject1.onNext(3)
replaySubject1.onCompleted()
PublishSubject

订阅PublishSubject的时候,只能接收到订阅他之后发生的事件。subject.onNext()发出onNext事件,对应的还有onError()和onCompleted()事件,可以把他看成一个bufferSize=0的ReplaySubject。

let publishSubject = PublishSubject<Int>()
publishSubject.onNext(1)
publishSubject.onNext(2)
publishSubject.subscribe { event in
       let text = "observer: publishSubject__\(event)"
       self.showText(text: text)
                          
       /// observer: publishSubject__next(3)
       /// observer: publishSubject__completed
       print(text)
}.disposed(by: disposeBag)
publishSubject.onNext(3)
publishSubject.onCompleted()
BehaviorSubject

订阅了BehaviorSubject,会接受到订阅之前的最后一个事件,订阅之后的事件一定会触发。

let behaviorSubject = BehaviorSubject<Int>(value: 0)
behaviorSubject.onNext(1)
behaviorSubject.onNext(2)
behaviorSubject.subscribe { event in
        let text = "observer: behaviorSubject__\(event)"
        self.showText(text: text)
                           
        /// observer: behaviorSubject__next(2)
        /// observer: behaviorSubject__next(3)
        /// observer: behaviorSubject__completed
        print(text)
}.disposed(by: disposeBag)
behaviorSubject.onNext(3)
behaviorSubject.onCompleted()
BehaviorRelay

BehaviorReplay是Swift5 替换 Swift4 中的 Variable。
1 可以明确的是它不是subject类型,因为它只是一个可观察序列,但是它又包含subject对象(私有的BehaviorSubject)。
2 初始化的时候也需要一个初始值。
3 既然它不是一个订阅者,那么就不能发出onNext:、complete和error事件。
4 只能通过accept发出event。
总结:BehaviorRelay 跟 BehaviorSubject 很像,只是不是发出complete、error事件。

let behaviorRelay = BehaviorRelay(value: 0)
behaviorRelay.accept(1)
behaviorRelay.accept(2)
behaviorRelay.subscribe { event in
     let text = "observer: behaviorRelay__\(event)"
     self.showText(text: text)
                         
     /// observer: behaviorRelay__next(2)
     /// observer: behaviorRelay__next(3)
     /// observer: behaviorRelay__next(4)
     print(text)
}.disposed(by: disposeBag)
behaviorRelay.accept(3)
behaviorRelay.accept(4)
Driver

Driver从名字上可以理解为驱动,在功能上它类似被观察者(Observable),而它本身也可以与被观察者相互转换(Observable: asDriver, Driver: asObservable),它驱动着一个观察者,当它的事件流中有事件涌出时,被它驱动着的观察者就能进行相应的操作。一般我们会将一个Observable被观察者转换成Driver后再进行驱动操作。
Driver的drive方法与Observable的方法bindTo用法非常相似,事实上,它们的作用也是一样,说白了就是被观察者与观察者的绑定。
那为什么RxSwift的作者又搞出Driver这么个东西来呢?
比较与Observable,Driver有以下的特性:
1 它不会发射出错误(Error)事件。
2 对它的观察订阅是发生在主线程(UI线程)的。
3 自带shareReplayLatestWhileConnected。

let categorysDriver = categorysRelay.map { models -> [ServiceCategoryItemModel] in
     return [ServiceCategoryItemModel(model: Void(), items: models)]
}.asDriver(onErrorJustReturn: []).drive(headerView.categorysRelay).disposed(by: disposeBag)

Scheduler调度器相关

对于Scheduler来说,我们需要了解Concurrent(并行)、Serial(串行)Scheduler就可以了。

Observable<Int>.of(1, 2, 3).observe(on: SerialDispatchQueueScheduler(internalSerialQueueName: "serialDispatchQueue")).map { element -> Int in
     let text = "scheduler: map --> Main Thread: \(Thread.current.isMainThread) element__\(element)"
     DispatchQueue.main.async {
            self.showText(text: text)
     }
                                                                                                                           
     /// scheduler: map --> Main Thread: false element__1
     /// scheduler: map --> Main Thread: false element__2
     /// scheduler: map --> Main Thread: false element__3
     print(text)
     return element * 2
  
/// shareReplay(1)或shareReplayLatestWhileConnected,以防止以后被观察者被多次订阅观察后,map中的语句会多次调用:
}.share(replay: 1).subscribe(on: MainScheduler.instance).observe(on: MainScheduler.instance).subscribe { event in
     let text = "scheduler: subscribe --> Main Thread: \(Thread.current.isMainThread) event__\(event)"
     self.showText(text: text)
                                                                                                                          
     /// scheduler: subscribe --> Main Thread: true event__next(2)
     /// scheduler: subscribe --> Main Thread: true event__next(4)
     /// scheduler: subscribe --> Main Thread: true event__next(6)
     /// scheduler: subscribe --> Main Thread: true event__completed
     print(text)
}.disposed(by: disposeBag)

Observable<Int>.of(4, 5, 6).observe(on: ConcurrentDispatchQueueScheduler(queue: DispatchQueue(label: "concurrentDispatchQueue"))).map { element -> Int in
     let text = "scheduler: map --> Main Thread: \(Thread.current.isMainThread) element__\(element)"
     DispatchQueue.main.async {
          self.showText(text: text)
     }
                                                                                                                                       
     /// scheduler: map --> Main Thread: false element__4
     /// scheduler: map --> Main Thread: false element__5
     /// scheduler: map --> Main Thread: false element__6
     print(text)
     return element * 2
                                                                     
/// shareReplay(1)或shareReplayLatestWhileConnected,以防止以后被观察者被多次订阅观察后,map中的语句会多次调用:
}.share(replay: 1).subscribe(on: MainScheduler.instance).observe(on: MainScheduler.instance).subscribe { event in
     let text = "scheduler: subscribe --> Main Thread: \(Thread.current.isMainThread) event__\(event)"
     self.showText(text: text)
                                                                                                                                      
     /// scheduler: subscribe --> Main Thread: true event__next(8)
     /// scheduler: subscribe --> Main Thread: true event__next(10)
     /// scheduler: subscribe --> Main Thread: true event__next(12)
     /// scheduler: subscribe --> Main Thread: true event__completed
     print(text)
}.disposed(by: disposeBag)

Observable<Int>.of(7, 8, 9).observe(on: ConcurrentMainScheduler.instance).map { element -> Int in
     let text = "scheduler: map --> Main Thread: \(Thread.current.isMainThread) element__\(element)"
     DispatchQueue.main.async {
          self.showText(text: text)
     }
        
     /// scheduler: map --> Main Thread: true element__7
     /// scheduler: map --> Main Thread: true element__8
     /// scheduler: map --> Main Thread: true element__9
     print(text)
     return element * 2
                                                                               
/// shareReplay(1)或shareReplayLatestWhileConnected,以防止以后被观察者被多次订阅观察后,map中的语句会多次调用:
}.share(replay: 1).subscribe(on: MainScheduler.instance).observe(on: MainScheduler.instance).subscribe { event in
     let text = "scheduler: subscribe --> Main Thread: \(Thread.current.isMainThread) event__\(event)"
     self.showText(text: text)
                                                                              
     /// scheduler: subscribe --> Main Thread: true event__next(14)
     /// scheduler: subscribe --> Main Thread: true event__next(16)
     /// scheduler: subscribe --> Main Thread: true event__next(18)
     /// scheduler: subscribe --> Main Thread: true event__completed
     print(text)
}.disposed(by: disposeBag)

Disposable

订阅了一个可观察序列,如果有特殊需求需要提前取消订阅时使用。也就是说Disposable是用来取消订阅的一个工具,通过Disposables工具创建。

let dis1 = Disposables.create()
let dis2 = Disposables.create {
    print("在Dispose之前所做一些工作")
}
let _ = Disposables.create([dis1, dis2])

/// dispose:通过.dispose()取消或者添加到disposeBag(可以将它看成一个非ARC机制下的AutoReleasePool)
let disposable = Observable<Int>.of(1, 2, 3).subscribe { event in
                      print(event)
                 }
disposable.dispose()

/// 或者
disposable.disposed(by: disposeBag)

常用方法

监听点击事件和输入事件
closeButton.rx.tap.subscribe(onNext: {[weak self] _ in
     self?.closeClosure?()
}).disposed(by: disposeBag)

phoneTextField.rx.text.orEmpty.subscribe(onNext: {[weak self] (text) in
     self?.textDidChange()
     self?.textDidChangeClosure?(text)
}).disposed(by: disposeBag)

phoneTextField.rx.controlEvent(.editingDidBegin).subscribe(onNext: {[weak self] _ in
     self?.editingDidBeginClosure?()
}).disposed(by: disposeBag)

phoneTextField.rx.controlEvent(.editingDidEnd).subscribe(onNext: {[weak self] _ in
     self?.editingDidEndClosure?()
}).disposed(by: disposeBag)
代替通知
// MARK: public
extension NotificationManager {

    /**
     注册通知
     */
    static func registNotification(_ name:NSNotification.Name, disposeBag:DisposeBag, callBack:@escaping ((_ noti:Notification) -> ())) {
        NotificationCenter.default.rx.notification(name).subscribe(onNext: {(noti) in
            callBack(noti)
        }).disposed(by: disposeBag)
    }

    /**
     发送通知
     */
    static func post(name aName: NSNotification.Name, object anObject: Any? = nil, userInfo aUserInfo: [AnyHashable : Any]? = nil) {
        NotificationCenter.default.post(name: aName, object: anObject, userInfo: aUserInfo)
    }
}
代替观察者
/// 只发出与下一个间隔超过0.25秒的元素
scrollView.rx.observe(String.self, "contentSize").debounce(RxTimeInterval.milliseconds(250), scheduler: MainScheduler.instance).subscribe(onNext: {[weak self] _ in
    self?.contentSizeClosure?(self?.scrollView.contentSize ?? .zero)
}).disposed(by: self.disposeBag)
信号合并
/// 将下拉刷新/上拉加载更多的信号和请求参数信号合并, 确定最后发起请求的请求参数
Observable<NewsListParams>.combineLatest(input.paramsRelay, input.reloadDataSubject) {[weak self] (params:NewsListParams, reloadData:Bool) -> NewsListParams in
     params.page = self?.caculatePage(reloadData) ?? 1
     params.row = self?.pageSize ?? 10
     return params
 }.subscribe(onNext: {[weak self] params in
     self?.showHUD()
     NewsManager.requestNewsListData(params: params) { datas in
           HUD.dismiss()
           self?.handleData(datas?.rows ?? [], scrollView: self?.tableView ?? BaseTableView(), isFirstFetch: params.page == 1, totalCount: datas?.total ?? 0)
     }
}).disposed(by: disposeBag)
数据绑定
let categorysDriver = categorysRelay.map { models -> [ServiceCategoryItemModel] in
     return [ServiceCategoryItemModel(model: Void(), items: models)]
}.asDriver(onErrorJustReturn: []).drive(headerView.categorysRelay).disposed(by: disposeBag)
代替定时器
/// Interval:创建一个可观察序列,以特定的时间间隔释放一系列整数(E -> Int/NSInteger)
Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).take(3).subscribe { event in
     let text = "operator: interval__\(event)"
     self.showText(text: text)

     /// operator: interval__next(0)
     /// operator: interval__next(1)
     /// operator: interval__next(2)
     /// operator: interval__completed
     print(text)
}.disposed(by: disposeBag)

/// Timer:在指定的时间后,发送一个特定的Item (E -> Int/NSInteger),请注意这里与Interval的区别(Interval是发送一系列特定Item,而Timer只会发送一个)
Observable<Int>.timer(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { event in
     let text = "operator: timer__\(event)"
     self.showText(text: text)

     // operator: timer__next(0)
     /// operator: timer__completed
     print(text)
}.disposed(by: disposeBag)
上一篇下一篇

猜你喜欢

热点阅读