RxSwift-publish源码解析

2019-08-14  本文已影响0人  king_jensen

publish使用

let subject = PublishSubject<Any>()
        subject.subscribe{print("00:\($0)")}
            .disposed(by: disposeBag)
        
        let netOB = Observable<Any>.create { (observer) -> Disposable in
                sleep(2)// 模拟网络延迟
                print("我开始请求网络了")
                observer.onNext("请求到的网络数据")
                observer.onNext("请求到的本地")
                observer.onCompleted()
                return Disposables.create {
                    print("销毁回调了")
                }
            }.publish()
        
        netOB.subscribe(onNext: { (anything) in
                print("订阅1:",anything)
            })
            .disposed(by: disposeBag)

        // 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
        // 所以在订阅一次 会出现什么问题?
        netOB.subscribe(onNext: { (anything) in
                print("订阅2:",anything)
            })
            .disposed(by: disposeBag)
        
        _ = netOB.connect()
  /*打印结果:
         我开始请求网络了
         订阅1: 请求到的网络数据
         订阅2: 请求到的网络数据
         订阅1: 请求到的本地
         订阅2: 请求到的本地
         销毁回调了
         */

我们看到网络只会请求一次。这种请求一次,订阅到多个不同的地方的场景很多。所以我们有必要了解一下publish是怎么实现的。
探索publish的源码,还是从RxSwift的流程:创建序列,订阅序列,发送响应入手分析。

publish的序列创建

public func publish() -> ConnectableObservable<Element> {
        return self.multicast { PublishSubject() }
    }

我们看到publish实际返回的是multicast

  public func multicast<Subject: SubjectType>(_ subject: Subject)
        -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
        return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: { subject })
    }

返回类型为ConnectableObservableAdapter

init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
        self._source = source
        self._makeSubject = makeSubject
        self._subject = nil
        self._connection = nil
    }

1.保存源序列_source
2.保存初始化序列PublishSubject()_makeSubject

序列的订阅

当序列订阅时,调用ConnectableObservableAdaptersubscribe(observer)

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
        return self.lazySubject.subscribe(observer)
    }

这里订阅lazySubject序列,这是一个懒加载方式,每次订阅的都是序列_subject,即创建中保存的_makeSubject

fileprivate var lazySubject: Subject {
        if let subject = self._subject {
            return subject
        }

        let subject = self._makeSubject()
        self._subject = subject
        return subject
    }

订阅_subject时,将会调用PublishSubject.subscribe(observer)

let subscription = self._synchronized_subscribe(observer)

调用_synchronized_subscribe(observer)

func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

调用self._observers.insert(observer.on)

mutating func insert(_ element: T) -> BagKey {
        let key = _nextKey

        _nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)

        if _key0 == nil {
            _key0 = key
            _value0 = element
            return key
        }

        _onlyFastPath = false

        if _dictionary != nil {
            _dictionary![key] = element
            return key
        }

        if _pairs.count < arrayDictionaryMaxSize {
            _pairs.append((key: key, value: element))
            return key
        }
        
        _dictionary = [key: element]
        
        return key
    }

publish订阅时,将observer.on的回调方法保存起来,如果是首次订阅将key保存在_key0,observer.on保存在_value0,不是首次保存订阅,那么就保存在_dictionary中。
keyBagKey类型:

struct BagKey {
    fileprivate let rawValue: UInt64
}

publish将所有订阅的观察者回调方法保存起来,以备后续发起响应时,调用所有观察者的回调方法。

发送响应

使用connect发送响应

override func connect() -> Disposable {
        return self._lock.calculateLocked {
            if let connection = self._connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }

self._lock.calculateLocked加了递归锁

 final func calculateLocked<T>(_ action: () -> T) -> T {
        self.lock(); defer { self.unlock() }
        return action()
    }

然后执行action(),就是下列代码:

if let connection = self._connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection

1.保证只有一个_connection,当_connection存在,直接返回_connection。保证下列的原序列_source永远只被订阅一次,那么外界网络请求的闭包就只会执行一次。
2.当_connectionnil时,会创建

 let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
       self._connection = connection
  let subscription = self._source.subscribe(connection)

(1)创建Connection

 init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
        self._parent = parent
        self._subscription = subscription
        self._lock = lock
        self._subjectObserver = subjectObserver
    }

保存ConnectableObservableAdapter_parent,self.lazySubject.asObserver()保存至_subscription
(2)保存Connection_connection
(3)订阅源序列_source,connection是观察者. 将会来到AnonymousObservable.run,然后再到sink.run,最终会调用到_subscribeHandler执行到外界的闭包中。

在外界闭包中,observer.onNext("请求到的网络数据")发起响应,原source的观察者是Connection,Connection.on

func on(_ event: Event<Subject.Observer.Element>) {
        if isFlagSet(self._disposed, 1) {
            return
        }
        if event.isStopEvent {
            self.dispose()
        }
        self._subjectObserver.on(event)
    }

Connection.on调用self._subjectObserver.on(event)
self._subjectObserver就是publish创建时保存的那个PublishSubject()
self._subjectObserver.on(event)等同于PublishSubject.on(event)

 public func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }

1.调用_synchronized_on

   func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }

            return Observers()
        }
    }

_synchronized_on中返回self._observers,即订阅时保存的所有的observer.on

2.dispatch(self._synchronized_on(event), event)调用dispatch

func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
    bag._value0?(event)

    if bag._onlyFastPath {
        return
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = bag._dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}

bag._value0?(event)首先调用第一个订阅者的回调方法。
然后循环执行_pairs_dictionary中所有保存的回调方法。
以上就是publish所有流程的源码解析。

总结:

1.publish创建,实际返回是ConnectableObservableAdapter序列,保存了源序列,并且创建保存PublishSubject()序列。
2.publish订阅时,订阅的是中间层ConnectableObservableAdapterPublishSubject()序列,并保存所有的回调方法observer.on
3.connect时,实际上是订阅了源序列,观察者为我们自己创建的Connection对象,这个时候将会来到创建源序列时的闭包,我们就是在这个闭包中请求网络.
4.给源序列发送响应时,实际上会来到观察者Connection.on方法,Connection中将会对订阅时保存的_observers一一执行。
以上就是publish序列的所有流程。

上一篇下一篇

猜你喜欢

热点阅读