iOS-项目实战

RxSwift 核心逻辑初探

2019-07-23  本文已影响0人  周小周_

RxSwift

RxSwift: Reactive Extensions Swift,函数响应式编程的基础框架。简化代码,易读性高。

函数式,数学概念y = f(x),x也可以是个函数,那么y = f(f(x))。利用面向函数,可读性更高,分段式更简洁。
响应式,面向数据流和变化传播的编程范式。数据流:只能以事先规定好的顺序被读取一次的数据的一个序列。变化传播:类似观察者模式,变化了要通知别人。

RxSwift初体验

KVO

不需要再去写被观察对象的addObserver:forKeyPath:options:context: 方法注册观察者
观察者也不需要实现observeValueForKeyPath:ofObject:change:context: 回调方法

 func setupKVO() {
        /// self.person.addObserver(self, forKeyPath: "name", options: .new, context: nil)
        self.person.rx.observeWeakly(String.self, "name")
            .subscribe(onNext: { (value) in
                print(value as Any)
            })
            .disposed(by: disposeBag)
    }
UI

button为例,业务逻辑和功能逻辑放在一起,更清晰,方便查找

 func setupButton() {
        ///直接.tap就可以实现touchUpInside事件
        self.button.rx.tap
            .subscribe(onNext: { () in
                print("点击来了")
            })
            .disposed(by: disposeBag)
        ///如果不想用touchUpInside事件,可以这样
        self.button.rx.controlEvent(.touchUpOutside)
    }
代理

可以省略代理方法实现

func setupTextFiled() {
        self.textFiled.rx.text.orEmpty
            .subscribe(onNext: { (text) in
               print(text)
            })
            .disposed(by: disposeBag)
        self.textFiled.rx.text
            .bind(to: self.button.rx.title())
            .disposed(by: disposeBag)

    }
通知
func setupNotification(){
        NotificationCenter.default.rx.notification(UIResponder.keyboardWillShowNotification)
            .subscribe(onNext: { (noti) in
                print(noti)
            })
        .disposed(by: disposeBag)
        
    }
手势
func setupGestureRecognizer(){
        let tap = UITapGestureRecognizer()
        self.label.addGestureRecognizer(tap)
        self.label.isUserInteractionEnabled = true
        tap.rx.event.subscribe(onNext: { (tap) in
            print(tap.view)
        })
        .disposed(by: disposeBag)
    }
timer

核心逻辑,隔一秒钟发送一个响应,并不是NSTimer

func setupTimer() {
        var timer: Observable<Int>!
        timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        timer.subscribe(onNext: { (num) in
            print(num)
        })
        .disposed(by: disposeBag)
    }
网络
  func setupNextwork() {
        let url = URL(string: "https://www.baidu.com")
      /// 原实现代码
      URLSession.shared.dataTask(with: url!) { (data, response, error) in
         print(String.init(data:data!,encoding: .utf8))
      }.resume()
      /// RX实现
      URLSession.shared.rx.response(request: URLRequest(url: url!))
        .subscribe(onNext: { (response,data) in
            print(response)
        }).disposed(by: disposeBag)   
    }

万物皆rx?点进去看看,会发现rx在ReactiveCompatible这个协议的扩展里面,

extension ReactiveCompatible {

    /// Reactive extensions.
    public static var rx: RxSwift.Reactive<Self>.Type

    /// Reactive extensions.
    public var rx: RxSwift.Reactive<Self>
}
/// A type that has reactive extensions.
public protocol ReactiveCompatible {
    /// Extended type
    associatedtype CompatibleType

    /// Reactive extensions.
    static var rx: Reactive<CompatibleType>.Type { get set }

    /// Reactive extensions.
    var rx: Reactive<CompatibleType> { get set }
}

划重点了,所有的NSObject都遵循这个协议。

/// Extend NSObject with `rx` proxy.
extension NSObject: ReactiveCompatible { }

核心Observable可观察序列

序列Sequence

是一系列相同类型的值的集合,并且提供对这些值的迭代能力,序列思维能让未来的事情随时响应,是响应式最核心的应用。包括有穷序列和无穷序列。

Observable生命周期

image.png
核心逻辑
        let ob = Observable<Any>.create { (obserber) -> Disposable in
            obserber.onNext("发送信号")
            obserber.onCompleted()
            return Disposables.create()
        }

        let _ = ob.subscribe(onNext: { (text) in
            print("订阅到:\(text)")
        }, onError: { (error) in
            print("error: \(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("销毁")
        }

1.创建序列:create
查看RxSwift的源码,Create.swift这个文件

public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }

AnonymousObservable继承自Producer,保存闭包subscribeHandler。Producer继承自 Observable实现了subscribe方法。

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

Producer

class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

Observable

public class Observable<Element> : ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element
    
    init() {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }
    
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    deinit {
#if TRACE_RESOURCES
        _ = Resources.decrementTotal()
#endif
    }

    // this is kind of ugly I know :(
    // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯

    /// Optimizations for map operator
    internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
        return _map(source: self, transform: transform)
    }
}

2.订阅序列:subscribe
点进去源码看subscribe,会发现里面创建了一个观察者AnonymousObserver对象,初始化里面传参,参数是一个尾随闭包,实现event枚举就会调用onNext事件,在最后的return里面有个self.asObservable().subscribe(observer),asObservable()看上面的Observable ,subscribe看上面的Producer。

    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}

3.发送信号:onNext

extension ObserverType {
    
    /// Convenience method equivalent to `on(.next(element: E))`
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
    
    /// Convenience method equivalent to `on(.completed)`
    public func onCompleted() {
        self.on(.completed)
    }
    
    /// Convenience method equivalent to `on(.error(Swift.Error))`
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

public enum Event<Element> {
    /// Next element is produced.
    case next(Element)

    /// Sequence terminated with an error.
    case error(Swift.Error)

    /// Sequence completed successfully.
    case completed
}

当序列发送onNext,就会发送Event的.next,subscribe里面的event就实现onNext事件。整个事件就串联上了。

上一篇 下一篇

猜你喜欢

热点阅读