Swift-【Observable 1】

2021-08-28  本文已影响0人  NJ_墨

let myError = NSError.init(domain: "com.myerror.cn", code: 10010, userInfo: nil)

func test_catchAndReturn() {
    // 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
    let sequenceThatFails = PublishSubject<String>()
    sequenceThatFails
        .catchAndReturn("catchAndReturn")
        .subscribe{print($0)}
        .disposed(by: disposeBag)

    sequenceThatFails.onNext("2")
    sequenceThatFails.onNext("3")// 正常序列发送成功的
    sequenceThatFails.onError(self.myError) //发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
    sequenceThatFails.onNext("4")
    /**
     next(2)
     next(3)
     next(catchAndReturn)
     completed
     */
}
func test_catchError() {
    let sequenceThatFails = PublishSubject<String>()
    sequenceThatFails
        .catchAndReturn("catchAndReturn")
        .subscribe{print($0)}
        .disposed(by: disposeBag)
    
    // 通过切换到提供的恢复可观察序列,从错误事件中恢复
    print("*****catchError*****")
    
    let recoverySequence = PublishSubject<String>()
    
    recoverySequence
        .catch {
        print("Error:",$0)
        return recoverySequence // 获取到了错误序列-我们在中间的闭包操作处理完毕
        }
        .subscribe{print($0)}
        .disposed(by: disposeBag)
    
    sequenceThatFails.onNext("test1")
    sequenceThatFails.onNext("test2")  // 正常序列发送成功的
    sequenceThatFails.onError(myError) // 发送失败的序列
    
    recoverySequence.onNext("rec1")
    recoverySequence.onNext("rec2")
    sequenceThatFails.onError(self.myError) // 发送失败的序列
    recoverySequence.onNext("rec2")
    recoverySequence.onNext("rec4")
    
    /**
     next(test1)
     next(test2)
     next(catchAndReturn)
     completed
     next(rec1)
     next(rec2)
     next(rec2)
     next(rec4)
     */
}
func test_retry() {
    // retry: 通过无限地重新订阅可观察序列来恢复重复的错误事件
    print("*****retry*****")
    var count = 1
    let sequenceRetryErrors = Observable<String>.create { (observer) -> Disposable in
        observer.onNext("1")
        observer.onNext("2")
        observer.onNext("3")
        
        if count == 1 { // 条件可以作为出口,失败的次数
            observer.onError(self.myError)
            print("错误序列来了")
            count += 1
        }
        
        observer.onNext("test1")
        observer.onNext("test2")
        observer.onNext("test3")
        observer.onCompleted()
        return Disposables.create()
    }
    
    sequenceRetryErrors
        .retry() //调用这个retry后,上面的observer闭包会重新执行一次
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
    
    /**
     *****retry*****
     1
     2
     3
     错误序列来了
     1
     2
     3
     test1
     test2
     test3
     */
}

/// retry(_:): 重试次数达到max未遂计数
func test_retry2() {
  
    print("*****retry(_:)*****")
    var count = 1
    let sequenceThatErrors = Observable<String>.create { observer in
        observer.onNext("1")
        observer.onNext("2")
        observer.onNext("3")
        
        if count < 5 { // 
            observer.onError(self.myError) //发送错误消息
            print("错误序列来了")
            count += 1
        }
        //发送错误后,下面的sender都不会打印了
        observer.onNext("sender 1")
        observer.onNext("sender 2")
        observer.onNext("sender 3")
        observer.onCompleted()
        
        return Disposables.create()
    }
    
    sequenceThatErrors
        .retry(3) //重复地从错误事件中恢复3次
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
    
    /**
     *****retry(_:)*****
     1
     2
     3
     错误序列来了
     1
     2
     3
     错误序列来了
     1
     2
     3
     错误序列来了
     */
}
func test_concat() {
    // *** concat: 
    // 用来控制顺序
    let subject1 = BehaviorSubject(value: "1")
    let subject2 = BehaviorSubject(value: "11")
    
    let subjectsSubject = BehaviorSubject(value: subject1)
    subjectsSubject.asObservable()
        .concat()
        .subscribe { print($0) }
        .disposed(by: disposeBag)
    
    subject1.onNext("2")
    subject1.onNext("3")
    
    subjectsSubject.onNext(subject2)
    subject1.onNext("4")

    subject2.onNext("22")
    
    //subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
    subject2.onNext("33")
    /** 未打开completed
     next(1)
     next(2)
     next(3)
     next(4)
     */
    
    /** 打开completed
     next(1)
     next(2)
     next(3)
     next(4)
     next(22)
     next(33)

     */
}
func test_fliter() {
    Observable.of(1,2,3,4,5,6,7,8,9,0)
        .filter{$0 % 2 == 0}
        .subscribe(onNext: {print($0)})  //2 4 6 8 0
        .disposed(by: disposeBag)
}
func test_distinctUntilChanged() {
    Observable.of("1", "2", "2", "2", "3", "3", "4")
        .distinctUntilChanged()
        .subscribe(onNext: {print($0)}) //1,2,3,4
        .disposed(by: disposeBag)
    
    Observable.of("1", "2", "2", "2", "3", "3", "4")
        .distinctUntilChanged()
        .toArray()
        .subscribe({ value in
            print(value) //success(["1", "2", "3", "4"])
        })
        .disposed(by: disposeBag)
}
func test_elementAt() {
    print("*****elementAt*****")
    Observable.of("C", "o", "o", "c", "i")
        .element(at: 3)
        .subscribe(onNext: { print($0) })  //c
        .disposed(by: disposeBag)
}
func test_takeLast() {
    Observable.of("1", "2","3", "4","5")
        .takeLast(3)//取从末尾开始算起的3个元素
        .subscribe(onNext: { print($0) })  //3,4,5
        .disposed(by: disposeBag)
}
func test_takeWhile() {
    Observable.of(1, 2, 3, 4, 5, 6)
        .take(while:{ $0 < 3 }) //取出满足条件的元素 (1,2)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
func test_skip() {
    Observable.of(1,2,3,4,5,6)
    .skip(2) //直接跳过前面两个元素,即从3开始
    .subscribe(onNext: {print($0)})  //3,4,5,6
    .disposed(by: disposeBag)
}
func test_skipWhile() {
    Observable.of(1, 2, 3, 4, 5, 6)
        .skip { $0 < 4 } //满足小于4的都跳过,即只有4,5,6)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
}
func test_takeUntil() {
    let sourceSequence = PublishSubject<String>()
    let referenceSequence = PublishSubject<String>()
    sourceSequence
        .take(until: referenceSequence)
        .subscribe(onNext: {print($0)})
        .disposed(by: disposeBag)
    
    sourceSequence.onNext("1")
    sourceSequence.onNext("2")
    sourceSequence.onNext("3")

    //referenceSequence.onNext("0") // 条件一出来,下面就走不了
    sourceSequence.onNext("4")
    sourceSequence.onNext("5")
    sourceSequence.onNext("6")
}
func test_skipUntil() {
    // *** skipUntil: 抑制从源可观察序列发出元素,直到参考可观察序列发出元素
    let sourceSeq = PublishSubject<String>()
    let referenceSeq = PublishSubject<String>()
    
    sourceSeq
        .skip(until: referenceSeq)
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)
    
    //这些不输出
    sourceSeq.onNext("1")
    sourceSeq.onNext("2")
    sourceSeq.onNext("3")

    referenceSeq.onNext("0") // 条件一出来,下面就可以走了
    sourceSeq.onNext("11")
    sourceSeq.onNext("22")
    sourceSeq.onNext("33")
}
上一篇下一篇

猜你喜欢

热点阅读