RxSwift特征序列s
RxSwift特征序列s
Single
Single 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能发出一个元素,要么产生一个 error 事件。
demo
伪代码:比如我们封装一个网络请求,请求响应后发出一个成功或者失败的元素序列。
func request(_ urlStr: String) -> Single<[String: Any]> {
return Single<[String: Any]>.create { single in
let url = URL(string: urlStr)!
let task = URLSession.shared.dataTask(with: url) {
data, _, error in
if let error = error {
single(.error(error))
return
}
guard let data = data,
let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
let result = json as? [String: Any] else {
single(.error(NSError.init(domain:"DataError.cantParseJSON", code: 2, userInfo: nil)))
return
}
single(.success(result))
}
task.resume()
return Disposables.create { task.cancel() }
}
}
request("https://www.baidu.com") //创建
.subscribe(onSuccess: { json in //订阅
print("JSON: ", json)
}, onError: { error in
print("Error: ", error)
})
.disposed(by: disposeBag) //销毁
解析
为什么说它和 Observable 很像呢?简单看一下源码就知道了!
1. Single 的创建
点击Single
后,会发现它是PrimitiveSequence
结构体的别名。
PrimitiveSequence
结构体定义
public struct PrimitiveSequence<Trait, Element> {
let source: Observable<Element>
init(raw: Observable<Element>) {
self.source = raw
}
}
PrimitiveSequence
结构体中只有一个source
属性。
再看一下Single
对PrimitiveSequenceType
协议中create
函数的实现:
extension PrimitiveSequenceType where Trait == SingleTrait {
public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<Element> {
let source = Observable<Element>.create { observer in
return subscribe { event in
switch event {
case .success(let element):
observer.on(.next(element))
observer.on(.completed)
case .error(let error):
observer.on(.error(error))
}
}
}
return PrimitiveSequence(raw: source)
}
}
内部就是创建了一个PrimitiveSequence
简单序列,这个简单序列也只是把Observable
当做源序列包装起来而已。
2. Single 的订阅
继续PrimitiveSequence
的subscribe(onSuccess:,onError:)
订阅:
extension PrimitiveSequenceType where Trait == SingleTrait {
public func subscribe(onSuccess: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {
......
return self.primitiveSequence.subscribe { event in
switch event {
case .success(let element):
onSuccess?(element)
case .error(let error):
if let onError = onError {
onError(error)
} else {
Hooks.defaultErrorHandler(callStack, error)
}
}
}
}
}
这里self.primitiveSequence
只是转换下类型而已。
public protocol PrimitiveSequenceType {
associatedtype Trait
associatedtype Element
@available(*, deprecated, message: "Use `Trait` instead.")
typealias TraitType = Trait
@available(*, deprecated, message: "Use `Element` instead.")
typealias ElementType = Element
var primitiveSequence: PrimitiveSequence<Trait, Element> { get }
}
extension PrimitiveSequence: PrimitiveSequenceType {
public var primitiveSequence: PrimitiveSequence<Trait, Element> {
return self
}
}
extension PrimitiveSequence: ObservableConvertibleType {
public func asObservable() -> Observable<Element> {
return self.source
}
}
PrimitiveSequence
的subscribe(observer:)
订阅:
public func subscribe(_ observer: @escaping (SingleEvent<Element>) -> Void) -> Disposable {
var stopped = false
return self.primitiveSequence.asObservable().subscribe { event in
if stopped { return }
stopped = true
switch event {
case .next(let element):
observer(.success(element))
case .error(let error):
observer(.error(error))
case .completed:
rxFatalErrorInDebug("Singles can't emit a completion event")
}
}
}
asObservable()
则是获取它的源序列,还是让源序列去订阅的。
源序列的订阅流程,透视RxSwift核心逻辑中已经很明白了,
- 源序列的订阅会触发源序列创建时的
create闭包
-
Single
调用create
时传入的subscribe闭包
- 开始网络请求
- 请求响应后发出信号
single(.success(result))
-
Observable
的create闭包
里面那个subscribe闭包
的回调。 -
Observable
的observer.on(.next(element))
后,马上.completed
-
Observable
的subscribe
闭包回调 -
PrimitiveSequence
的observer(.success(element))
回调 -
PrimitiveSequence
的onSuccess?(element)
回调 - demo 中
subscribe回调
,打印输出
这里的闭包嵌套太多。。。
时序 | demo | 中间序列 | observable |
---|---|---|---|
1 | 调用request,创建中间序列,传参一个闭包➡️ | ||
2 | 创建observable,并指定为source,在create闭包中处理request函数中的闭包➡️ | ||
3 | create | ||
4 | 订阅,传参subcribe闭包➡️ | ||
5 | 具体函数A:用更抽象的方法去订阅,并在回调中处理demo的subscribe闭包⬇️ | ||
6 | 抽象函数B:让observable去订阅,并在observable订阅闭包中处理上个函数传入的闭包➡️ | ||
7 | 订阅⬇️ | ||
8 | ⬅️回调create闭包 | ||
9 | request函数中的闭包执行:请求网络⬇️ | ||
10 | 服务器响应⬇️ | ||
11 | 回调request中的闭包➡️ | ||
12 | ⬅️响应observable的订阅 | ||
13 | 抽象函数B:处理上个函数传入的闭包⬇️ | ||
14 | ⬅️具体函数A:处理demo的subscribe闭包 | ||
15 | 执行subcribe中的代码 |
总结:
在 demo 中,创建和订阅都只和 single 打交道,用户只需要关心 single 的 api 就行
single 有贤内助帮忙打理一切,真是个甩手掌柜
我们平时封装也是像这样事务下沉的,这个封装逻辑很简单。但是写清楚了就很累,闭包一层一层的,还有就是尾随闭包,看起来都不太明显,而代码中的关键字就那么几个:subcribe,observer,一不留神就搞混了。
Completable
只能产生一个 completed 事件,要么产生一个 error 事件。
demo
func cacheLocally() -> Completable {
return Completable.create { completable in
// Store some data locally
let success = true
guard success else {
completable(.error(NSError.init(domain:"cacheLocally.error", code: 2, userInfo: nil)))
return Disposables.create {}
}
completable(.completed)
return Disposables.create {}
}
}
cacheLocally()
.subscribe(onCompleted: {
print("Completed with no error")
}, onError: { error in
print("Completed with an error: \(error.localizedDescription)")
})
.disposed(by: disposeBag)
关键代码
RxSwift 的套路,一以贯之。
public static func create(subscribe: @escaping (@escaping CompletableObserver) -> Disposable) -> PrimitiveSequence<Trait, Element> {
let source = Observable<Element>.create { observer in
return subscribe { event in
switch event {
case .error(let error):
observer.on(.error(error))
case .completed:
observer.on(.completed)
}
}
}
return PrimitiveSequence(raw: source)
}
Maybe
它介于 Single 和 Completable 之间,它要么只能发出一个元素,要么产生一个 completed 事件,要么产生一个 error 事件。
demo
func generateString() -> Maybe<String> {
return Maybe<String>.create { maybe in
maybe(.success("RxSwift.Maybe"))
// OR
maybe(.completed)
// OR
maybe(.error(NSError.init(domain:"Maybe.error", code: 2, userInfo: nil)))
return Disposables.create {}
}
}
generateString()
.subscribe(onSuccess: { element in
print("Completed with element \(element)")
}, onError: { error in
print("Completed with an error \(error.localizedDescription)")
}, onCompleted: {
print("Completed with no element")
})
.disposed(by: disposeBag)
关键代码
RxSwift 的套路,一以贯之。代码也省了,回调不回调用户可以决定。