RxSwift 相关方法

2023-04-14  本文已影响0人  Abner_XuanYuan

1、决策树

我想要创建一个 Observable

我想要创建一个 Observable 通过组合其他的 Observables

我想要转换 Observable 的元素后,再将它们发出来

我想要将产生的每一个元素,拖延一段时间后再发出:delay

我想要将产生的事件封装成元素发送出来

我想要忽略掉所有的 next 事件,只接收 completederror 事件:ignoreElements

我想创建一个新的 Observable 在原有的序列前面加入一些元素:startWith

我想从 Observable 中收集元素,缓存这些元素之后在发出:buffer

我想将 Observable 拆分成多个 Observableswindow

我想只接收 Observable 中特定的元素

我想重新从 Observable 中发出某些元素

我想要从一些 Observables 中,只取第一个产生元素的 Observableamb

我想评估 Observable 的全部元素

我想把 Observable 转换为其他的数据结构:as...

我想在某个 Scheduler 应用操作符:subscribeOn

我想要 Observable 发生某个事件时, 采取某个行动:do

我想要 Observable 发出一个 error 事件:error

我想要 Observable 发生错误时,优雅的恢复

我创建一个 Disposable 资源,使它与 Observable 具有相同的寿命:using

我创建一个 Observable,直到我通知它可以产生元素后,才能产生元素:publish

2、Observable 的创建方式

just() :创建 Observable 发出唯一的一个元素。该方法通过传入一个默认值来初始化,构建一个只有一个元素的 Observable 队列,订阅完信息自动 complete。

let just = Observable.just(0)
//它相当于
let just = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onCompleted()
    return Disposables.create()
}


        Observable<[String]>.just(["Jack","Rose"])
            .subscribe{(event) in
                print(event)
            }.disposed(by: disposeBag)

          //next(["Jack", "Rose"])
          //completed

of():此方法创建一个新的可观察实例,该实例具有可变数量的元素。该方法可以接受可变数量的参数(必需要是同类型的)。

// 多个元素 - 针对序列处理
Observable<String>.of("Jack","Rose")
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)
//next(Jack)
//next(Rose)
//completed

// 字典
Observable<[String: Any]>.of(["name":"Jack","age":22])
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)
//next(["age": 22, "name": "Jack"])
//completed

// 数组
Observable<[String]>.of(["Jack","Rose"])
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)
//next(["Jack", "Rose"])
//completed

from():将其他类型或者数据结构转换为 Observable。

//将一个数组转换为 Observable:
let numbers = Observable.from([0, 1, 2])
//相当于
let numbers = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    observer.onCompleted()
    return Disposables.create()
}


//将一个可选值转换为 Observable:
let num: Int? = 1
let value = Observable.from(optional: num)
//相当于
let num: Int? = 1
let value = Observable<Int>.create { observer in
    if let element = num {
        observer.onNext(element)
    }
    observer.onCompleted()
    return Disposables.create()
}

empty():empty 操作符将创建一个 Observable,这个 Observable 只有一个完成事件。

//使用
let emtyOb = Observable<Int>.empty()
//相当于
let emtyOb = Observable<Int>.create { observer in
    observer.onCompleted()
    return Disposables.create()
}

_ = emtyOb.subscribe(onNext: { (number) in
    print("订阅:",number)
}, onError: { (error) in
    print("error:",error)
}, onCompleted: {
    print("完成回调")
}) {
    print("释放回调")
}
//完成回调
//释放回调

never():never 操作符将创建一个 Observable,这个 Observable 不会产生任何事件。

let id = Observable<Int>.never()
//它相当于
let id = Observable<Int>.create { observer in
    return Disposables.create()
}


Observable<String>.never()
    .subscribe { (event) in
        print("走你",event)
    }
    .disposed(by: disposeBag)

error():error 操作符将创建一个 Observable,这个 Observable 只会产生一个 error 事件。

Observable<String>.error((NSError.init(domain: "domain", code: 4444, userInfo: ["errorInfo":"errorMsg"]))
    .subscribe { (event) in
        print("订阅:",event)
    }
    .disposed(by: disposeBag)

range():使用指定的调度程序生成并发送观察者消息,生成指定范围内的可观察整数序列。

Observable.range(start: 2, count: 5)
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)

// 底层源码
init(start: E, count: E, scheduler: ImmediateSchedulerType) {
    self._start = start
    self._count = count
    self._scheduler = scheduler
}

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
    let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
    let subscription = sink.run()
    return (sink: sink, subscription: subscription)
}

repeatElement():创建一个 Observable,这个 Observable 将无止尽地发出同一个元素。

let id = Observable.repeatElement(0)
//相当于
let id = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(0)
    observer.onNext(0)
    observer.onNext(0)
    ... // 无数次
    return Disposables.create()
}


Observable<Int>.repeatElement(5)
    .subscribe { (event) in
        // print("订阅:",event)
    }
    .disposed(by: disposeBag)

generate():创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列

Observable.generate(initialState: 0,// 初始值
                    condition: { $0 < 10}, // 条件1
                    iterate: { $0 + 2 })  // 条件2 +2
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)

// 数组遍历
let arr = ["Jack_1","Jack_2","Jack_3","Jack_4","Jack_5","Jack_6","Jack_7","Jack_8","Jack_9","Jack_10"]
Observable.generate(initialState: 0,// 初始值
    condition: { $0 < arr.count}, // 条件1
    iterate: { $0 + 1 })  // 条件2 +2
    .subscribe(onNext: {
        print("遍历arr:",arr[$0])
    })
    .disposed(by: disposeBag)

create():通过一个构建函数完整的创建一个 Observable。create 操作符将创建一个 Observable,你需要提供一个构建函数,在构建函数里面描述事件(next,error,completed)的产生过程。通常情况下一个有限的序列,只会调用一次观察者的 onCompleted 或者 onError 方法。并且在调用它们后,不会再去调用观察者的其他方法。

//创建一个 [0, 1, ... 8, 9] 的序列:

let id = Observable<Int>.create { observer in
    observer.onNext(0)
    observer.onNext(1)
    observer.onNext(2)
    observer.onNext(3)
    observer.onNext(4)
    observer.onNext(5)
    observer.onNext(6)
    observer.onNext(7)
    observer.onNext(8)
    observer.onNext(9)
    observer.onCompleted()
    return Disposables.create()
}

deferred():deferred 操作符将等待观察者订阅它,才创建一个 Observable,它会通过一个构建函数为每一位订阅者创建新的 Observable。看上去每位订阅者都是对同一个 Observable 产生订阅,实际上它们都获得了独立的序列。在一些情况下,直到订阅时才创建 Observable 是可以保证拿到的数据都是最新的。

//用于标记是奇数、还是偶数
var isOdd = true
 
//使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
let factory : Observable<Int> = Observable.deferred {
     
    //让每次执行这个block时候都会让奇、偶数进行交替
    isOdd = !isOdd
     
    //根据isOdd参数,决定创建并返回的是奇数Observable、还是偶数Observable
    if isOdd {
        return Observable.of(1, 3, 5 ,7)
    }else {
        return Observable.of(2, 4, 6, 8)
    }
}
 
//第1次订阅测试
factory.subscribe { event in
    print("\(isOdd)", event)
}
 
//第2次订阅测试
factory.subscribe { event in
    print("\(isOdd)", event)
}

interval():interval 操作符将创建一个 Observable,它每隔一段设定的时间,发出一个索引数的元素。它将发出无数个元素。

Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
            .subscribe{(event) in
                print(event)
   }
    //.disposed(by: disposeBag)

timer():timer 操作符将创建一个 Observable,它在经过设定的一段时间后,产生唯一的一个元素。

//5秒种后发出唯一的一个元素0
let observable = Observable<Int>.timer(.seconds(5), scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}
 //.disposed(by: disposeBag)

另一种是创建的 Observable 序列在经过设定的一段时间后,每隔一段时间产生一个元素.

//延时5秒种后,每隔1秒钟发出一个元素
let observable = Observable<Int>.timer(.seconds(5), period: .seconds(1), scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}
//.disposed(by: disposeBag)

3、高阶函数

1、组合

startWith:startWith 操作符会在 Observable 头部插入一些元素。
(如果你想在尾部加入一些元素可以用 concat)

let disposeBag = DisposeBag()

Observable.of("🐶", "🐱", "🐭", "🐹")
    .startWith("1")
    .startWith("2")
    .startWith("3", "🅰️", "🅱️")
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
3
🅰️
🅱️
2
1
🐶
🐱
🐭
🐹

merge:将多个 Observables 合并成一个。通过使用 merge 操作符你可以将多个 Observables 合并成一个,当某一个 Observable 发出一个元素时,他就将这个元素发出。如果,某一个 Observable 发出一个 onError 事件,那么被合并的 Observable 也会将它发出,并且立即终止序列。

let disposeBag = DisposeBag()

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.of(subject1, subject2)
    .merge()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

subject1.onNext("🅰️")

subject1.onNext("🅱️")

subject2.onNext("①")

subject2.onNext("②")

subject1.onNext("🆎")

subject2.onNext("③")

//result
🅰️
🅱️
①
②
🆎
③

zip:zip 操作符将多个(最多不超过8个) Observables 的元素通过一个函数组合起来,然后将这个组合的结果发出来。它会严格的按照序列的索引数进行组合。它的元素数量等于源 Observables 中元素数量最少的那个。

let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()

Observable.zip(first, second) { $0 + $1 }
          .subscribe(onNext: { print($0) })
          .disposed(by: disposeBag)

first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")

//result
1A
2B
3C
4D

combineLatest:当多个 Observables 中任何一个 Observable 发出一个新元素,combineLatest 就会发出一个元素。这个元素是由这些 Observables 中最新的元素,通过一个函数组合起来的。combineLatest 操作符将多个 Observables 中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables 中任何一个发出一个元素,他都会发出一个元素(前提是,这些 Observables 曾经都发出过元素)。

let disposeBag = DisposeBag()

let first = PublishSubject<String>()
let second = PublishSubject<String>()

Observable.combineLatest(first, second) { $0 + $1 }
          .subscribe(onNext: { print($0) })
          .disposed(by: disposeBag)

first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")

//结果
1A
2A
2B
2C
2D
3D
4D

switchLatest:将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素。

let switchLatestSub1 = BehaviorSubject(value: "L")
let switchLatestSub2 = BehaviorSubject(value: "1")
let switchLatestSub  = BehaviorSubject(value: switchLatestSub1)// 选择了 switchLatestSub1 就不会监听 switchLatestSub2

switchLatestSub.asObservable()
    .switchLatest()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//L
switchLatestSub1.onNext("G")//G
switchLatestSub1.onNext("_")//_
switchLatestSub2.onNext("2")// 不会监听,但是默认保存由 2 覆盖 1
switchLatestSub2.onNext("3") // 不会监听,但是默认保存由 3 覆盖 2
switchLatestSub.onNext(switchLatestSub2) // 切换到 switchLatestSub2
switchLatestSub1.onNext("*")//不会监听,但是默认保存由 * 覆盖 _
switchLatestSub1.onNext("Cooci") //不会监听,但是默认保存由 Cooci 覆盖 *
switchLatestSub2.onNext("4")//4
2、映射

map:通过一个转换函数,将 Observable 的每个元素转换一遍。map 操作符将源 Observable 的每个元素应用你提供的转换方法,然后返回含有转换结果的 Observable。

let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
    .map { $0 * 10 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
10
20
30
*/

flatMap:先将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 合并。flatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的元素合并之后再发送出来。

let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let subject = BehaviorSubject(value: first)

subject.asObservable()
        .flatMap { $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

first.onNext("🐱")
subject.onNext(second)
second.onNext("🅱️")
first.onNext("🐶")

//result
👦🏻
🐱
🅰️
🅱️
🐶

flatMapLatest:将 Observable 的元素转换成其他的 Observable,然后取这些 Observables 中最新的一个。flatMapLatest 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。一旦转换出一个新的 Observable,就只发出它的元素,旧的 Observables 的元素将被忽略掉。

let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let subject = BehaviorSubject(value: first)

subject.asObservable()
        .flatMapLatest { $0 }
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

first.onNext("🐱")
subject.onNext(second)
second.onNext("🅱️")
first.onNext("🐶")

//result
👦🏻
🐱
🅰️
🅱️

scan:scan 操作符将对第一个元素应用一个函数,将结果作为第一个元素发出。然后,将结果作为参数填入到第二个元素的应用函数中,创建第二个元素。以此类推,直到遍历完全部的元素。

let disposeBag = DisposeBag()

Observable.of(10, 100, 1000)
    .scan(1) { aggregateValue, newValue in
        aggregateValue + newValue
    }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
11
111
1111
3、过滤

filter:仅仅发出 Observable 中通过判定的元素。filter 操作符将通过你提供的判定方法过滤一个 Observable。

Observable.of(1,2,3,4,5,6,7,8,9,0)
    .filter { $0 % 2 == 0 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

/*
2
4
6
8
0
*/

distinctUntilChanged:distinctUntilChanged 操作符将阻止 Observable 发出相同的元素。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。如果后一个元素和前一个元素不相同,那么这个元素才会被发出来。

let disposeBag = DisposeBag()

Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
    .distinctUntilChanged()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
🐱
🐷
🐱
🐵
🐱

elementAt:elementAt 操作符将拉取 Observable 序列中指定索引数的元素,然后将它作为唯一的元素发出。

let disposeBag = DisposeBag()
Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .elementAt(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
🐸

single:single 操作符将限制 Observable 只产生一个元素。如果 Observable 只有一个元素,它将镜像这个 Observable 。如果 Observable 没有元素或者元素数量大于一,它将产生一个 error 事件。

Observable.of("Jack", "Rose")
    .single()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//Jack
//Unhandled error happened: Sequence contains more than one element.

Observable.of("Jack", "Rose")
    .single { $0 == "Rose" }
    .subscribe { print($0) }
    .disposed(by: disposeBag)
//next(Rose)
//completed

take:通过 take 操作符你可以只发出头 n 个元素。并且忽略掉后面的元素,直接结束序列。

Observable.of("Hank", "Kody","Cooci", "CC")
    .take(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
//Hank
//Kody

takeLast:通过 takeLast 操作符你可以只发出尾部 n 个元素。并且忽略掉前面的元素。

Observable.of("Hank", "Kody","Cooci", "CC")
    .takeLast(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
/*
Kody
Cooci
CC
*/

takeWhile:takeWhile 操作符将镜像源 Observable 直到某个元素的判定为 false。此时,这个镜像的 Observable 将立即终止。

let disposeBag = DisposeBag()

Observable.of(1, 2, 3, 4, 3, 2, 1)
    .takeWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
1
2
3

takeUntil:takeUntil 操作符将镜像源 Observable,它同时观测第二个 Observable。一旦第二个 Observable 发出一个元素或者产生一个终止事件,那个镜像的 Observable 将立即终止。

let disposeBag = DisposeBag()

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .takeUntil(referenceSequence)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")

referenceSequence.onNext("🔴")

sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")

//result
next(🐱)
next(🐰)
next(🐶)
completed

skip:skip 操作符可以让你跳过 Observable 中头 n 个元素,只关注后面的元素。

let disposeBag = DisposeBag()

Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
    .skip(2)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
🐶
🐸
🐷
🐵

skipUntil:skipUntil 操作符可以让你忽略源 Observable 中头几个元素,直到另一个 Observable 发出一个元素后,它才镜像源 Observable。

let disposeBag = DisposeBag()

let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()

sourceSequence
    .skipUntil(referenceSequence)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

sourceSequence.onNext("🐱")
sourceSequence.onNext("🐰")
sourceSequence.onNext("🐶")

referenceSequence.onNext("🔴")

sourceSequence.onNext("🐸")
sourceSequence.onNext("🐷")
sourceSequence.onNext("🐵")

//result
🐸
🐷
🐵
4、集合

toArray:将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止。

Observable.range(start: 1, count: 10)
    .toArray()
    .subscribe { print($0) }
    .disposed(by: disposeBag)

//success([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

reduce:reduce 操作符将对第一个元素应用一个函数。然后,将结果作为参数填入到第二个元素的应用函数中。以此类推,直到遍历完全部的元素后发出最终结果。

Observable.of(10, 100, 1000)
    .reduce(1, accumulator: +)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

// 1 + 10 + 100 + 1000 = 1111

concat:让两个或多个 Observables 按顺序串连起来。concat 操作符将多个 Observables 按顺序串联起来,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。concat 将等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。如果后一个是“热” Observable ,在它前一个 Observable 产生完成事件前,所产生的元素将不会被发送出来。

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let subject = BehaviorSubject(value: subject1)

subject
    .asObservable()
    .concat()
    .subscribe { print($0) }
    .disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

subject.onNext(subject2)

subject2.onNext("I would be ignored")
subject2.onNext("🐱")

subject1.onCompleted()

subject2.onNext("🐭")

//result
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)
5、error

catchErrorJustReturn:操作符会将 error 事件替换成其他的一个元素,然后结束该序列。

let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()

sequenceThatFails
    .catchErrorJustReturn("😊")
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

//结果
next(😬)
next(😨)
next(😡)
next(🔴)
next(😊)
completed

catchError:从一个错误事件中恢复,将错误事件替换成一个备选序列。catchError 操作符将会拦截一个 error 事件,将它替换成其他的元素或者一组元素,然后传递给观察者。这样可以使得 Observable 正常结束,或者根本都不需要结束。

let disposeBag = DisposeBag()

let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()

sequenceThatFails
    .catchError {
        print("Error:", $0)
        return recoverySequence
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)

sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)

recoverySequence.onNext("😊")

//结果
next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊)

retry:如果源 Observable 产生一个错误事件,重新对它进行订阅,希望它不会再次产生错误。retry 操作符将不会将 error 事件,传递给观察者,然而,它会重新订阅源 Observable,给这个 Observable 一个重试的机会,让它有机会不产生 error 事件。retry 总是对观察者发出 next 事件,即便源序列产生了一个 error 事件,所以这样可能会产生重复的元素。

let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")

    if count == 1 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }

    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()

    return Disposables.create()
}

sequenceThatErrors
    .retry()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
🐶
🐱
🐭
let disposeBag = DisposeBag()
var count = 1

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")

    if count < 5 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }

    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()

    return Disposables.create()
}

sequenceThatErrors
    .retry(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
Unhandled error happened: test
 subscription called from:

retry(_:)::通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到 max 未遂计数。

let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("Hank")
    observer.onNext("Kody")
    observer.onNext("CC")
    
    if count < 5 { // 这里设置的错误出口是没有太多意义的额,因为我们设置重试次数
        observer.onError(self.lgError)
        print("错误序列来了")
        count += 1
    }
    
    observer.onNext("Lina")
    observer.onNext("小雁子")
    observer.onNext("婷婷")
    observer.onCompleted()
    
    return Disposables.create()
}

sequenceThatErrors
    .retry(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
6、流程

debug:打印所有的订阅,事件以及销毁信息。

let disposeBag = DisposeBag()

let sequence = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onCompleted()
    return Disposables.create()
}

sequence
    .debug("Fruit")
    .subscribe()
    .disposed(by: disposeBag)

//result
Fruit -> subscribed
Fruit -> Event next(🍎)
Fruit -> Event next(🍐)
Fruit -> Event completed
Fruit -> isDisposed

RxSwift.Resources.total::提供所有Rx资源分配的计数,这对于在开发期间检测泄漏非常有用。

print(RxSwift.Resources.total)

let subject = BehaviorSubject(value: "Cooci")

let subscription1 = subject.subscribe(onNext: { print($0) })

print(RxSwift.Resources.total)

let subscription2 = subject.subscribe(onNext: { print($0) })

print(RxSwift.Resources.total)

subscription1.dispose()

print(RxSwift.Resources.total)

subscription2.dispose()

print(RxSwift.Resources.total)
7、链接

multicast:将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。

func testMulticastConnectOperators(){
    print("*****multicast*****")
    let subject = PublishSubject<Any>()
    subject.subscribe{print("00:\($0)")}
        .disposed(by: disposeBag)
    
    let netOB = Observable<Any>.create { (observer) -> Disposable in
            sleep(2)// 模拟网络延迟
            print("我开始请求网络了")
            observer.onNext("请求到的网络数据")
            observer.onNext("请求到的本地")
            observer.onCompleted()
            return Disposables.create {
                print("销毁回调了")
            }
        }.publish()
    
    netOB.subscribe(onNext: { (anything) in
            print("订阅1:",anything)
        })
        .disposed(by: disposeBag)

    // 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
    // 所以在订阅一次 会出现什么问题?
    netOB.subscribe(onNext: { (anything) in
            print("订阅2:",anything)
        })
        .disposed(by: disposeBag)
    
    _ = netOB.connect()
}

分析:
底层逻辑探索中间变量 ConnectableObservableAdapter 保存了源序列 source、中间序列 makeSubject 。
订阅流程 self.lazySubject.subscribe(observer) 一个懒加载的序列,保证了中间变量 ConnectableObservableAdapter 每一次都是同一个响应序列。
剩下就是 PublishSubject 的订阅效果。
完事等待源序列的响应,但是我们的源序列的订阅是在 connect 函数里面!如果没有调用 connect 函数,意味着就永远不会发送响应。这样背后的逻辑就是,前面所以的发送响应在 connect 函数之前的都没有任何的意义!
以上也就说明了我们的 publish 就是状态共享的:connnect 一次我们序列发送一次响应(响应所有订阅)。

replay:确保观察者接收到同样的序列,即使是在 Observable 发出元素后才订阅。replay 操作符将 Observable 转换为可被连接的 Observable,并且这个可被连接的 Observable 将缓存最新的 n 个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。

func testReplayConnectOperators(){   
    let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).replay(5)
    
    interval.subscribe(onNext: { print(Date.time,"订阅: 1, 事件: \($0)") })
        .disposed(by: self.disposeBag)
    
    delay(2) { _ = interval.connect() }
    
    delay(4) {
        interval.subscribe(onNext: { print(Date.time,"订阅: 2, 事件: \($0)") })
            .disposed(by: self.disposeBag)
    }
    
    delay(8) {
        interval.subscribe(onNext: { print(Date.time,"订阅: 3, 事件: \($0)") })
            .disposed(by: self.disposeBag)
    }
    delay(20, closure: {
        self.disposeBag = DisposeBag()
    })
    
    /**
     订阅: 1, 事件: 4
     订阅: 1, 事件: 0
     2019-05-28 21-32-42 订阅: 2, 事件: 0
     2019-05-28 21-32-42 订阅: 1, 事件: 1
     2019-05-28 21-32-42 订阅: 2, 事件: 1
     2019-05-28 21-32-45 订阅: 2, 事件: 4
     2019-05-28 21-32-46 订阅: 3, 事件: 0
     2019-05-28 21-32-46 订阅: 3, 事件: 1
     2019-05-28 21-32-46 订阅: 3, 事件: 2
     2019-05-28 21-32-46 订阅: 3, 事件: 3
     2019-05-28 21-32-46 订阅: 3, 事件: 4
     
     // 序列从 0开始
     // 定时器也没有断层  sub2 sub3 和 sub1 是同步的
     */
}

4、其他方法

amb:在多个源 Observables 中, 取第一个发出元素或产生事件的 Observable,然后只发出 这个 Observable 中的元素。当你传入多个 Observables 到 amb 操作符时,它将取其中一个 Observable:第一个产生事件的那个 Observable,可以是一个 next,error 或者 completed 事件。 amb 将忽略掉其他的 Observables。

buffer:缓存元素,然后将缓存的元素集合,周期性的发出来。buffer 操作符将缓存 Observable 中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。

concatMap:将 Observable 的元素转换成其他的 Observable,然后将这些 Observables 串连起来。concatMap 操作符将源 Observable 的每一个元素应用一个转换方法,将他们转换成 Observables。然后让这些 Observables 按顺序的发出元素,当前一个 Observable 元素发送完毕后,后一个 Observable 才可以开始发出元素。等待前一个 Observable 产生完成事件后,才对后一个 Observable 进行订阅。

let disposeBag = DisposeBag()

let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")

let subject = BehaviorSubject(value: subject1)

subject.asObservable()
        .concatMap { $0 }
        .subscribe { print($0) }
        .disposed(by: disposeBag)

subject1.onNext("🍐")
subject1.onNext("🍊")

subject.onNext(subject2)

subject2.onNext("I would be ignored")
subject2.onNext("🐱")

subject1.onCompleted()

subject2.onNext("🐭")

//result
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)

connect:通知 ConnectableObservable 可以开始发出元素了。ConnectableObservable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以等所有观察者全部订阅完成后,才发出元素。

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .publish()

_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    _ = intSequence.connect()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}

DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

//result
Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
...

debounce:元素发出频率过高时会被过滤。
过滤掉高频产生的元素。debounce 操作符将发出这种元素,在 Observable 产生这种元素后,一段时间内没有新元素产生。

delay:将 Observable 的每一个元素拖延一段时间后发出。delay 操作符将修改一个 Observable,它会将 Observable 的所有元素都拖延一段设定好的时间, 然后才将它们发送出来。

delaySubscription:进行延时订阅。delaySubscription 操作符将在经过所设定的时间后,才对 Observable 进行订阅操作。

dematerialize:dematerialize 操作符将 materialize 转换后的元素还原。

do:当 Observable 的某些事件产生时,你可以使用 do 操作符来注册一些回调操作。这些回调会被单独调用,它们会和 Observable 原本的回调分离。

groupBy:将源 Observable 分解为多个子 Observable,并且每个子 Observable 将源 Observable 中“相似”的元素发送出来。

ignoreElements:忽略掉所有的元素,只发出 error 或 completed 事件。ignoreElements 操作符将阻止 Observable 发出 next 事件,但是允许他发出 error 或 completed 事件。如果你并不关心 Observable 的任何元素,你只想知道 Observable 在什么时候终止,那就可以使用 ignoreElements 操作符。

materialize:将序列产生的事件,转换成元素。通常,一个有限的 Observable 将产生零个或者多个 onNext 事件,然后产生一个 onCompleted 或者 onError 事件。materialize 操作符将 Observable 产生的这些事件全部转换成元素,然后发送出来。

observeOn:指定 Observable 在那个 Scheduler 发出通知。ReactiveX 使用 Scheduler 来让 Observable 支持多线程。使用 observeOn 操作符,来指示 Observable 在哪个 Scheduler 发出通知。
注:一旦产生了 onError 事件, observeOn 操作符将立即转发。他不会等待 onError 之前的事件全部被收到。这意味着 onError 事件可能会跳过一些元素提前发送出去。

publish:publish 会将 Observable 转换为可被连接的 Observable。可被连接的 Observable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以控制 Observable 在什么时候开始发出元素。

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .publish()

_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    _ = intSequence.connect()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}

DispatchQueue.main.asyncAfter(deadline: .now() + 6) {
  _ = intSequence
      .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

//result
Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
...

refCount:将可被连接的 Observable 转换为普通 Observable。
可被连接的 Observable 和普通的 Observable 十分相似,不过在被订阅后不会发出元素,直到 connect 操作符被应用为止。这样一来你可以控制 Observable 在什么时候开始发出元素。
refCount 操作符将自动连接和断开可被连接的 Observable。它将可被连接的 Observable 转换为普通 Observable。当第一个观察者对它订阅时,那么底层的 Observable 将被连接。当最后一个观察者离开时,那么底层的 Observable 将被断开连接。

sample:sample 操作符将不定期的对源 Observable 进行取样操作。通过第二个 Observable 来控制取样时机。一旦第二个 Observable 发出一个元素,就从源 Observable 中取出最后产生的元素。

shareReplay:使观察者共享 Observable,观察者会立即收到最新的元素,即使这些元素是在订阅前产生的。shareReplay 操作符将使得观察者共享源 Observable,并且缓存最新的 n 个元素,将这些元素直接发送给新的观察者。

skipWhile:skipWhile 操作符可以让你忽略源 Observable 中头几个元素,直到元素的判定为否后,它才镜像源 Observable。

let disposeBag = DisposeBag()

Observable.of(1, 2, 3, 4, 3, 2, 1)
    .skipWhile { $0 < 4 }
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

//result
4
3
2
1

subscribeOn:指定 Observable 在那个 Scheduler 执行。
(observeOn 操作符非常相似。它指示 Observable 在哪个 Scheduler 发出通知)

timeout:如果 Observable 在一段时间内没有产生元素,timeout 操作符将使它发出一个 error 事件。

using:通过使用 using 操作符创建 Observable 时,同时创建一个可被清除的资源,一旦 Observable 终止了,那么这个资源就会被清除掉了。

window:将 Observable 分解为多个子 Observable,周期性的将子 Observable 发出来。 window 操作符和 buffer 十分相似,buffer 周期性的将缓存的元素集合发送出来,而 window 周期性的将元素集合以 Observable 的形态发送出来。buffer 要等到元素搜集完毕后,才会发出元素序列。而 window 可以实时发出元素序列。

withLatestFrom:withLatestFrom 操作符将两个 Observables 中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。当第一个 Observable 发出一个元素时,就立即取出第二个 Observable 中最新的元素,通过一个组合函数将两个最新的元素合并后发送出去。

let disposeBag = DisposeBag()
let firstSubject = PublishSubject<String>()
let secondSubject = PublishSubject<String>()

firstSubject
     .withLatestFrom(secondSubject)
     .subscribe(onNext: { print($0) })
     .disposed(by: disposeBag)

firstSubject.onNext("🅰️")
firstSubject.onNext("🅱️")
secondSubject.onNext("1")
secondSubject.onNext("2")
firstSubject.onNext("🆎")

//result
2
let disposeBag = DisposeBag()
let firstSubject = PublishSubject<String>()
let secondSubject = PublishSubject<String>()

firstSubject
     .withLatestFrom(secondSubject) {
          (first, second) in
          return first + second
     }
     .subscribe(onNext: { print($0) })
     .disposed(by: disposeBag)

firstSubject.onNext("🅰️")
firstSubject.onNext("🅱️")
secondSubject.onNext("1")
secondSubject.onNext("2")
firstSubject.onNext("🆎")

//result
🆎2

5、应用

1、UISlider 监听
slider.rx.value.asObservable()
        .subscribe(onNext: {
            print("当前值为:\($0)")
          })
        .disposed(by: disposeBag)

注:任何对象通过 asObservable 都可以实现监听,如:collectionView.mj_footer!.rx.refreshing.asObservable()
2、列表刷新
let item = Observable<[SocilaModel]>.just(self.dataArr!)
//或
let item = Observable.from(optional: self.dataArr!)

item.bind(to: self.listTab.rx.items) { (tableView, row, element) in
               let cell = tableView.dequeueReusableCell(withIdentifier: "cell")
                      as! SecondSocialTableViewCell                
               cell.leftImageView.kf.setImage(with: URL(string: element.image_list ?? ""))
               cell.titleLabel.text = element.theme_name
               cell.descripLabel.text = element.info
               //cell中按钮点击事件订阅
               cell.RXButton.rx.tap.asDriver()
                         .drive(onNext: { [weak self] in
                             self?.showAlert(title: "\(row)", message: "哈哈哈")
                         }).disposed(by: cell.disposeBag)
                  return cell
              }
          .disposed(by: self.disposeBag)
        }
   }
3、按钮事件
lazy var bag: DisposeBag = DisposeBag()
button.rx.tap.subscribe { event in
  print("按钮点击")
}.disposed(by: bag)
4、监听 UITextfield
//on 方法
tf.rx.text.subscribe { event in
  print("输入变化:\(event.element!!)")
}.disposed(by: bag)

//onNext 方法
tf.rx.text.subscribe (onNext: {string in
  print("输入变化:\(string!)")
}).disposed(by: bag)

//绑定数据
tf.rx.text.bind(to: label.rx.text).disposed(by: bag)

//多个 textField 监听,改变 button 显示
Observable.combineLatest(phoneTextField.rx.text, passwordCodeTextField.rx.text)
                .map({ (userName, password) in
                    if (userName?.count ?? 0) >= 11 && (password?.count ?? 0) >= 4 {
                        return CGFloat(1)
                    }
                    return CGFloat(0.2)
                })
                .bind(to: loginButton.rx.alpha)
                .disposed(by: disposeBag)
5、监听 label 属性
label.rx.observe(String.self, "text").subscribe { (str: String?) in
   print("str")
}.disposed(by: bag)

label.rx.observe(CGRect.self, "frame").subscribe { (rect: CGRect?) in
    print("rect")
}.disposed(by: bag)
6、UIScrollView
scrollView.rx.contentOffset.subscribe(onNext:  { point in
  print("滚动偏移:\(point.element!)")
}).disposed(by: bag)
7、网络请求
//MARK: - RxSwift应用-网络请求
func setupNextwork() {
    let url = URL(string: "https://www.baidu.com")
    URLSession.shared.rx.response(request: URLRequest(url: url!))
        .subscribe(onNext: { (response, data) in
            print("response ==== \(response)")
            print("data ===== \(data)")
        }, onError: { (error) in
            print("error ===== \(error)")
        }).disposed(by: disposeBag)
}
8、通知
NotificationCenter.default.rx
    .notification(.UIApplicationWillEnterForeground)
    .subscribe(onNext: { (notification) in
      print("Application Will Enter Foreground")
    })
    .disposed(by: disposeBag)
9、KVO
    // 系统KVO 还是比较麻烦的
    // person.addObserver(self, forKeyPath: "name", options: .new, context: nil)
    person.rx.observeWeakly(String.self, "name").subscribe(onNext: { (change) in
        print(change ?? "helloword")
    }).disposed(by: disposeBag)
10、手势事件
    tap.rx.event.subscribe { (event) in
        print("点了label")
    }.disposed(by: disposeBag) 
11、定时器
//MARK: - RxSwift应用-timer定时器
func setupTimer() {
    timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    timer.subscribe(onNext: { (num) in
        print("hello word \(num)")
    }).disposed(by: disposeBag)
}
12、监听事件的生命周期

通过 doOn 方法来监听事件的生命周期,它会在每一次事件发送前被调用,同时也可以通过不同 block 回调处理不同类型的 event。

let observable = Observable.of("A", "B", "C")
 
observable
    .do(onNext: { element in
        print("Intercepted Next:", element)
    }, onError: { error in
        print("Intercepted Error:", error)
    }, onCompleted: {
        print("Intercepted Completed")
    }, onDispose: {
        print("Intercepted Disposed")
    })
    .subscribe(onNext: { element in
        print(element)
    }, onError: { error in
        print(error)
    }, onCompleted: {
        print("completed")
    }, onDisposed: {
        print("disposed")
    })
上一篇下一篇

猜你喜欢

热点阅读