RAC和RxSwift

RxSwift中的publish和connect函数

2019-08-14  本文已影响0人  简_爱SimpleLove

publish一般都和connect连用。一般用于网络请求序列,有多次订阅的时候,只是进行一次网络请求。
如下:

        let netOB = Observable<Any>.create { (observer) -> Disposable in
            sleep(2)// 模拟网络延迟
            print("我开始请求网络了")
            observer.onNext("请求到的网络数据")
            observer.onNext("请求到的本地")
            observer.onCompleted()
            return Disposables.create {
                print("销毁回调了")
                }
            }
            .publish()
        
        // 走的协议ObservableType的subscribe方法
        netOB.subscribe(onNext: { (anything) in
            print("订阅1:",anything)
        })
        .disposed(by: disposeBag)
        
        netOB.subscribe(onNext: { (anything) in
            print("订阅2:",anything)
        })
        .disposed(by: disposeBag)
        _ = netOB.connect()

打印结果:

我开始请求网络了
订阅1: 请求到的网络数据
订阅2: 请求到的网络数据
订阅1: 请求到的本地
订阅2: 请求到的本地
销毁回调了

如果注释掉publishconnect,打印结果如下:

我开始请求网络了
订阅1: 请求到的网络数据
订阅1: 请求到的本地
销毁回调了
我开始请求网络了
订阅2: 请求到的网络数据
订阅2: 请求到的本地
销毁回调了

分析

从上面打印可以看出来:

我们继续还是通过代码的流程来进行分析:

1、肯定首先来到源序列的创建create方法中,初始化一个可观察序列AnonymousObservable,并保存闭包。

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

2、来到publish方法中

    /**
    - returns: A connectable observable sequence that shares a single subscription to the underlying sequence.
    */
 public func publish() -> ConnectableObservable<Element> {
        return self.multicast { PublishSubject() }
    }

传了一个参数PublishSubject()过去,并返回了一个multicast可连接可观察序列(共享对基础序列的单个订阅。分析后面知道,它的意思是对源序列订阅一次,其响应结果可被传递到后面的多个订阅)

3、来到multicast的实现

    public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
        -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
        return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
    }

传进去两个参数:source即源序列,makeSubjectPublishSubject()
返回一个ConnectableObservableAdapter序列

4、来到ConnectableObservableAdapter的初始化方法

final private class ConnectableObservableAdapter<Subject: SubjectType>
    : ConnectableObservable<Subject.Element> {
    init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
        self._source = source             // 保存源序列,即调用publish方法的那个序列
        self._makeSubject = makeSubject   // 保存了初始化的时候,传进来的PublishSubject()
        self._subject = nil               // 用于保存subject,初始化为nil
        self._connection = nil            // 用于保存connection,初始化为nil
    }
}

保存了传进来的两个参数sourcemakeSubject。注意ConnectableObservableAdapter作为中间层,并没有直接或者间接继承自Producer

5、初始化完成过后,来的最外层的订阅:订阅1

        // 走的协议ObservableType的subscribe方法
        netOB.subscribe(onNext: { (anything) in
            print("订阅1:",anything)
        })
        .disposed(by: disposeBag)

这里的subscribe并不像以前一样走到Producersubscribe方法,然后走到sinkrun方法,然后再走到源序列初始化的闭包_subscribeHandler里面,走到发送事件流程。

6、来到ConnectableObservableAdapter自己实现的subscribe方法

    fileprivate var lazySubject: Subject {
        if let subject = self._subject {  // 以后永远都是同一个subject,即lazySubject
            return subject
        }

        let subject = self._makeSubject()  // 第一次进来,进行赋值
        self._subject = subject
        return subject
    }

    // 没有间接继承Producer, 就只有自己实现subscribe
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
        // 每次都是lazySubject订阅的观察者,即外面有多少subscribe就会来这里多少次
        // 这里的观察者,是保存外面订阅处理事件闭包eventHandler的AnonymousObserver
        return self.lazySubject.subscribe(observer)
    }

7、来到PublishSubject()subscribe方法

public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

加了锁,保证了订阅观察者的顺序,来看_synchronized_subscribe方法里面的核心代码:

        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)

8、接下来来到最外层的第二个订阅:订阅2

        netOB.subscribe(onNext: { (anything) in
            print("订阅2:",anything)
        })
        .disposed(by: disposeBag)

订阅1一样,最后也只是保存订阅2中创建的AnonymousObserver.on保存在PublishSubject()的字典_observers中,还并没有走到事件发送。

这时我们会好奇,那到底什么时候发生事件呢?这时就来到下面这重要的一步

9、来到最外层的connect方法

        _ = netOB.connect()

ConnectableObservableAdapterconnect方法:

override func connect() -> Disposable {
        return self._lock.calculateLocked {
            // 避免外界多次调用connect方法的时候,返回不同的connection
            if let connection = self._connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            // connection是观察者,始终都是同一个观察者
            // 源序列订阅connection,最后走到connection.on方法
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }
    init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
        self._parent = parent                // ConnectableObservableAdapter
        self._subscription = subscription
        self._lock = lock
        self._subjectObserver = subjectObserver  // self.lazySubject.asObserver() 即 PublishSubject()
    }

因为源序列AnonymousObservable继承自Producer,所以后面的流程就是:
Producersubscribe方法 -->Producerrun方法--> AnonymousObservablerun方法-->AnonymousObservableSinkrun方法-->AnonymousObservable_subscribeHandler-->发送事件onNext方法-->观察者observeron方法-->connectionon方法

10、connectionon方法

    func on(_ event: Event<Subject.Observer.Element>) {
        if isFlagSet(self._disposed, 1) {
            return
        }
        if event.isStopEvent {
            self.dispose()
        }
        self._subjectObserver.on(event)
    }

上面知道_subjectObserver就是PublishSubject,所以来到PublishSubjecton方法。

11、PublishSubjecton方法中关键代码

    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            return self._observers   // 返回前面保存的所有订阅的事件处理
    }

return self._observers这句非常关键,意思是返回之前PublishSubject保存的_observers, 即AnonymousObserver.on方法,也就是说依次响应前面的订阅处理事件(订阅1订阅2中的打印)。

订阅1: 请求到的网络数据
订阅2: 请求到的网络数据

12、同理当第二个onNext发送事件的时候,继续走第10和第11步,依次响应前面保存的AnonymousObserver.on方法。

总结:

上一篇下一篇

猜你喜欢

热点阅读