iOS

学习 RxSwift & RxCocoa

2019-03-15  本文已影响0人  FicowShen
内容概览:



为什么要使用Rx(ReactiveX)?

另外,官方文档解释了一个很重要的点:ReactiveX不是函数响应式编程(Functional Reactive Programming)!
请不要再被人忽悠,也不要用这个概念去忽悠别人~

Rx Marble Diagrams(宝石图)

Observable发出事件,然后被Observer接收并被转换为其他形式,转换后的事件又继续被发出。
当Observable序列完成时,或者出现错误时,序列被终结。

Observable是一个序列,类似于Swift中的序列,但是这个序列可以异步接收元素。
Observable的subscribe方法,类似于Swift序列中的makeIterator方法。
Observer(通常是一系列的闭包)需要传递给Observable的subscribe,以接收序列的元素。

Rx抽象了时间状态机,我们可以更方便地去完成业务逻辑。
否则,我们就要重复地去编写一些有关临时状态转换的逻辑代码。

宝石图可以帮助你理解Rx中的操作符

一个关于数字的序列:
--1--2--3--4--5--6--| // 序列完成,并终结

另一个关于字符的序列
--a--b--a--a--a---d---X // 序列发生错误,并终结

有些序列是无穷无尽的,比如按钮点击事件序列:
---tap-tap-------tap--->

这里有一条基本的规则:

关键概念


Event - 事件

/// 代表一系列事件
public enum Event<Element> {
    /// 序列产生了一个新的元素
    case next(Element)
    /// 创建序列时产生了一个错误,导致序列终止
    case error(Swift.Error)
    /// 序列的所有元素都已经成功产生,整个序列已经完成
    case completed
}

请注意,序列产生了错误,就会导致序列终止!
有时候,这并不是你期待的结果!!!


Observable - 产生事件

创建Observable,并让Observer订阅事件,在事件结束后释放观察者。

typealias JSON = Any

let disposeBag = DisposeBag()

let json: Observable<JSON> = Observable.create { (observer) -> Disposable in
    let task = URLSession.shared.dataTask(with: ...) { data, _, error in
        guard error == nil else {
            observer.onError(error!)
            return
        }
        guard let data = data,
            let jsonObject = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves)
            else {
            observer.onError(DataError.cantParseJSON)
            return
        }
        observer.onNext(jsonObject)
        observer.onCompleted()
    }
    task.resume()
    return Disposables.create { task.cancel() }
}

json.subscribe(onNext: { json in
    print("取得 json 成功: \(json)")
}, onError: { error in
    print("取得 json 失败 Error: \(error.localizedDescription)")
}, onCompleted: {
    print("取得 json 任务成功完成")
}).disposed(by: disposeBag) // 释放Observers

Observable 的冷与热

一创建就可以开始发出事件的,就是热的。
直到有observer订阅才开始发出事件的,就是冷的!

Observable 特征序列(定制版的Observable)

示例Demo:

let disposeBag = DisposeBag()

func getRepo(_ repo: String) -> Single<[String: Any]> {
    return Single<[String: Any]>.create { single in
        let url = URL(string: "https://api.github.com/repos/\(repo)")!
        let task = URLSession.shared.dataTask(with: url) { data, _, error in
            if let error = error {
                single(.error(error))
                return
            }
            guard let data = data,
                  let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
                  let result = json as? [String: Any] else {
                single(.error(DataError.cantParseJSON))
                return
            }
            single(.success(result))
        }
        task.resume()
        return Disposables.create { task.cancel() }
    }
}

getRepo("ReactiveX/RxSwift").subscribe(onSuccess: { json in
    print("JSON: ", json)
}, onError: { error in
    print("Error: ", error)
}).disposed(by: disposeBag)

订阅提供一个 SingleEvent 的枚举:

public enum SingleEvent<Element> {
    case success(Element)
    case error(Swift.Error)
}

示例Demo:

let disposeBag = DisposeBag()

func cacheLocally() -> Completable {
    return Completable.create { completable in
       // Store some data locally
       ...
       ...

       guard success else {
           completable(.error(CacheError.failedCaching))
           return Disposables.create {}
       }
       completable(.completed)
       return Disposables.create {}
    }
}

cacheLocally().subscribe(onCompleted: {
    print("Completed with no error")
}, onError: { error in
    print("Completed with an error: \(error.localizedDescription)")
}).disposed(by: disposeBag)

订阅提供一个 CompletableEvent 的枚举:

public enum CompletableEvent {
    case error(Swift.Error)
    case completed
}

示例Demo:

let disposeBag = DisposeBag()

func generateString() -> Maybe<String> {
    return Maybe<String>.create { maybe in

        maybe(.success("RxSwift"))
        // OR
        maybe(.completed)
        // OR
        maybe(.error(error))
        return Disposables.create {}
    }
}

generateString().subscribe(onSuccess: { element in
    print("Completed with element \(element)")
}, onError: { error in
    print("Completed with an error \(error.localizedDescription)")
}, onCompleted: {
    print("Completed with no element")
}).disposed(by: disposeBag)

订阅提供一个 MaybeEvent 的枚举:

public enum MaybeEvent<Element> {
    case success(Element)
    case error(Swift.Error)
    case completed
}

为什么要使用Driver?


Observer - 响应事件

button.rx.tap.subscribe(onNext: { [weak self] in
    self?.showAlert()
}, onError: { error in
    print("发生错误: \(error.localizedDescription)")
}, onCompleted: {
    print("任务完成")
})

创建观察者最直接的方法就是在 Observable 的 subscribe 方法后面描述事件发生时,需要如何做出响应。
而观察者就是由后面的 onNext,onError,onCompleted的这些闭包构建出来的。

特征观察者(定制的Observer)
URLSession.shared.rx.data(request: URLRequest(url: url))
.subscribe(onNext: { data in
    print("Data Task Success with count: \(data.count)")
}, onError: { error in
    print("Data Task Error: \(error)")
})
.disposed(by: disposeBag)

以上代码可以看作:

let observer: AnyObserver<Data> = AnyObserver { (event) in
    switch event {
    case .next(let data):
        print("Data Task Success with count: \(data.count)")
    case .error(let error):
        print("Data Task Error: \(error)")
    default:
        break
    }
}

URLSession.shared.rx.data(request: URLRequest(url: url))
.subscribe(observer)
.disposed(by: disposeBag)
let observer: AnyObserver<Bool> = AnyObserver { [weak self] (event) in
    switch event {
    case .next(let isHidden):
        self?.usernameValidOutlet.isHidden = isHidden
    default:
        break
    }
}

usernameValid
.bind(to: observer)
.disposed(by: disposeBag)

以上代码如果用Binder实现,效果更佳。

let observer: Binder<Bool> = Binder(usernameValidOutlet) { (view, isHidden) in
    view.isHidden = isHidden
}

usernameValid
.bind(to: observer)
.disposed(by: disposeBag)

Binder 可以只处理 next 事件,并且保证响应 next 事件的代码一定会在给定 Scheduler 上执行,这里采用默认的 MainScheduler



RxCocoa中已经采用Binder实现了很多常用的观察者,比如:

extension Reactive where Base: UIView {
  public var isHidden: Binder<Bool> {
      // self.base是实际的UI控件对象
      return Binder(self.base) { view, hidden in
          view.isHidden = hidden
      }
  }
}

extension Reactive where Base: UIControl {
  public var isEnabled: Binder<Bool> {
      return Binder(self.base) { control, value in
          control.isEnabled = value
      }
  }
}

extension Reactive where Base: UILabel {
  public var text: Binder<String?> {
      return Binder(self.base) { label, text in
          label.text = text
      }
  }
}

你也可以用这种方式来创建自定义的 UI 观察者

Observable & Observer

有些事物比较特别。它们既是可被监听的序列也是观察者。
有许多 UI 控件都存在这种特性,例如:switch的开关状态,segmentedControl的选中索引号,datePicker的选中日期等等。

// 作为可被监听的序列
let observable = textField.rx.text
observable.subscribe(onNext: { text in show(text: text) })

// 作为观察者
let observer = textField.rx.text
let text: Observable<String?> = ...
text.bind(to: observer)



框架里面定义了一些辅助类型可以帮助你更准确的描述事物的特征,它们既是可被监听的序列也是观察者。


Operator - 创建变化组合事件

操作符可以帮助大家创建新的序列,或者变化组合原有的序列,从而生成一个新的序列。

filter - 过滤

// 温度
let rxTemperature: Observable<Double> = ...

// filter 操作符
rxTemperature.filter { temperature in temperature > 33 }
.subscribe(onNext: { temperature in
    print("高温:\(temperature)度")
})
.disposed(by: disposeBag)

map - 转换

// JSON
let json: Observable<JSON> = ...

// map 操作符
json.map(Model.init)
.subscribe(onNext: { model in
    print("取得 Model: \(model)")
})
.disposed(by: disposeBag)

zip - 配对

// 汉堡
let rxHamburg: Observable<Hamburg> = ...
// 薯条
let rxFrenchFries: Observable<FrenchFries> = ...

// zip 操作符
Observable.zip(rxHamburg, rxFrenchFries)
.subscribe(onNext: { (hamburg, frenchFries) in
    print("取得汉堡: \(hamburg) 和薯条:\(frenchFries)")
})
.disposed(by: disposeBag)

Rx提供了充分的操作符来帮我们创建序列。当然如果内置操作符无法满足你的需求时,你还可以创建自定义的操作符。

如果你不确定该如何选择操作符,可以参考 决策树。它会引导你找出合适的操作符。


Disposable - 管理绑定(订阅)的生命周期

通常来说,一个序列如果发出了 error 或者 completed 事件,那么所有内部资源都会被释放。
如果你需要提前释放这些资源或取消订阅的话,那么你可以对返回的 可被清除的资源(Disposable) 调用 dispose 方法。
但是,推荐使用 清除包(DisposeBag) 或者 takeUntil 操作符 来管理订阅的生命周期。

DisposeBag用法:

var disposeBag = DisposeBag()

override func viewWillAppear(_ animated: Bool) {
    super.viewWillAppear(animated)

    textField.rx.text.orEmpty
        .subscribe(onNext: { text in print(text) })
        .disposed(by: self.disposeBag)
}

override func viewWillDisappear(_ animated: Bool) {
    super.viewWillDisappear(animated)

    self.disposeBag = DisposeBag()
}

takeUntil 用法:

override func viewDidLoad() {
    super.viewDidLoad()

    _ = usernameValid
        .takeUntil(self.rx.deallocated)
        .bind(to: passwordOutlet.rx.isEnabled)

    _ = usernameValid
        .takeUntil(self.rx.deallocated)
        .bind(to: usernameValidOutlet.rx.isHidden)

    _ = passwordValid
        .takeUntil(self.rx.deallocated)
        .bind(to: passwordValidOutlet.rx.isHidden)

    _ = everythingValid
        .takeUntil(self.rx.deallocated)
        .bind(to: doSomethingOutlet.rx.isEnabled)

    _ = doSomethingOutlet.rx.tap
        .takeUntil(self.rx.deallocated)
        .subscribe(onNext: { [weak self] in self?.showAlert() })
}

Schedulers - 线程队列调配

Schedulers 是 Rx 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。



一个比较典型的例子就是,在后台发起网络请求,然后解析数据,最后在主线程刷新页面。你就可以先用 subscribeOn 切到后台去发送请求并解析数据,最后用 observeOn 切换到主线程更新页面。

GCD实现:

DispatchQueue.global(qos: .userInitiated).async {
    let data = try? Data(contentsOf: url)
    DispatchQueue.main.async {
        self.data = data
    }
} 

RxSwift实现:

let rxData: Observable<Data> = ...

rxData
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self] data in
    self?.data = data
})
.disposed(by: disposeBag)

Error Handling - 错误处理

一旦序列里面产出了一个 error 事件,整个序列将被终止。RxSwift 主要有两种错误处理机制:

retry - 重试

请求 JSON 失败时,立即重试,重试 3 次后仍然失败就将错误抛出:

let rxJson: Observable<JSON> = ...

rxJson
.retry(3)
.subscribe(onNext: { json in
    print("取得 JSON 成功: \(json)")
}, onError: { error in
    print("取得 JSON 失败: \(error)")
})
.disposed(by: disposeBag)

请求 JSON 失败时,等待 5 秒后重试:

let retryDelay: Double = 5  // 重试延时 5 秒

rxJson
.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
    return Observable.timer(retryDelay, scheduler: MainScheduler.instance)
}
.subscribe(...)
.disposed(by: disposeBag)

retryWhen 操作符主要描述应该在何时重试,并且通过闭包里面返回的 Observable 来控制重试的时机。

如果重试超过 4 次,就将错误抛出。如果错误在 4 次以内时,就等待 5 秒后重试:

let maxRetryCount = 4       // 最多重试 4 次
let retryDelay: Double = 5  // 重试延时 5 秒

rxJson
.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
    return rxError.flatMapWithIndex { (error, index) -> Observable<Int> in
        guard index < maxRetryCount else {
            return Observable.error(error)
        }
        return Observable<Int>.timer(retryDelay, scheduler: MainScheduler.instance)
    }
}
.subscribe(...)
.disposed(by: disposeBag)

flatMapWithIndex 操作符可以提供错误的索引数 index。然后用这个索引数判断是否超过最大重试数,如果超过了,就将错误抛出。如果没有超过,就等待 5 秒后重试。

catch - 恢复

catchError 可以在错误产生时,用一个备用元素或者一组备用元素将错误替换掉。

当错误产生时,就返回一个空数组,于是就会显示一个空列表页:

searchBar.rx.text.orEmpty
...
.flatMapLatest { query -> Observable<[Repository]> in
    ...
    return searchGitHub(query)
        .catchErrorJustReturn([])
}
...
.bind(to: ...)
.disposed(by: disposeBag)

你也可以使用 catchError,当错误产生时,将错误事件替换成一个备选序列:

// 先从网络获取数据,如果获取失败了,就从本地缓存获取数据

let rxData: Observable<Data> = ...      // 网络请求的数据
let cahcedData: Observable<Data> = ...  // 之前本地缓存的数据

rxData
.catchError { _ in cahcedData }
.subscribe(onNext: { date in
    print("获取数据成功: \(date.count)")
})
.disposed(by: disposeBag)
Result

如果我们只是想给用户错误提示,那要如何操作呢?

以下提供一个最为直接的方案,不过这个方案存在一些问题:

updateUserInfoButton.rx.tap
.withLatestFrom(rxUserInfo)
.flatMapLatest { userInfo -> Observable<Void> in
    return update(userInfo)
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: {
    print("用户信息更新成功")
}, onError: { error in
    print("用户信息更新失败: \(error.localizedDescription)")
})
.disposed(by: disposeBag)

这样实现是非常直接的。但是 一旦网络请求操作失败了,序列就会终止!!整个订阅将被取消!!
如果用户再次点击更新按钮,就无法再次发起网络请求进行更新操作了。

为了解决这个问题,我们需要选择合适的方案来进行错误处理。例如使用枚举 Result:

// 自定义一个枚举类型 Result
public enum Result<T> {
    case success(T)
    case failure(Swift.Error)
}

然后之前的代码需要修改成:

updateUserInfoButton.rx.tap
.withLatestFrom(rxUserInfo)
.flatMapLatest { userInfo -> Observable<Result<Void>> in
    return update(userInfo)
        .map(Result.success)  // 转换成 Result
        .catchError { error in Observable.just(Result.failure(error)) }
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { result in
    switch result {           // 处理 Result
    case .success:
        print("用户信息更新成功")
    case .failure(let error):
        print("用户信息更新失败: \(error.localizedDescription)")
    }
})
.disposed(by: disposeBag)

这样我们的错误事件被包装成了 Result.failure(Error) 元素,就不会终止整个序列。就算网络请求失败,整个订阅依然存在。如果用户再次点击更新按钮,也是能够发起网络请求进行更新操作的。

除此之外,强烈建议阅读 How to handle errors in RxSwift



最后,对于初学者,建议将RxSwift代码仓库下载到本地。

然后,打开 Rx.xcworkspace
在左上角的 Schema 选中 RxSwift-macOS,然后 Build (快捷键:Command + B)项目。
在Build结束后,进入 Project navigator (快捷键:Command + 1) 并找到 Rx - Rx.playground,然后打开调试窗口(快捷键:Shift + Command + Y) 以查看示例代码执行效果。

在这个Playground中,官方对 所有的操作符 进行了阐释,这可以帮助你迅速掌握RxSwift。

最后的最后,强烈建议初学者阅读 RxSwift - Getting Started!!!




参考文章:
ReactiveX Introduction
Github: ReactiveX / RxSwift
RxMarbles(常见宝石图)
RxSwift - Getting Started
RxSwift 中文文档 (本文大部分内容来源于此文档)
ReactiveX文档中文翻译
How to handle errors in RxSwift



如需转载,请注明出处,谢谢 ~

上一篇下一篇

猜你喜欢

热点阅读