RxSwift(5)-Observable的特殊序列
为了方便我们我们日常的使用,RxSwift
在Observable
基础上封装了一些我们常用的特殊序列,如Driver
、Single
、Completable
、Maybe
等,下面我们就来看看其使用和原理。
Driver
可以看作是驱动应用程序的可观察序列的构建器模式。主要是为了简化UI
层的代码。
下面我们来看一个例子,我们在输入框输入一行文字,开始去服务器获取数据,返回数据之后刷新页面mLabel
和mButton
上面的文字:
private func httpTest() {
let res = mTextField.rx.text.skip(1)
.flatMap { (input) -> Observable<String> in
return self.fetchData(inputText: input ?? "")
}
res.bind(to: mLabel.rx.text)
.disposed(by: disposeBag)
res.bind(to: mButton.rx.title())
.disposed(by: disposeBag)
}
private func fetchData(inputText: String) -> Observable<String> {
print("---网络请求之前的线程--\(Thread.current)--")
return Observable<String>.create { (observer) -> Disposable in
// 模拟网络请求
DispatchQueue.global().async {
print("---网络请求中的线程--\(Thread.current)--")
observer.onNext("请求回来的数据")
observer.onCompleted()
}
return Disposables.create();
}
}
运行结果如下:
drive01.png可以看出上面的代码其实是有问题的:
- 如果
fetchData
的序列产生了一个错误(网络请求失败),这个错误将取消所有绑定,当用户输入一个新的关键字时,是无法发起新的网络请求。 - 返回的结果被绑定到两个
UI
元素上。那就意味着,每次用户输入一个新的关键字时,就会分别为两个UI
元素发起HTTP
请求。这就是控制台会输出两遍的原因。 - 如果
fetchData
在后台返回序列,那么刷新页面也会在后台进行,这样就会出现异常崩溃。
我们可以对上述代码进行优化:
let res = mTextField.rx.text.skip(1)
.flatMap { (input) -> Observable<String> in
return self.fetchData(inputText: input ?? "")
.observeOn(MainScheduler.instance)
.catchErrorJustReturn("网络请求错误")
}
.share(replay: 1, scope: .whileConnected)
-
observeOn(MainScheduler.instance)
:在主线程返回结果 -
catchErrorJustReturn()
:处理网络错误异常 -
share()
:共享网络请求,防止多次调用
我们可以直接使用Drive
优化:
private func driveTest() {
let res = mTextField.rx.text.asDriver()
.flatMap { input in
self.fetchData(inputText: input ?? "")
.asDriver(onErrorJustReturn: "网络请求错误")
}
res.drive(mLabel.rx.text)
.disposed(by: disposeBag)
res.drive(mButton.rx.title())
.disposed(by: disposeBag)
}
我们只需要把普通的观察序列转化为Drive
,UI
绑定的时候也使用drive
,并且加上一句容错处理即可。
我们来看看源码:
public typealias Driver<E> = SharedSequence<DriverSharingStrategy, E>
Driver
其实是SharedSequence
这个序列的别名。它有以下特点:
- 不会失败
- 在主线程传递事件
- 共享策略是
share(replay: 1, scope: .whileConnected)
,也就是说所有的观察者都共享序列的计算资源
上述例子中,我们调用asDriver()
的时候就会调用进入下面方法:
extension ControlProperty {
public func asDriver() -> Driver<E> {
return self.asDriver { _ -> Driver<E> in
return Driver.empty()
}
}
}
继续跟进:
public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver<E>) -> Driver<E> {
let source = self
.asObservable()
// 主线程
.observeOn(DriverSharingStrategy.scheduler)
.catchError { error in
onErrorRecover(error).asObservable()
}
return Driver(source)
}
public static var scheduler: SchedulerType { return SharingScheduler.make() }
// 主线程
public private(set) static var make: () -> SchedulerType = { MainScheduler() }
这里就可以看到.observeOn
指定调度者的时候就会指定到主线程中。catchError
是对错误信息的处理。最后会返回Driver(source)
方法,Driver
是SharedSequence
这个序列的别名,返回也就是SharedSequence
的初始化方法。
public struct SharedSequence<S: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
public typealias E = Element
public typealias SharingStrategy = S
let _source: Observable<E>
init(_ source: Observable<E>) {
self._source = S.share(source)
}
.....
public func asObservable() -> Observable<E> {
return self._source
}
public func asSharedSequence() -> SharedSequence<SharingStrategy, E> {
return self
}
}
其初始化方法会调用DriverSharingStrategy
的share
方法:
public struct DriverSharingStrategy: SharingStrategyProtocol {
public static var scheduler: SchedulerType { return SharingScheduler.make() }
public static func share<E>(_ source: Observable<E>) -> Observable<E> {
return source.share(replay: 1, scope: .whileConnected)
}
}
可以看到最终还是调用了share(replay: 1, scope: .whileConnected)
共享策略。也就是说调用asDriver()
方法就确定了是在主线程订阅观察而且确定了共享策略。
我们再来看看Driver<E>
也就是SharedSequence
调用drive
方法做了什么。
public func drive<O: ObserverType>(_ observer: O) -> Disposable where O.E == E? {
// 判断是否在主线程
MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
return self.asSharedSequence().asObservable().map { $0 as E? }.subscribe(observer)
}
public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
guard Thread.isMainThread else {
rxFatalError(errorMessage ?? "Running on background thread.")
}
}
首先判断是否是在主线程操作,不是的话就报错。然后使用map
函数进行映射调整数据类型,接着调用订阅方法subscribe()
。其实也就是做了一层封装。
Single
只包含一个元素的序列,要么发出一个元素,要么发出错误事件。不能共享资源。
public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
Single
是PrimitiveSequence
的别名。它定义了两个事件的枚举:
public enum SingleEvent<Element> {
/// 只发出一个元素 默认发出`.next(Element)`和`.completed`)
case success(Element)
/// 只发出错误事件`.error(Error)`)
case error(Swift.Error)
}
我们可以使用如下方法创建一个Single
序列:
func createSingleTest() {
Single<String>.create { (sob) -> Disposable in
if showSuccess {
sob(.success("这是一个成功的Single事件"))
} else {
sob(.error(NSError.init(domain: "错误的signle事件", code: 10000, userInfo: nil)))
}
return Disposables.create()
}.subscribe { (element) in
print("==\(element)==")
}.disposed(by: disposeBag)
}
控制台输出:
// 成功信号
==success("这是一个成功的Single事件")==
// 错误信号
==error(Error Domain=错误的signle事件 Code=10000 "(null)")==
源码如下:
public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<Element> {
let source = Observable<Element>.create { observer in
return subscribe { event in
switch event {
case .success(let element):
observer.on(.next(element))
observer.on(.completed)
case .error(let error):
observer.on(.error(error))
}
}
}
return PrimitiveSequence(raw: source)
}
可以看出来Single
其实是对Observable
的封装,响应只有两种,success
和error
,success
响应之后会正常发出事件,并且发出完成事件,error
就是发出错误事件。
我们再来看看Single
的订阅方法:
public func subscribe(_ observer: @escaping (SingleEvent<Element>) -> Void) -> Disposable {
var stopped = false
return self.primitiveSequence.asObservable().subscribe { event in
if stopped { return }
stopped = true
switch event {
case .next(let element):
observer(.success(element))
case .error(let error):
observer(.error(error))
case .completed:
rxFatalErrorInDebug("Singles can't emit a completion event")
}
}
}
还是调用的Observable
的subscribe
方法,只是对结果做了一层处理,如果是next
事件,转换成success
发送;如果是error
事件就正常发送,不错处理;如果是completed
事件直接报错。也就是Single
是没有单独的completed
事件。
另外,我们也可以使用如下方法创建Single
序列
func asSingleTest() {
Observable.of("1").asSingle()
.subscribe(onSuccess: { (element) in
print("==\(element)==")
}).disposed(by: disposeBag)
}
控制台输出:
==1==
如果把元素改为多个,控制台就会报错:
Unhandled error happened: Sequence contains more than one element.
subscription called from:
我们再来看看源码:
public func asSingle() -> Single<Element> {
return PrimitiveSequence(raw: AsSingle(source: self.asObservable()))
}
final class AsSingle<Element>: Producer<Element> {
fileprivate let _source: Observable<Element>
init(source: Observable<Element>) {
self._source = source
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AsSingleSink(observer: observer, cancel: cancel)
let subscription = self._source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
fileprivate final class AsSingleSink<Observer: ObserverType> : Sink<Observer>, ObserverType {
typealias Element = Observer.Element
private var _element: Event<Element>?
func on(_ event: Event<Element>) {
switch event {
case .next:
// 第一次发送事件的是_element==nil 如果不等于就说明观察序列中有多个元素
if self._element != nil {
self.forwardOn(.error(RxError.moreThanOneElement))
self.dispose()
}
self._element = event
case .error:
self.forwardOn(event)
self.dispose()
case .completed:
// 第二次进来就是completed事件了
if let element = self._element {
self.forwardOn(element)
self.forwardOn(.completed)
}
else {
self.forwardOn(.error(RxError.noElements))
}
self.dispose()
}
}
}
其实Single
就是对Observable
做了一层封装,封装成只能是单个元素或者错误事件的序列。
Completable
Completable
是一种不包含任何元素的序列,也就是包含的元素个数为0,所以它要么只能产生一个completed
事件,要么产生一个error
事件。不具备共享资源的能力。
定义如下:
public typealias Completable = PrimitiveSequence<CompletableTrait, Swift.Never>
它也是PrimitiveSequence
的别名,不过传入的参数不同。Completable
定了两种类型事件:
public enum CompletableEvent {
/// 错误事件 .error(Error)
case error(Swift.Error)
/// 完成事件
case completed
}
我们可以使用如下方法创建Completable
序列:
func createCompletableTest() {
var showCompleted = true
Completable.create { (cob) -> Disposable in
if showCompleted {
cob(.completed)
} else {
cob(.error(NSError.init(domain: "错误的signle信号", code: 10000, userInfo: nil)))
}
return Disposables.create()
}.subscribe(onCompleted: {
print("==完成==")
}) { (error) in
print("==\(error)==")
}.disposed(by: disposeBag)
}
控制台输出:
// .completed
==完成==
// .error
==Error Domain=错误信号 Code=10000 "(null)"==
源码如下:
public static func create(subscribe: @escaping (@escaping CompletableObserver) -> Disposable) -> PrimitiveSequence<Trait, Element> {
let source = Observable<Element>.create { observer in
return subscribe { event in
switch event {
case .error(let error):
observer.on(.error(error))
case .completed:
observer.on(.completed)
}
}
}
return PrimitiveSequence(raw: source)
}
可以看出来Completable
和Single
类似,响应只有两种,completed
和error
,completed
即完成事件,error
就是错误事件。
我们再来看看Completable
的subscribe
方法:
public func subscribe(onCompleted: (() -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {
return self.primitiveSequence.subscribe { event in
switch event {
case .error(let error):
if let onError = onError {
onError(error)
}
case .completed:
onCompleted?()
}
}
}
如果是.completed
直接就回调完成的闭包,如果是.error
,就执行onError
发送error
事件。
我们也可以使用asCompletable()
方法:
func asCompletableTest() {
Observable<Never>.empty()
.asCompletable()
.subscribe(onCompleted: {
print("==完成==")
}) { (error) in
print("==\(error)==")
}.disposed(by: disposeBag)
}
控制台输出:
==完成==
源码如下:
public func asCompletable()
-> Completable {
return PrimitiveSequence(raw: self.asObservable())
}
Maybe
Maybe
序列只能做一件事情,也就是只能发出一个事件,它要么只能发出一个元素,要么产生一个completed
事件,要么产生一个error
事件。
定义如下:
public typealias Maybe<Element> = PrimitiveSequence<MaybeTrait, Element>
它也是PrimitiveSequence
的别名,不过传入的参数不同。Maybe
定了三种类型事件:
public enum MaybeEvent<Element> {
/// 发送一个元素且发送完成事件 .next(Element)、.completed
case success(Element)
/// 错误事件 .error(Error)
case error(Swift.Error)
/// 完成事件
case completed
}
我们可以使用如下方法创建Maybe
序列:
func createMaybeTest() {
Maybe<String>.create(subscribe: { (mob) -> Disposable in
mob(.success("成功事件"))
mob(.completed)
mob(.error(NSError.init(domain: "错误事件", code: 10000, userInfo: nil)))
return Disposables.create()
}).subscribe({ (element) in
print("==\(element)==")
}).disposed(by: disposeBag)
}
控制台输出:
==success("成功事件")==
源码如下:
public static func create(subscribe: @escaping (@escaping MaybeObserver) -> Disposable) -> PrimitiveSequence<Trait, Element> {
let source = Observable<Element>.create { observer in
return subscribe { event in
switch event {
case .success(let element):
observer.on(.next(element))
observer.on(.completed)
case .error(let error):
observer.on(.error(error))
case .completed:
observer.on(.completed)
}
}
}
return PrimitiveSequence(raw: source)
}
可以看出来,Maybe
也和和Single
类似,不过它有三种响应,success
、completed
和error
,success
即发送一个元素而且发送完成事件,completed
即完成事件,error
就是错误事件。
我们再来看看Maybe
的subscribe
方法:
public func subscribe(_ observer: @escaping (MaybeEvent<Element>) -> Void) -> Disposable {
var stopped = false
return self.primitiveSequence.asObservable().subscribe { event in
if stopped { return }
stopped = true
switch event {
case .next(let element):
observer(.success(element))
case .error(let error):
observer(.error(error))
case .completed:
observer(.completed)
}
}
}
这里有一个stopped
的变量,专门用来控制Maybe
的事件只能响应一个。
我们也可以使用asMaybe()
方法:
func asMaybeTest() {
Observable<String>.of("A")
.asMaybe()
.subscribe(onSuccess: { (element) in
print("==\(element)==")
}, onError: { (error) in
print("==错误==")
}) {
print("==完成==")
}.disposed(by: disposeBag)
}
控制台输出:
==A==
源码如下:
public func asMaybe() -> Maybe<Element> {
return PrimitiveSequence(raw: AsMaybe(source: self.asObservable()))
}
final class AsMaybe<Element>: Producer<Element> {
fileprivate let _source: Observable<Element>
init(source: Observable<Element>) {
self._source = source
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AsMaybeSink(observer: observer, cancel: cancel)
let subscription = self._source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
fileprivate final class AsMaybeSink<Observer: ObserverType> : Sink<Observer>, ObserverType {
typealias Element = Observer.Element
private var _element: Event<Element>?
func on(_ event: Event<Element>) {
switch event {
case .next:
if self._element != nil {
// 多个元素会报错
self.forwardOn(.error(RxError.moreThanOneElement))
self.dispose()
}
self._element = event
case .error:
self.forwardOn(event)
self.dispose()
case .completed:
if let element = self._element {
self.forwardOn(element)
}
self.forwardOn(.completed)
self.dispose()
}
}
}
可以看出来asMaybe()
和asSingle()
的处理很类似。asMaybe
只是多加入了完成事件的处理。
Maybe
可以看做是Single
和Completable
的结合体。需要注意的是,Maybe
序列如果存在多个元素也是会抛出异常的。
总结
-
Drive
:订阅操作都是在主线程的,并且会共享资源操作,一般用于对UI
的绑定操作。 -
Single
:要么发出单个元素并且完成,要么是错误事件。 -
Completable
:没有元素的序列,要么是完成事件、要么是错误事件。 -
Maybe
:要么发出单个元素并且完成,要么是错误事件,要么是完成事件。