RxSwift

关于Rxswift demo中的ActivityIndicato

2022-03-27  本文已影响0人  令狐冲_

在Rxswift官方demo,class GithubSignupViewModel1中有一处发送网络请求并追踪请求状态的代码如下

        let signingIn = ActivityIndicator()
        self.signingIn = signingIn.asObservable()

        let usernameAndPassword = Observable.combineLatest(input.username, input.password) { (username: $0, password: $1) }

        signedIn = input.loginTaps.withLatestFrom(usernameAndPassword)
            .flatMapLatest { pair in
                return API.signup(pair.username, password: pair.password)
                    .observe(on:MainScheduler.instance)
                    .catchAndReturn(false)
                    // 1. 调用追踪,追踪API.signup,并把追踪信号放到signingIn
                    .trackActivity(signingIn)
            }
            .flatMapLatest { loggedIn -> Observable<Bool> in
                let message = loggedIn ? "Mock: Signed in to GitHub." : "Mock: Sign in to GitHub failed"
                return wireframe.promptFor(message, cancelAction: "OK", actions: [])
                    // propagate original value
                    .map { _ in
                        loggedIn
                    }
            }
            .share(replay: 1)

ActivityIndicator源码如下

import RxSwift
import RxCocoa

private struct ActivityToken<E> : ObservableConvertibleType, Disposable {
    private let _source: Observable<E>
    private let _dispose: Cancelable

    init(source: Observable<E>, disposeAction: @escaping () -> Void) {
       // 4. 设置source,设置Disposables,在资源回收的时候调用disposeAction
      // 也即上一步传入的self.decrement
        _source = source
        _dispose = Disposables.create(with: disposeAction)
    }

    func dispose() {
        _dispose.dispose()
    }

    func asObservable() -> Observable<E> {
        // 12.ActivityToken<Bool>返回的也即是传入的source本身,也即 
       //  Observable<Bool>,也即外面传入的API.Signup
        _source
    }
}

/**
Enables monitoring of sequence computation.

If there is at least one sequence computation in progress, `true` will be sent.
When all activities complete `false` will be sent.
*/
public class ActivityIndicator : SharedSequenceConvertibleType {
    public typealias Element = Bool
    public typealias SharingStrategy = DriverSharingStrategy

    private let _lock = NSRecursiveLock()
    private let _relay = BehaviorRelay(value: 0)
    private let _loading: SharedSequence<SharingStrategy, Bool>

    public init() {
      // 7.relay.value+1 经过map转换为Bool类型,使_loading的值变为true
        _loading = _relay.asDriver()
            .map { $0 > 0 }
            .distinctUntilChanged()
    }

    fileprivate func trackActivityOfObservable<Source: ObservableConvertibleType>(_ source: Source) -> Observable<Source.Element> {
        // 3. 调用using operator,创建一个ActivityToken Source,并把API.Signup
和 self.decrement传入作为初始化参数
        return Observable.using({ () -> ActivityToken<Source.Element> in
            // 5. 构建Token Source的工厂函数,内部调用self.increment()
            self.increment()
            return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
        }) { t in
            // 10. 通过5创建的Token Source创建一个新的observerable
            //  这里t的类型是ActivityToken<Bool>
            return t.asObservable()
        }
    }

    private func increment() {
      // 6. increment让_relay接收元素_relay.value+1
        _lock.lock()
        _relay.accept(_relay.value + 1)
        _lock.unlock()
    }

    private func decrement() {
        _lock.lock()
        _relay.accept(_relay.value - 1)
        _lock.unlock()
    }

    public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
      // 9.外界subscribe ActivityIndicator,实际是在subscribe _loading,也就是外        
     //   面的signingIn
        _loading
    }
}

extension ObservableConvertibleType {
    public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<Element> {
        // 2. indicator追踪传入的信号API.signup
        activityIndicator.trackActivityOfObservable(self)
      // 13. return的是ActivityIndicator调用trackActivityOfObservable函数返回的
      // Observable<Bool>类型,也即通过Observable.using创建出来的
    }
}

USing

extension ObservableType {
    /**
     Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime.

     - seealso: [using operator on reactivex.io](http://reactivex.io/documentation/operators/using.html)

     - parameter resourceFactory: Factory function to obtain a resource object.
     - parameter observableFactory: Factory function to obtain an observable sequence that depends on the obtained resource.
     - returns: An observable sequence whose lifetime controls the lifetime of the dependent resource object.
     */
    public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
        Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
    }
}

using操作符按照Reactivex的解释如下

image.png
它首先通过第一个λ参数resourceFactory创建了一个resource,然后又把resource传给第二个λ参数observableFactory创建了一个Observable<Element>,最后再调用Using构造函数构造了一个新的Observable<Element>,这个Observable牢牢控制着resource的生命期,
一旦被返回的Observable发出complete或者error事件,resource的生命期也会终止,换言之,resource会调用自己的dispose()事件。
Using继承自Producer<SourceType>

class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}

Producer在subscribe的时候,会根据当先调度器创建disposer,并调用子类的
run函数,回到子类的run函数

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        // 构建sink
        let sink = UsingSink(parent: self, observer: observer, cancel: cancel)
        // 调用sink.run
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

也即是

final private class UsingSink<ResourceType: Disposable, Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias SourceType = Observer.Element 
    typealias Parent = Using<SourceType, ResourceType>

    private let parent: Parent
    
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    
    func run() -> Disposable {
        var disposable = Disposables.create()
        
        do {
            // 1.拿到resourceFactory创建resource,也即ActivityToken<E>
            let resource = try self.parent.resourceFactory()
            disposable = resource
            // 2. 传入token,拿到新的source observable,也即
            //     func asObservable() -> Observable<E> {
            //        //  最开始的API.signup
            //        _source
            //      }
            let source = try self.parent.observableFactory(resource)
            
            return Disposables.create(
                // source的订阅换成了前面返回的Using
                source.subscribe(self),
                disposable
            )
        } catch let error {
            return Disposables.create(
                Observable.error(error).subscribe(self),
                disposable
            )
        }
    }
    
    func on(_ event: Event<SourceType>) {
        // 把sourceObservable发送的元素转发一遍
        switch event {
        case let .next(value):
            self.forwardOn(.next(value))
        case let .error(error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

至此,我们明白了,observabel.using返回的Observabel<Bool>和API.signup发射的元素一致。
而API.signup在发出complete事件后(注:事件流见附日志),即代表这个observable序列终止,经由它创建的ActivityToken生命期也随之终止,调用了自身的dispose()函数,也顺带调用前一步传入的self.decrement,也即

    private func decrement() {
        _lock.lock()
        _relay.accept(_relay.value - 1)
        _lock.unlock()
    }

而此时_relay.value变成了0,_loading也变成了false

        _loading = _relay.asDriver()
            .map { $0 > 0 }
            .distinctUntilChanged()

而_loading作为signingIn的asSharedSequence(),也即是自身的observable
详见

import RxSwift

/**
    Trait that represents observable sequence that shares computation resources with following properties:

    - it never fails
    - it delivers events on `SharingStrategy.scheduler`
    - sharing strategy is customizable using `SharingStrategy.share` behavior

    `SharedSequence<Element>` can be considered a builder pattern for observable sequences that share computation resources.

    To find out more about units and how to use them, please visit `Documentation/Traits.md`.
*/
public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType, ObservableConvertibleType {
    let source: Observable<Element>

    init(_ source: Observable<Element>) {
        self.source = SharingStrategy.share(source)
    }

    init(raw: Observable<Element>) {
        self.source = raw
    }

    #if EXPANDABLE_SHARED_SEQUENCE
    /**
     This method is extension hook in case this unit needs to extended from outside the library.
     
     By defining `EXPANDABLE_SHARED_SEQUENCE` one agrees that it's up to them to ensure shared sequence
     properties are preserved after extension.
    */
    public static func createUnsafe<Source: ObservableType>(source: Source) -> SharedSequence<SharingStrategy, Source.Element> {
        SharedSequence<SharingStrategy, Source.Element>(raw: source.asObservable())
    }
    #endif

    /**
    - returns: Built observable sequence.
    */
    public func asObservable() -> Observable<Element> {
        self.source
    }

    /**
    - returns: `self`
    */
    public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
        self
    }
}

完成了_loading对当前追踪的计算序列的追踪。
值得肯定的是官方对ActivityIndicator的设计并没有简单的设计为追踪一个流的computation,而是通过map ($0 > 0)这个写法,让它有了追踪多个Observable序列的能力,每追踪多一个则_relay.accept(_relay.value + 1),一个序列终止则_relay.accept(_relay.value - 1),直到减为0,_loading才会由true变为false。
非常类似自动引用计数(Automatic Reference Counting)的思想,值得我们借鉴和反复琢磨。

附:

// 追踪Observabel debug日志
2022-03-27 11:06:07.371: signup -> subscribed
2022-03-27 11:06:08.386: signup -> Event next(true)
2022-03-27 11:06:08.425: signup -> Event completed
2022-03-27 11:06:08.425: signup -> isDisposed

本文参考资料:

operators-using
ActivityIndicator

上一篇下一篇

猜你喜欢

热点阅读