Swift

RxSwift特征序列之Driver

2019-08-20  本文已影响0人  silasjs

RxSwift特征序列之Driver

Driver 是个比较特殊的序列,它主要是对需要在 UI 上做出响应的序列进行了封装。这层封装做了三件事情:

  1. 在主线程监听
  2. 不会产生 error 事件
  3. 共享附加作用

没有对比就没有伤害,先看看一搬的序列在驱动 UI 时会怎么做,再回首就能体会到 Driver 的便捷之处了。

demo

准备一个模拟网络请求的函数,然后把输入框的编辑事件和网络请求的结果合并之后,订阅到的结果在 UI(lab 和 btn)上展示出来。

func dealWithData(inputText:String)-> Observable<Any>{
    print("准备网络请求---\(Thread.current)") // data
    return Observable<Any>.create({ (ob) -> Disposable in
        if inputText == "1234" {
            ob.onError(NSError.init(domain: "❌", code: 120, userInfo: nil))
        }
        
        DispatchQueue.global().async {
            print("发送前的线程: \(Thread.current)")
            ob.onNext("\(inputText)")
            ob.onCompleted()
        }
        return Disposables.create()
    })
}

然后开始序列的创建、订阅:

let result  = self.tf.rx.text.skip(1)
    .flatMap { [weak self](input) -> Observable<Any> in
        return (self?.dealWithData(inputText:input ?? ""))!
}

result.map {
        print("map映射lab---\($0)---\(Thread.current)")
        return "长度: \(($0 as! String).count)"
    }
    .bind(to:self.lab.rx.text)
    .disposed(by: disposeBag)

result.map {
        print("map映射btn---\($0)---\(Thread.current)")
        return "\($0 as! String)"
    }
    .bind(to:self.btn.rx.title())
    .disposed(by: disposeBag)

当输入框输入 2 的时候会打印:

准备网络请求---<NSThread: 0x600003798dc0>{number = 1, name = main}
发送前的线程: <NSThread: 0x6000037fd900>{number = 4, name = (null)}
map映射lab---2---<NSThread: 0x6000037fd900>{number = 4, name = (null)}
准备网络请求---<NSThread: 0x600003798dc0>{number = 1, name = main}
发送前的线程: <NSThread: 0x6000037307c0>{number = 6, name = (null)}
map映射btn---2---<NSThread: 0x6000037307c0>{number = 6, name = (null)}

这样写会有些问题:

  1. 输入框每次的编辑事件都会触发两次请求,因为订阅(bind)了两次,并没有共享。
  2. 在子线程请求后,响应也是在子线程。
  3. 如果网络请求序列发出 error 事件,就会取消所有的绑定,无法发出新的请求,并抛出异常错误。

为了避免这三个问题,就得多调用几个方法:

let result  = self.tf.rx.text.skip(1)
    .flatMap { [weak self](input) -> Observable<Any> in
        return (self?.dealWithData(inputText:input ?? ""))!
            //保证了在主线程监听
            .observeOn(MainScheduler())
            //避免程序抛出错误异常
            .catchErrorJustReturn("检测到了错误事件")
    }
    //共享附加作用
    .share(replay: 1, scope: .whileConnected)

result.map {
        print("map映射lab---\($0)---\(Thread.current)")
        return "长度: \(($0 as! String).count)"
    }
    .bind(to:self.lab.rx.text)
    .disposed(by: disposeBag)

result.map {
        print("map映射btn---\($0)---\(Thread.current)")
        return "\($0 as! String)"
    }
    .bind(to:self.btn.rx.title())
    .disposed(by: disposeBag)
    
error 事件的打印:
准备网络请求---<NSThread: 0x600001f6e880>{number = 1, name = main}
map映射lab---检测到了错误事件---<NSThread: 0x600001f6e880>{number = 1, name = main}
map映射btn---检测到了错误事件---<NSThread: 0x600001f6e880>{number = 1, name = main}
发送前的线程: <NSThread: 0x600001f1d100>{number = 4, name = (null)}

使用 Driver

如果使用 Driver 的话,就是这个样子的:

let result = self.tf.rx.text.orEmpty
    .asDriver()
    .flatMap {
        return self.dealWithData(inputText: $0)
            .asDriver(onErrorJustReturn: "检测到了错误事件")
}

result.map {
        return "长度: \(($0 as! String).count)"
    }
    .drive(self.lab.rx.text)
    .disposed(by: disposeBag)

result.map {
        return "\($0 as! String)"
    }
    .drive(self.btn.rx.title())
    .disposed(by: disposeBag)

要使用 Driver,就要先用asDriver把序列转换为 Driver,然后才能拥有 drive 的绑定能力。把输入框编辑事件和网络请求都转为 Driver 序列再合并后,用 drive 把订阅到的数据绑定到 UI 上。同样可以避免那三种情况,写起来更简洁。

解析

先点进去asDriver(由于 demo 中 error 事件是在网络请求中发出的,这里主要看flatMap中的asDriver):

extension ObservableConvertibleType {
    public func asDriver(onErrorJustReturn: Element) -> Driver<Element> {
        let source = self
            .asObservable()
            .observeOn(DriverSharingStrategy.scheduler)
            .catchErrorJustReturn(onErrorJustReturn)
        return Driver(source)
    }
}

到了ObservableConvertibleTypeDriver分类文件中。里面以调用者作为源序列构建了Driver序列,源序列在这里也是做了两个处理:1.observeOn:在主线程监听;2.catchErrorJustReturn:error 事件不会抛出异常。

1.主线程监听

public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
}

结构体DriverSharingStrategyscheduler返回的是SharingScheduler枚举值。默认值就是主线程。

public enum SharingScheduler {
    public private(set) static var make: () -> SchedulerType = { MainScheduler() }
}

2.回避 error 事件

public func catchErrorJustReturn(_ element: Element) -> Observable<Element> {
    return Catch(source: self.asObservable(), handler: { _ in Observable.just(element) })
}

传入 source 和 handler 闭包,构建了一个Catch序列。

final private class Catch<Element>: Producer<Element> {
    typealias Handler = (Swift.Error) throws -> Observable<Element>
    
    init(source: Observable<Element>, handler: @escaping Handler) {
        self._source = source
        self._handler = handler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = CatchSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

老套路了,CatchSinkrun中,self._parent._source.subscribe(self)订阅后的响应在CatchSinkon里面。

final private class CatchSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    func run() -> Disposable {
        let d1 = SingleAssignmentDisposable()
        self._subscription.disposable = d1
        d1.setDisposable(self._parent._source.subscribe(self))

        return self._subscription
    }
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            self.forwardOn(event)
        case .completed:
            self.forwardOn(event)
            self.dispose()
        case .error(let error):
            do {
                let catchSequence = try self._parent._handler(error)

                let observer = CatchSinkProxy(parent: self)
                
                self._subscription.disposable = catchSequence.subscribe(observer)
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
    }
}

next 和 completed 事件都是常规操作,主要是 error 的处理,首先回调了_handler(error),这个_handler就是初始化Catch时的{ _ in Observable.just(element) }。虽然回调时把error传过去了,但是闭包中直接忽略了(error 被无视了),还是把外界传入的element用来构造一个 just 序列作为返回值(just 只发出唯一的元素,就是这里的element了)。所以,这里的catchSequence就是个 just 序列

然后构造了一个中间层CatchSinkProxy,作为 just 序列的观察者,订阅后自然还是在中间层CatchSinkProxyon中响应:

final private class CatchSinkProxy<Observer: ObserverType>: ObserverType {
    func on(_ event: Event<Element>) {
        self._parent.forwardOn(event)
        
        switch event {
        case .next:
            break
        case .error, .completed:
            self._parent.dispose()
        }
    }
}

二话不说,直接让CatchSink通过forwardOn响应外界。回避 error 流程结束。

3. 共享附加作用

好像还少了一点:共享附加作用,继续跟着Driver的构造函数走。

public typealias Driver<Element> = SharedSequence<DriverSharingStrategy, Element>

Driver只是SharedSequence的别名。看到这里也大致能猜到这个SharedSequence就是处理共享附加作用的。

public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
    let _source: Observable<Element>

    init(_ source: Observable<Element>) {
        self._source = SharingStrategy.share(source)
    }
}

SharedSequence构造函数中的SharingStrategy像是突然蹦出来的,点击也跳不到定义的位置。从SharedSequence的定义中看出它是个遵守SharingStrategyProtocol协议的泛型。之前给Driver起别名的时候,SharedSequence中指定的是DriverSharingStrategy。那我们点击share是就可以选择DriverSharingStrategy.share的位置。

public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
    public static func share<Element>(_ source: Observable<Element>) -> Observable<Element> {
        return source.share(replay: 1, scope: .whileConnected)
    }
}

结构体DriverSharingStrategyshare里,又让source调用了一个share。然后,Driver(也就是SharedSequence)的_source还是调用asDriver的那个序列么?

看到这里,感觉很绕,外面一句代码的调用,里面缺穿行了好多层,还看不到头。但是也会发现,每一层都只做很简单的事,它们之间灵活搭配,不同的组合可以完成各种不同的功能。

继续跳进share

public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected) -> Observable<Element> {
    switch scope {
    case .forever:
        switch replay {
        case 0: return self.multicast(PublishSubject()).refCount()
        default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
        }
    case .whileConnected:
        switch replay {
        case 0: return ShareWhileConnected(source: self.asObservable())
        case 1: return ShareReplay1WhileConnected(source: self.asObservable())
        default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
        }
    }
}

我们只看满足条件(replay == 1,scope == whileConnected)的ShareReplay1WhileConnected(source: self.asObservable())。其他分支跳进去又是一大坨。这里果然是构建了另一个序列ShareReplay1WhileConnected,但source还是那个调用asDriver的源序列。

final private class ShareReplay1WhileConnected<Element>
    : Observable<Element> {
    fileprivate typealias Connection = ShareReplay1WhileConnectedConnection<Element>
    fileprivate var _connection: Connection?

    init(source: Observable<Element>) {
        self._source = source
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()

        let connection = self._synchronized_subscribe(observer)
        let count = connection._observers.count

        let disposable = connection._synchronized_subscribe(observer)

        self._lock.unlock()
        
        if count == 0 {
            connection.connect()
        }

        return disposable
    }

    @inline(__always)
    private func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
        let connection: Connection

        if let existingConnection = self._connection {
            connection = existingConnection
        }
        else {
            connection = ShareReplay1WhileConnectedConnection<Element>(
                parent: self,
                lock: self._lock)
            self._connection = connection
        }

        return connection
    }
}

ShareReplay1WhileConnected的订阅中,调用_synchronized_subscribe,引用了一个ShareReplay1WhileConnectedConnection。然后获取一下观察者总数,就把ShareReplay1WhileConnected作为ShareReplay1WhileConnectedConnection的观察者开始同步订阅了。

fileprivate final class ShareReplay1WhileConnectedConnection<Element>
    : ObserverType
    , SynchronizedUnsubscribeType {
    final func on(_ event: Event<Element>) {
        self._lock.lock()
        let observers = self._synchronized_on(event)
        self._lock.unlock()
        dispatch(observers, event)
    }

    final private func _synchronized_on(_ event: Event<Element>) -> Observers {
        if self._disposed {
            return Observers()
        }

        switch event {
        case .next(let element):
            self._element = element
            return self._observers
        case .error, .completed:
            let observers = self._observers
            self._synchronized_dispose()
            return observers
        }
    }

    final func connect() {
        self._subscription.setDisposable(self._parent._source.subscribe(self))
    }

    final func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock(); defer { self._lock.unlock() }
        if let element = self._element {
            observer.on(.next(element))
        }

        let disposeKey = self._observers.insert(observer.on)

        return SubscriptionDisposable(owner: self, key: disposeKey)
    }
}

ShareReplay1WhileConnectedConnection_synchronized_subscribe中,如果_element有值,观察者就发出 next 事件出去,然后就是observer.on装袋了,很熟悉的模式,跟 RxSwift之Subject 中的 ReplaySubject、PublishSubject里的处理非常类似。

到此为止还没有真正的去订阅,我们回到ShareReplay1WhileConnectedsubscribe函数里,继续下一步。初次进来,袋子里的观察者 count 必定为 0 。也会调用connection.connect()ShareReplay1WhileConnectedConnectionconnect里才走了源序列的订阅subscribe。之后的响应也和PublishSubject中的一样多点发送,只是 Replay 的只有一个元素罢了。这样也完成了共享附加作用。

上一篇 下一篇

猜你喜欢

热点阅读