Rx

RxSwift源码分析(11)——publish

2020-10-13  本文已影响0人  无悔zero

今天分析一下publish这个高阶函数,这个函数使用起来有点像网络请求,需要启动连接才会进行相关订阅、发送和响应。可以直接来看下面的例子:

let ob = Observable<Any>.create { (observer) -> Disposable in
    observer.onNext("连接后才订阅、发送和响应")
    observer.onCompleted()
    return Disposables.create()
}.publish()  //注意
ob.subscribe { (text) in
    print(text)
}.disposed(by: disposeBag)
_ = ob.connect()  //连接
  1. 我们先来看publish()源码:
extension ObservableType {
    public func publish() -> ConnectableObservable<Element> {
        return self.multicast { PublishSubject() }
    }
}
public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    ...
}

publish函数其实就是创建了ConnectableObservableAdapter序列返回:

extension ObservableType {
    ...
    public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
        -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
        return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
    }
}

ConnectableObservableAdapter其实也继承了ObservableObservableType,所以也是一个序列:

final private class ConnectableObservableAdapter<Subject: SubjectType>
    : ConnectableObservable<Subject.Element> {
    ...
    init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
        self._source = source  //保存源序列
        self._makeSubject = makeSubject  //保存PublishSubject
        self._subject = nil
        self._connection = nil
    }
    ...
}
public class ConnectableObservable<Element>
    : Observable<Element>
    , ConnectableObservableType {
    ...
}
public class Observable<Element> : ObservableType {
    ...
}
  1. 然后序列开始订阅:
ob.subscribe { (text) in
    ...
}

根据RxSwift核心逻辑,来到ConnectableObservableAdaptersubscribe函数,可以看到有一个lazySubjectlazySubject只会创建一次,有利于节省内存和减少性能消耗:

final private class ConnectableObservableAdapter<Subject: SubjectType>
    : ConnectableObservable<Subject.Element> {
    ...
    //只创建一次
    fileprivate var lazySubject: Subject {
        if let subject = self._subject {
            return subject
        }

        let subject = self._makeSubject()
        self._subject = subject
        return subject
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
        return self.lazySubject.subscribe(observer)
    }
}
  1. lazySubject订阅后来到:
public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    public typealias SubjectObserverType = PublishSubject<Element>

    typealias Observers = AnyObserver<Element>.s
    ...
    private var _observers = Observers()  //默认初始化
    ...
    public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        ...
        //重点
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }
    ...
}

_synchronized_subscribe函数的代码很容易迷惑人,其实它的重点在self._observers.insert(observer.on)

  1. self._observers是默认初始化的,具有保存的作用,它把observer.on保存下来了:
struct Bag<T> : CustomDebugStringConvertible {
    ...
        mutating func insert(_ element: T) -> BagKey {
        let key = _nextKey

        _nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)

        if _key0 == nil {
            _key0 = key
            _value0 = element
            return key
        }

        _onlyFastPath = false

        if _dictionary != nil {
            _dictionary![key] = element
            return key
        }

        if _pairs.count < arrayDictionaryMaxSize {
            _pairs.append((key: key, value: element)) //保存了observer.on
            return key
        }
        
        _dictionary = [key: element]
        
        return key
    }
    ...
}
  1. 保存之后,下一步就是在适当的时候调用connect
ob.connect()

进入源码,我们可以看到先创建connection,然后用self._source源序列来进行subscribe订阅connection

final private class ConnectableObservableAdapter<Subject: SubjectType>
    : ConnectableObservable<Subject.Element> {
    ...
    override func connect() -> Disposable {
        return self._lock.calculateLocked {
            if let connection = self._connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }
    ...
}
final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
    ...
    init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
        self._parent = parent
        self._subscription = subscription
        self._lock = lock
        self._subjectObserver = subjectObserver
    }
    ...
}
  1. 根据RxSwift核心逻辑,最终来到connectionon函数:
final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
    ...
    func on(_ event: Event<Subject.Observer.Element>) {
        if isFlagSet(self._disposed, 1) {
            return
        }
        if event.isStopEvent {
            self.dispose()
        }
        self._subjectObserver.on(event)
    }
    ...
}
  1. 然后调用self._subjectObserver.on(event)=>
    Connection._subjectObserver.on=>
    ConnectableObservableAdapter.lazySubject.on=>
    ConnectableObservableAdapter._makeSubject.on=>
    PublishSubject.on
public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType {
    ...
    public func on(_ event: Event<Element>) {
        ...
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:  
            ...
            return Observers()
        }
    }
    ...
}

然后通过_synchronized_on函数拿到self._observers后执行dispatch(self._synchronized_on(event), event)

  1. 最后遍历调用保存的observer.on
@inline(__always)
func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}
  1. 根据RxSwift核心逻辑observer.on就是AnonymousObserver.on,最终来到外面的响应闭包:
ob.subscribe { (text) in
    print(text)
}

整个流程简单来说就是:
1.创建序列,用publish包装一下;
2.然后通过subscribe函数保存observer.on
3.最后调用connect来进行源序列的订阅,以此唤起发送和响应。

上一篇 下一篇

猜你喜欢

热点阅读