RxSwift源码解析

Rx 从 SinkDisposer 一窥线程安全

2018-09-26  本文已影响17人  狼性刀锋
// SinkDisposer
fileprivate final class SinkDisposer: Cancelable {
    fileprivate enum DisposeState: UInt32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }

    // Jeej, swift API consistency rules
    fileprivate enum DisposeStateInt32: Int32 {
        case disposed = 1
        case sinkAndSubscriptionSet = 2
    }
    
    private var _state: AtomicInt = 0
    private var _sink: Disposable? = nil
    private var _subscription: Disposable? = nil

    var isDisposed: Bool {
        return AtomicFlagSet(DisposeState.disposed.rawValue, &_state)
    }

    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        _sink = sink
        _subscription = subscription

        let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }
    }
    
    func dispose() {
        let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            return
        }

        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            guard let sink = _sink else {
                rxFatalError("Sink not set")
            }
            guard let subscription = _subscription else {
                rxFatalError("Subscription not set")
            }

            sink.dispose()
            subscription.dispose()

            _sink = nil
            _subscription = nil
        }
    }
}



首先我们要分析下需求,哪些操作是需要保证线程安全的。 显然_state的设置是要保证线程安全的, 那么与之相关的读写操作都是要加锁的,因此 isDisposed,setSinkAndSubscription,dispose都是需要加锁的。RX 通过Atomic Operation 保证操作的原子性。这里值的注意的是通过or设置具体标志位,通过&操作检测具体相应标志位。

setSinkAndSubscription 为例子:

      let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
        if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }


现假设执行setSinkAndSubscription_state = 0bxy

那么执行 let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)

previousState = 0bxy

_state = 0bxy | 0b10 = 0b1y

也就是说这个操作最终导致_state 的第二位置1
previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue

0bxy & 0b10 = 0bx0

x = 1 则最终结果为0b10,否则则为0b00, 再通过if语句即可检测第二位是否为0.

这里还有一个小坑:

        if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
             print("triger  disposed in setSinkAndSubscription function \(self) \n \(Thread.current)")
            sink.dispose()
            subscription.dispose()
            _sink = nil
            _subscription = nil
        }

在检测完setSinkAndSubscription flag之后,立马又检测 disposed flag 如果为真则立即执行dispose操作,也就是说一旦set disposed flag,则再设置setSinkAndSubscription 则是无效操作,这里我不确定具体是什么情况会触发这个操作,不过我跑了下单元测试,确实某些case是会触发这种情况,以下便是一个触发该case的单元测试:

    func test1323() {
        func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
            _ = share(Observable<Int>.create({ observer in
                    observer.on(.next(1))
                    Thread.sleep(forTimeInterval: 0.1)
                    observer.on(.completed)
                    return Disposables.create()
                })
                .flatMap { (int) -> Observable<Int> in
                    return Observable.create { (observer) -> Disposable in
                        DispatchQueue.global().async {
                            observer.onNext(int)
                            observer.onCompleted()
                        }
                        return Disposables.create()
                    }
                })
                .subscribe { (e) in
                }
        }

        for op in [
            { $0.share(replay: 0, scope: .whileConnected) },
            { $0.share(replay: 0, scope: .forever) },
            { $0.share(replay: 1, scope: .whileConnected) },
            { $0.share(replay: 1, scope: .forever) },
            { $0.share(replay: 2, scope: .whileConnected) },
            { $0.share(replay: 2, scope: .forever) },
            ] as [(Observable<Int>) -> Observable<Int>] {
            performSharingOperatorsTest(share: op)
        }
    }

但是具体怎么原理还有待细究。

相关资料:

property属性的atomic和nonatomic区别

理解Memory Barrier(内存屏障)

上一篇下一篇

猜你喜欢

热点阅读