框架学习RxSwift2.0 Observable创建

2019-07-24  本文已影响0人  数字d

1.常规操作导入RXSwift,参看https://www.jianshu.com/p/b73231a29949

Rx 是 ReactiveX 的缩写,简单来说就是基于异步 Event(事件)序列的响应式编程。
Rx 可以简化异步编程方法,并提供更优雅的数据绑定。让我们可以时刻响应新的数据同时顺序地处理它们。

函数响应式编程 = 函数式编程 + 响应式编程

函数式:

(Functional Programming) 把运算过程尽量写成一系列嵌套的函数调用

响应式编程:

简称RP(Reactive Programming),响应式编程是一种面向数据流和变化传播的编程范式。

在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。

在iOS开发中我们经常会响应一些事件button、tap、textField、textView、notifaction、KVO、NSTimer等等这些,都需要做响应监听,响应后都需要在对应的响应事件中去做处理,而原生开发中,触发对象与响应方法是分离的,如button的初始化和点击响应方法是分离的。

override func viewDidLoad(){
    button = UIButton.init(frame: CGRect(x: 40, y: 100, width: width-80, height: 40))
    button.backgroundColor = .gray
    button.setTitle("按钮", for: .normal)
    self.view.addSubview(button)
    button.addTarget(self, action: #selector(clickBtn(button:)), for: .touchUpInside)
}
//button event
@objc func clickBtn(button:UIButton) {
    print("点击")
}

RxSwift按钮点击事件的实现


...
let bag = DisposeBag()
....
 
self.button.rx.tap    //序列
            .subscribe(onNext: { () in   //订阅
                print("Button clicked!")
            }, onError: { (error) in
                print("错误信息")
            }, onCompleted: {
                print("订阅完成")
            })
            .disposed(by: bag)    //销毁

为什么按钮可以.rx呢?
在RxSwift项目中搜索Reactive.swift文件,协议ReactiveCompatible
有rx属性,extension NSObject: ReactiveCompatible表示NSObject类及其子类实现协议ReactiveCompatible,所以NSObject类及其子类都可以.rx

//  Reactive.swift

...
public protocol ReactiveCompatible {
    /// Extended type
    associatedtype CompatibleType

    /// Reactive extensions.
    static var rx: Reactive<CompatibleType>.Type { get set }

    /// Reactive extensions.
    var rx: Reactive<CompatibleType> { get set }
}

import class Foundation.NSObject


extension NSObject: ReactiveCompatible { }

RxSwift

全称ReactiveX for Swift,是一个简化异步编程的框架,实现了函数响应式编程,事件与对象紧密联系,业务流清晰,便于管理。在RxSwift中,所有异步操作(事件)和数据流均被抽象为可观察序列的概念。

RXSwift核心:

响应式编程的核心就是可监听序列的产生以及如何监听。

名词解释:

Observable:是一个可监听序列 作用是产生事件

可以理解为观察者模式里的被观察者

Observer: 序列的监听(订阅)者 作用是消费事件

可以理解为观察者模式里的观察者

Disposable: 可被清除的资源

简单梳理下上面的流程

可监听序列被订阅了,就会调用Producer的subscribe方法
调用self的run方法,当前我们的self是Producer的子类AnonymousObservable

创建AnonymousObservableSink并将订阅者observer传进去,接着调用AnonymousObservableSink实例的run,参数是AnonymousObservable实例。

AnonymousObservableSink实例的run又调用AnonymousObservable的_subscribeHandler闭包,参数是AnyObserver,AnyObserver保存了事件的回调,而_subscribeHandler闭包也就是可监听序列创建时传的闭包。

执行事件
以next事件为例
如:订阅onNext事件。

代码:

在creat尾随闭包中的onNext具体实现代码是

public func onNext(_ element: E) {
        self.on(.next(element))
    }

内部流程如下:

调用observer的onNext方法,接着调用observer的on(_ event: Event<Element>)并将next事件传进去

执行AnyObserver的observer的事件回调

执行AnonymousObservableSink的func on(_ event: Event<Element>)方法,接着调用ObserverBase的func on(_ event: Event<Element>)让真正的Observer类去响应事件

调用AnonymousObserver的onCore方法去执行真正的事件回调。

序列例子:

1.创建序列
     let ob = Observable<Any>.create { (obserber) -> Disposable in
            // 3:发送信号
            obserber.onNext("框架班级")
            obserber.onCompleted()
//            obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
            return Disposables.create()
        }

//2.订阅
  let _ = ob.subscribe(onNext: { (text) in
//4.订阅到
            print("订阅到:\(text)")
        }, onError: { (error) in
            print("error: \(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("销毁")
        }

代码的执行顺序不按照代码行数一行一行的执行,说明这里含有闭包。

看具体创建序列的流程:
creat方法

extension ObservableType {

    /**
         Creates an observable sequence from a specified subscribe method implementation.
    
         - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
    
         - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
         - returns: The observable sequence with the specified implementation for the `subscribe` method.
         */
    public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.E>
}

Create.swift文件中的源码实现

extension ObservableType {
    // MARK: create

    /**
     Creates an observable sequence from a specified subscribe method implementation.

     - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)

     - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
     - returns: The observable sequence with the specified implementation for the `subscribe` method.
     */
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
}

在Create.swift文件中实现的方法ObservableType.create,

extension ObservableType {
    // MARK: create

    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
}

上面代码可以看出creat方法返回了一个AnonymousObservable(匿名序列)

接着看AnonymousObservable类

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
//初始化时候带的参数,这里保存了一个闭包,用于subscribe回调
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

这里的AnonymousObservable类并没有subscribe方法,但是父类Producer有

lass Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

创建序列creat方法-生成AnonymousObservable序列-保存subscribeHandler
到此整个创建序列的流程结束。

接下来是订阅流程:

ob.subscribe方法的实现

  extension ObservableType {

 
    public func subscribe(onNext: ((Self.E) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> RxSwift.Disposable
}

而方法实现的源码在ObservableType+Extensions.swift文件中

   
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
      
...
            
//创建observer ,event从AnonymousObserver()构建
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
//对next,error,complated,闭包进行初始化
//只要观察者observer调用了event的.next事件,这里就会调用订阅事件.onNext
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
//回调了当前序列(ob)的_subscribeHandler闭包,由于AnonymousObserable类中没有subscribe回调,就使用其父类Produce的subscribe方法,调用子类的实现
                disposable
            )
    }

在AnonymousObserver.swift文件中的实现

final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
    typealias Element = ElementType
    
    typealias EventHandler = (Event<Element>) -> Void
    
    private let _eventHandler : EventHandler
    
    init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
        self._eventHandler = eventHandler //保存了包含事件 的闭包
    }

    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
    
#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}

实际流程参看下图

3.png

基本UI事件监听代码实现:https://gitee.com/xgkp/RX2.0login.git

参考来源:
1.https://www.jianshu.com/p/3617ab385060
2.https://blog.csdn.net/qq_18951479/article/details/96832932
3.https://www.jianshu.com/p/c9f854718933

上一篇下一篇

猜你喜欢

热点阅读