05RxSwift中的driver老司机

2019-08-04  本文已影响0人  越来越胖了

driver主要解决:

1.网络请求数据被多次使用时,多处订阅会触发多次网络请求。
2.得到错误的data时的错误处理。
3.订阅到了数据后还在子线程没法处理UI

网络请求多次,一般直接用share共享状态解决;错误可以添加catchErrorJustReturn处理;线程问题可以通过observeOn(MainScheduler())切换到主线程发送。而driver具体是怎么实现的,看源码分析。

1.状态共享解决多次请求

/// Converts `self` into a `Driver<Element>` for a source that is not expected to fail.
///
/// Delegates to another `asDriver` overload by passing a trailing closure that
/// receives the error (ignored here) and must produce a replacement `Driver`;
/// the closure shape matches `asDriver(onErrorRecover:)`.
///
/// - Returns: A `Driver<Element>` built from `self`.
public func asDriver() -> Driver<Element> {
        return self.asDriver { _ -> Driver<Element> in
            #if DEBUG
                // Debug builds: an error reaching this point is treated as a fatal programming error.
                rxFatalError("Somehow driver received error from a source that shouldn't fail.")
            #else
                // Non-debug builds: replace the failed sequence with an empty Driver instead of crashing.
                return Driver.empty()
            #endif
        }
    }

返回值是一个Driver<Element>如下:

/// `Driver` is `SharedSequence` specialised with `DriverSharingStrategy` as its sharing strategy.
public typealias Driver<Element> = SharedSequence<DriverSharingStrategy, Element>

/// Sharing strategy used by `Driver`: supplies the scheduler and the sharing
/// operator applied to the underlying observable.
public struct DriverSharingStrategy: SharingStrategyProtocol {
    /// Scheduler for this strategy; a new value is obtained from `SharingScheduler.make()` on every access.
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
    /// Returns `source` wrapped with `share(replay: 1, scope: .whileConnected)`.
    ///
    /// - Parameter source: The observable to share.
    /// - Returns: The shared observable.
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        // NOTE(review): presumably lets multiple subscribers reuse one upstream
        // subscription and replays the latest element — confirm against the RxSwift `share` docs.
        return source.share(replay: 1, scope: .whileConnected)
    }
}
----------------SharedSequence----------------
/// A sequence wrapper parameterised by a sharing strategy and an element type.
/// (Excerpt only: the rest of the struct is not shown here.)
public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
    // The wrapped observable, already passed through the strategy's `share`.
    let _source: Observable<Element>

    /// Wraps `source`, applying `SharingStrategy.share` before storing it.
    init(_ source: Observable<Element>) {
        // `SharingStrategy` is the struct's generic parameter, so the concrete
        // `share` implementation depends on the specialisation in use.
        self._source = SharingStrategy.share(source)
    }

SharingStrategy.share(source)中的SharingStrategy是一个泛型参数(遵循SharingStrategyProtocol),这里调用了它的share方法,对Driver来说也就是DriverSharingStrategy的share方法:return source.share(replay: 1, scope: .whileConnected),状态共享了,解决了第一个问题。

2.主线程处理UI

self.asDriver---->进去

 /// Converts `self` into a `Driver<Element>`, recovering from errors with a caller-supplied `Driver`.
 ///
 /// - Parameter onErrorRecover: Called with the error; returns the `Driver` to continue with.
 /// - Returns: A `Driver<Element>` wrapping the resulting observable.
 public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver<Element>) -> Driver<Element> {
        let source = self
            .asObservable()
            // Deliver events on the scheduler provided by the Driver sharing strategy.
            .observeOn(DriverSharingStrategy.scheduler)
            // On error, switch to the observable form of the recovery Driver.
            .catchError { error in
                onErrorRecover(error).asObservable()
            }
        // `Driver(source)` is `SharedSequence.init`, which applies the strategy's `share`.
        return Driver(source)
    }

.observeOn(DriverSharingStrategy.scheduler).catchError { error in onErrorRecover(error).asObservable() }就是研究对象了,DriverSharingStrategy.scheduler实现:

/// Sharing strategy used by `Driver`; shown again here for its `scheduler` property.
public struct DriverSharingStrategy: SharingStrategyProtocol {
    /// Computed on every access by calling the `SharingScheduler.make` closure.
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
    /// Returns `source` wrapped with `share(replay: 1, scope: .whileConnected)`.
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        return source.share(replay: 1, scope: .whileConnected)
    }
}

-----------SharingScheduler.make() 实现--------------

/// Namespace holding the scheduler factory used by `SharedSequence`-based traits.
/// (Excerpt only: remaining members are elided.)
public enum SharingScheduler {
    /// Default scheduler used in SharedSequence based traits.
    // Stored as a closure; the default creates a new `MainScheduler` on each call.
    // The setter is private, so code outside this type cannot assign it directly.
    public private(set) static var make: () -> SchedulerType = { MainScheduler() }
    ...
}

/// Serial dispatch-queue scheduler bound to the main dispatch queue.
/// (Excerpt only: other members are elided.)
public final class MainScheduler : SerialDispatchQueueScheduler {

    …

    /// Initializes new instance of `MainScheduler`.
    public init() {
        // Keep a reference to the main queue and hand it to the superclass as its serial queue.
        self._mainQueue = DispatchQueue.main
        super.init(serialQueue: self._mainQueue)
    }
…

 }

self._mainQueue = DispatchQueue.main,相当于最后调用了.observeOn(MainScheduler()),实现了在主线程调用。

3.error错误处理

onErrorRecover(error).asObservable()中的onErrorRecover(error),就是我们外部创建driver序列时自己定义的错误处理,例如.asDriver(onErrorJustReturn: "检测到了错误事件")。这个闭包是在.catchError(闭包)初始化时传入并保存,出错时才被调用的:

——————catchError,初始化在self._handler保存了闭包,最后会被调用————————————

 /// Continues with the observable returned by `handler` when the source emits an error.
 ///
 /// - Parameter handler: Receives the error and returns a replacement observable; it may throw.
 /// - Returns: A `Catch` producer wrapping `self.asObservable()` and `handler`.
 public func catchError(_ handler: @escaping (Swift.Error) throws -> Observable<Element>)
        -> Observable<Element> {
        // Nothing is subscribed here; the handler is only stored inside the `Catch` producer.
        return Catch(source: self.asObservable(), handler: handler)
    }

/// Producer behind `catchError`: holds the source observable and the error handler,
/// and creates a `CatchSink` for each subscription.
final private class Catch<Element>: Producer<Element> {
    /// Maps an error to a replacement observable; may throw.
    typealias Handler = (Swift.Error) throws -> Observable<Element>
    
    // Upstream observable whose errors are intercepted.
    fileprivate let _source: Observable<Element>
    // Stored error handler; invoked later by the sink when an error event arrives.
    fileprivate let _handler: Handler
    
    /// Stores `source` and `handler` without subscribing.
    init(source: Observable<Element>, handler: @escaping Handler) {
        self._source = source
        self._handler = handler
    }
    
    /// Builds a `CatchSink` for `observer`, runs it, and returns both the sink and its subscription.
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = CatchSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

catchError,初始化在self._handler保存了闭包,最后会被调用,这个很重要

———接着👆的代码继承Producer,走run方法,创建了一个CatchSink,--------
 /// Subscribes this object to the parent's source and returns the disposable tracking that subscription.
 /// (The enclosing type is not shown; presumably `CatchSink`, per the surrounding article.)
 func run() -> Disposable {
        let d1 = SingleAssignmentDisposable()
        // Register the placeholder before subscribing, then fill it with the real subscription.
        self._subscription.disposable = d1
        // `self` is passed as the observer, so source events arrive at this object's `on(_:)`.
        d1.setDisposable(self._parent._source.subscribe(self))

        return self._subscription
    }


——然后调用self._parent._source.subscribe(self)-------

/// Emits the stored optional value (if any) followed by `.completed`, synchronously.
/// (The enclosing type is not shown in this excerpt.)
///
/// - Parameter observer: Receives the events.
/// - Returns: The result of `Disposables.create()`.
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        // Only emit `.next` when `_optional` holds a value.
        if let element = self._optional {
            observer.on(.next(element))
        }
        // Completion is sent whether or not a value was emitted.
        observer.on(.completed)
        return Disposables.create()
    }

-------- observer.on,其实就是sink中的on方法-------------

/// Sink for `Catch`: forwards events downstream and, on error, switches to the
/// sequence produced by the parent's handler. (Excerpt only: other members are elided.)
final private class CatchSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
…
/// Handles one event from the source.
func on(_ event: Event<Element>) {
        switch event {
        case .next:
            // Elements pass through unchanged.
            self.forwardOn(event)
        case .completed:
            // Forward completion, then dispose this sink.
            self.forwardOn(event)
            self.dispose()
        case .error(let error):
            do {
                // Invoke the handler that was stored when `catchError` created the `Catch` producer.
                let catchSequence = try self._parent._handler(error)

                // The replacement sequence is observed through a proxy created with this sink as parent.
                let observer = CatchSinkProxy(parent: self)
                
                // Replace the tracked subscription with the one to the replacement sequence.
                self._subscription.disposable = catchSequence.subscribe(observer)
            }
            catch let e {
                // The handler itself threw: forward that error downstream and dispose.
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
    }
…
}

case .error(let error)调用_handler(error),也就是前面说的初始化保存的handler闭包,完~~~。

上一篇下一篇

猜你喜欢

热点阅读