RxSwiftRxSwift

07. RxSwift源码解读:Single、Completab

2021-06-11  本文已影响0人  Oceanj

今天分享一下Observable的几个变种的用法,解读其源码。这几个都是可观察序列,适用于不同的场景。

Single

Single只能发出一个成功和一个失败两种信号,分别是success()和error, 是对error complete onNext信号的变换,我们看看Single的create方法的代码:

    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<Element> {
        let source = Observable<Element>.create { observer in
            return subscribe { event in
                switch event {
                case .success(let element):
                    observer.on(.next(element))
                    observer.on(.completed)
                case .failure(let error):
                    observer.on(.error(error))
                }
            }
        }
        return PrimitiveSequence(raw: source)
    }

Single实际是对Observable的封装,内部包含Observable,而且Single只是一个别名,真实类型是PrimitiveSequence。

public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
public typealias SingleEvent<Element> = Result<Element, Swift.Error>

Result 是Swift的自带枚举类型,包含success和failure两种case。
observer的 .next和.comleted 对应的就是.success, error对应failure,所以这个Single只能发出一个信号,特别适用于网路请求。

我们可以看看Single的真实类型PrimitiveSequence的源码:

public struct PrimitiveSequence<Trait, Element> {
    let source: Observable<Element>

    init(raw: Observable<Element>) {
        self.source = raw
    }
}

包含一个原始的序列source,它的asObservable返回的就是这个source,这样可切换到原始序列进行操作。PrimitiveSequence实际上并没有遵循ObservableType,它遵循的是ObservableConvertibleType。

Maybe和Completable

和Single类似,Maybe和Completable的真实类型也是PrimitiveSequence,区别在于:

Driver和Signal

Driver和Signal也是个特征序列,它们主要是为了简化UI层的代码。
不过如果你遇到的序列具有以下特征,你也可以使用它:

这些都是驱动 UI 的序列所具有的特征。

共享附加作用的意思是,观察者共享源 Observable,并且缓存最新的 n 个元素,将这些元素直接发送给新的观察者;它使用share操作符实现此功能。

我们看看Driver的源码:

public typealias Driver<Element> = SharedSequence<DriverSharingStrategy, Element>

public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { SharingScheduler.make() }
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        source.share(replay: 1, scope: .whileConnected)
    }
}

extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy {
    /// Adds `asDriver` to `SharingSequence` with `DriverSharingStrategy`.
    public func asDriver() -> Driver<Element> {
        self.asSharedSequence()
    }
}

我们看到Driver也是个别名,真实类型是SharedSequence,DriverSharingStrategy类似一个策略表示Driver共享序列的策略是什么,这里看到Driver共享序列的策略是

source.share(replay: 1, scope: .whileConnected)

回放个数为1,scop:.whileConnected。

关于share操作符已经在之前的文章中详细讲解过,这里不再重复

当我们将序列转换为Driver时一般使用.asDriver(onErrorJustReturn: 1), 我们看看源码:

public func asDriver(onErrorJustReturn: Element) -> Driver<Element> {
        let source = self
            .asObservable()
            .observe(on:DriverSharingStrategy.scheduler)
            .catchAndReturn(onErrorJustReturn)
        return Driver(source)
    }

这里选择DriverSharingStrategy.scheduler这个调度器执行通知,scheduler方法的实现是:

{ SharingScheduler.make() }

make的实现是:

MainScheduler()

在主线程调度执行!这样保证UI事件通知是在主线程执行。接着执行.catchAndReturn(onErrorJustReturn)操作符:

public func catchAndReturn(_ element: Element)
        -> Observable<Element> {
        Catch(source: self.asObservable(), handler: { _ in Observable.just(element) })
    }

返回一个Catch类型实例,Catch也是个Observable,它继承自Producer,保存了原始source和handler,实现了run方法:

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = CatchSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

这里会执行CatchSink的run方法:

    func run() -> Disposable {
        let d1 = SingleAssignmentDisposable()
        self.subscription.disposable = d1
        d1.setDisposable(self.parent.source.subscribe(self))

        return self.subscription
    }

这里关键代码是:self.parent.source.subscribe(self), 调用元素序列的subscribe同时传入当前对象,而当前对象即CatchSink实现了on方法:

    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            self.forwardOn(event)
        case .completed:
            self.forwardOn(event)
            self.dispose()
        case .error(let error):
            do {
                let catchSequence = try self.parent.handler(error)

                let observer = CatchSinkProxy(parent: self)
                
                self.subscription.disposable = catchSequence.subscribe(observer)
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
    }

在之前的文章中已经反复说过,CatchSink作为observer传入subscribe,则执行subscribe时,如果发送事件时会调用该类(CatchSink)的on方法,分析下on方法实现:
当事件类型是.next和.completed时,原样转发此事件,而当事件类型是error时,则执行handler,并执行handler返回值的subscribe方法,我们知道handler 方法返回一个Observable.just(element),所以当发送error时,将error事件转换成正常onNext的事件,发送element。发送element的工作交给CatchSinkProxy代理来完成,代理方法on中的实现如下:

func on(_ event: Event<Element>) {
        self.parent.forwardOn(event)
        
        switch event {
        case .next:
            break
        case .error, .completed:
            self.parent.dispose()
        }
    }

先执行被代理对象的forwardOn,接着如果是next则不做任何事情,因为已经执行了被代理对象的forwardOn,如果遇到error则执行被代理对象的dispose(). 释放相关资源。说白了catchAndReturn的功能是处理错误事件,将其转成一个正常的onNext事件。

回到asDriver代码实现地方,上面已经讲解完了catchAndReturn,接着调用Driver(source):

    init(_ source: Observable<Element>) {
        self.source = SharingStrategy.share(source)
    }

这里调用了DriverSharingStrategy的share操作符。DriverSharingStrategy是作为范型被确定的。DriverSharingStrategy的代码上面已经给出:source.share(replay: 1, scope: .whileConnected), 缓存是1个。所以Driver的功能就是组合observe,cacheError,share:保证在主线程监听,不会产生错误事件,共享附加作用。

Driver序列一般使用drive进行订阅。它的实现在SharedSequenceConvertibleType的扩展中,而且限定了SharingStrategy == DriverSharingStrategy,所以drive只能被Driver调用,我们看看drive实现:

Driver的真实类型是SharedSequence,而SharedSequence遵循了
SharedSequenceConvertibleType,所以它能调用drive方法。

public func drive<Observer: ObserverType>(_ observers: Observer...) -> Disposable where Observer.Element == Element? {
        MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
        return self.asSharedSequence()
                   .asObservable()
                   .map { $0 as Element? }
                   .subscribe { e in
                    observers.forEach { $0.on(e) }
                   }
    }

这里先确保在主线程执行,这里是对订阅方法的包装,使用了以下操作符:

Signal

Signal和Driver非常类似,唯一的区别是,Driver对新观察者回放(重新发送)上一个元素,而 Signal 不会对新观察者回放上一个元素。
从代码中就能看出区别:我们找到SignalSharingStrategy结构体,它的share方法实现是:

public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        source.share(scope: .whileConnected)
    }

SignalSharingStrategy与DriverSharingStrategy类似,从上面代码可以看出Signal的共享策略是没有缓存(replay==0),这是唯一区别。
所以一般情况下状态序列我们会选用 Driver 这个类型,事件序列我们会选用 Signal 这个类型。
比如UITextField的text改变事件序列就是状态序列,它会发送text值。UIButton点击事件序列是事件序列,它没有状态,产生事件但不发送元素。

Signal一般使用emit进行订阅,作用和代码与drive基本一样,过~~~

上一篇下一篇

猜你喜欢

热点阅读