RxSwift 核心逻辑初探
RxSwift
RxSwift: Reactive Extensions Swift,函数响应式编程的基础框架。简化代码,易读性高。
函数式:源自数学中的函数概念 y = f(x)。x 本身也可以是另一个函数的结果,例如 x = g(t),那么 y = f(g(t))。把逻辑拆成函数再组合起来,可读性更高,分段式的写法也更简洁。
响应式,面向数据流和变化传播的编程范式。数据流:只能以事先规定好的顺序被读取一次的数据的一个序列。变化传播:类似观察者模式,变化了要通知别人。
RxSwift初体验
KVO
不需要再去写被观察对象的addObserver:forKeyPath:options:context: 方法注册观察者
观察者也不需要实现observeValueForKeyPath:ofObject:change:context: 回调方法
/// Observes the `name` key path of `person` through `rx.observeWeakly`.
///
/// Stands in for the manual registration
/// `person.addObserver(self, forKeyPath: "name", options: .new, context: nil)`.
func setupKVO() {
    let nameChanges = person.rx.observeWeakly(String.self, "name")
    let subscription = nameChanges.subscribe(onNext: { newName in
        // Cast to `Any` exactly as before so the printed output is unchanged.
        print(newName as Any)
    })
    subscription.disposed(by: disposeBag)
}
UI
button为例,业务逻辑和功能逻辑放在一起,更清晰,方便查找
/// Wires the button's control events to handlers using RxCocoa.
///
/// Both subscriptions are owned by `disposeBag`.
func setupButton() {
    // `rx.tap` is the shortcut for the touchUpInside event.
    button.rx.tap
        .subscribe(onNext: { _ in
            print("点击来了")
        })
        .disposed(by: disposeBag)

    // Any other control event is reachable through `rx.controlEvent(_:)`.
    // The returned event sequence was previously discarded, so nothing was
    // ever observed; subscribe to it and hand the subscription to the bag.
    button.rx.controlEvent(.touchUpOutside)
        .subscribe(onNext: { _ in
            print("touchUpOutside")
        })
        .disposed(by: disposeBag)
}
代理
可以省略代理方法实现
/// Observes the text field's content reactively, with no delegate methods.
///
/// Prints every value of `rx.text.orEmpty` and binds `rx.text` to the
/// button's title. Both subscriptions are owned by `disposeBag`.
func setupTextFiled() {
    // Log each text change.
    let textChanges = textFiled.rx.text.orEmpty
    textChanges
        .subscribe(onNext: { print($0) })
        .disposed(by: disposeBag)

    // Mirror the text into the button title.
    let titleBinding = textFiled.rx.text.bind(to: button.rx.title())
    titleBinding.disposed(by: disposeBag)
}
通知
/// Prints every `keyboardWillShowNotification` delivered through the
/// default notification center's `rx.notification` sequence.
func setupNotification() {
    let keyboardWillShow = NotificationCenter.default.rx
        .notification(UIResponder.keyboardWillShowNotification)

    keyboardWillShow
        .subscribe(onNext: { notification in
            print(notification)
        })
        .disposed(by: disposeBag)
}
手势
/// Attaches a tap gesture recognizer to `label` and prints the recognizer's
/// view each time the recognizer's `rx.event` sequence emits.
func setupGestureRecognizer() {
    let tapRecognizer = UITapGestureRecognizer()

    label.isUserInteractionEnabled = true
    label.addGestureRecognizer(tapRecognizer)

    tapRecognizer.rx.event
        .subscribe(onNext: { recognizer in
            print(recognizer.view)
        })
        .disposed(by: disposeBag)
}
timer
核心逻辑:每隔一秒钟发送一个事件。注意这里的timer是一个Observable序列,并不是NSTimer。
/// Creates an interval sequence on the main scheduler and prints each value.
///
/// The sequence is built with `Observable<Int>.interval(1, scheduler:)` and
/// the subscription is owned by `disposeBag`.
func setupTimer() {
    // A single `let` replaces the former implicitly-unwrapped `var`, which
    // was declared and then immediately assigned.
    let timer = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

    timer
        .subscribe(onNext: { tick in
            print(tick)
        })
        .disposed(by: disposeBag)
}
网络
/// Issues the same request twice: once with a plain `URLSession` data task
/// and once through the `URLSession.shared.rx.response` sequence.
///
/// The plain version prints the body decoded as UTF-8, or the error when no
/// data arrived. The Rx version prints the response.
func setupNextwork() {
    // Unwrap once instead of force-unwrapping at every use.
    guard let url = URL(string: "https://www.baidu.com") else { return }

    // Plain URLSession implementation.
    URLSession.shared.dataTask(with: url) { (data, _, error) in
        guard let data = data else {
            // No data was delivered; report the error instead of crashing
            // on a forced unwrap.
            print(error as Any)
            return
        }
        // The decoded string is optional; printed as before.
        print(String(data: data, encoding: .utf8) as Any)
    }.resume()

    // Rx implementation.
    URLSession.shared.rx.response(request: URLRequest(url: url))
        .subscribe(onNext: { (response, _) in
            print(response)
        })
        .disposed(by: disposeBag)
}
万物皆rx?点进去看看,会发现rx定义在ReactiveCompatible这个协议的扩展里面:
// Interface listing of the `rx` members added by the `ReactiveCompatible` extension.
// Only the declarations are visible here; the accessor bodies are not part of this listing.
extension ReactiveCompatible {
/// Reactive extensions.
/// Type-level entry point, typed as `Reactive<Self>.Type`.
public static var rx: RxSwift.Reactive<Self>.Type
/// Reactive extensions.
/// Instance-level entry point, typed as `Reactive<Self>`.
public var rx: RxSwift.Reactive<Self>
}
/// A type that has reactive extensions.
///
/// Conforming types expose an `rx` property both on the type and on each
/// instance; both are declared as gettable and settable.
public protocol ReactiveCompatible {
/// Extended type
/// Used as the generic argument of `Reactive` in both `rx` requirements below.
associatedtype CompatibleType
/// Reactive extensions.
/// Type-level requirement (`Reactive<CompatibleType>.Type`).
static var rx: Reactive<CompatibleType>.Type { get set }
/// Reactive extensions.
/// Instance-level requirement (`Reactive<CompatibleType>`).
var rx: Reactive<CompatibleType> { get set }
}
划重点了:NSObject遵循了这个协议,因此所有继承自NSObject的类都可以直接使用rx。
/// Extend NSObject with `rx` proxy.
///
/// The conformance body is empty: no members are implemented here.
extension NSObject: ReactiveCompatible { }
核心Observable可观察序列
序列Sequence
是一系列相同类型的值的集合,并且提供对这些值的迭代能力,序列思维能让未来的事情随时响应,是响应式最核心的应用。包括有穷序列和无穷序列。
Observable生命周期
(配图:image.png)
核心逻辑
// Step 1: create a sequence that emits one element and then completes.
let ob = Observable<Any>.create { observer in
    observer.onNext("发送信号")
    observer.onCompleted()
    return Disposables.create()
}

// Step 2: subscribe with one handler per event kind, plus a disposal handler.
_ = ob.subscribe(
    onNext: { element in
        print("订阅到:\(element)")
    },
    onError: { failure in
        print("error: \(failure)")
    },
    onCompleted: {
        print("完成")
    },
    onDisposed: {
        print("销毁")
    }
)
1.创建序列:create
查看RxSwift的源码,Create.swift这个文件
/// Creates an observable sequence backed by the given subscribe closure.
///
/// - parameter subscribe: Closure that receives an `AnyObserver<E>` and returns a `Disposable`.
/// - returns: An `AnonymousObservable` wrapping `subscribe`, typed as `Observable<E>`.
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
    let anonymous: Observable<E> = AnonymousObservable(subscribe)
    return anonymous
}
AnonymousObservable继承自Producer,并把create传入的闭包保存为_subscribeHandler。Producer继承自Observable,并重写(实现)了subscribe方法。
/// Producer that stores the subscribe closure it is initialised with and
/// hands it to an `AnonymousObservableSink` when run.
final private class AnonymousObservable<Element>: Producer<Element> {
    /// Shape of the stored closure: takes an observer, returns a disposable.
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    /// The closure supplied at initialisation.
    let _subscribeHandler: SubscribeHandler

    /// - parameter subscribeHandler: Closure to store in `_subscribeHandler`.
    init(_ subscribeHandler: @escaping SubscribeHandler) {
        _subscribeHandler = subscribeHandler
    }

    /// Builds a sink around `observer`, runs it against this observable, and
    /// returns the sink together with the subscription that `run` produced.
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let observableSink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let runSubscription = observableSink.run(self)
        return (sink: observableSink, subscription: runSubscription)
    }
}
Producer
/// Observable base class whose `subscribe` delegates the actual work to
/// `run(_:cancel:)`, which subclasses override.
class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    /// Subscribes `observer`, either immediately or through
    /// `CurrentThreadScheduler.instance` when scheduling is required.
    ///
    /// - parameter observer: Observer that will receive the events.
    /// - returns: The disposable for the subscription.
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        // Shared by both paths: run the producer and wire the result into a
        // fresh `SinkDisposer`.
        func connect() -> Disposable {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let (sink, subscription) = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sink, subscription: subscription)
            return disposer
        }

        guard CurrentThreadScheduler.isScheduleRequired else {
            return connect()
        }
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            return connect()
        }
    }

    /// Override point: this base implementation only calls `rxAbstractMethod()`.
    func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}
Observable
/// Base class of observable sequences, generic over the element type.
///
/// `subscribe` only calls `rxAbstractMethod()` here, so the real behaviour
/// comes from subclasses that override it.
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
/// Increments the total resource count when `TRACE_RESOURCES` is defined.
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
/// Subscribes `observer` to this sequence.
/// This base implementation only calls `rxAbstractMethod()`.
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod()
}
/// Returns `self`: an `Observable` is already its own observable form.
public func asObservable() -> Observable<E> {
return self
}
/// Decrements the total resource count when `TRACE_RESOURCES` is defined.
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
// this is kind of ugly I know :(
// Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯
/// Optimizations for map operator
/// Forwards to `_map(source:transform:)` with `self` as the source.
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
}
2.订阅序列:subscribe
点进subscribe的源码,会发现里面创建了一个观察者AnonymousObserver对象,初始化时传入的参数是一个尾随闭包:闭包里对event枚举做switch,收到.next时就会调用外部传入的onNext闭包。最后的return里面有一句self.asObservable().subscribe(observer):asObservable()见上面的Observable(直接返回self),subscribe见上面的Producer。
/// Subscribes to the sequence with one optional closure per event kind.
///
/// - parameter onNext: Called with the value of each `.next` event.
/// - parameter onError: Called with the error of an `.error` event; when nil,
///   `Hooks.defaultErrorHandler` is called instead.
/// - parameter onCompleted: Called on the `.completed` event.
/// - parameter onDisposed: Wrapped in a disposable that is disposed after
///   `.error` / `.completed`, and is also part of the returned disposable.
/// - returns: A disposable combining the underlying subscription with the
///   `onDisposed` disposable.
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
// Wrap the disposal callback if there is one; otherwise use an empty disposable.
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
// Capture the call stack only when the hook asks for it; it is passed to the default error handler below.
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
// The observer routes each event to the matching closure supplied by the caller.
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
// Both terminal events dispose the `onDisposed` disposable after their handler has run.
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
// Subscribe the observer and return both disposables as one.
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
3.发送信号:onNext
/// Convenience wrappers that build the matching `Event` and pass it to `on(_:)`.
extension ObserverType {
    /// Sends a `.next` event carrying `element`.
    ///
    /// - parameter element: Next element to send to observer(s)
    public func onNext(_ element: E) {
        let event = Event<E>.next(element)
        on(event)
    }

    /// Sends the `.completed` event.
    public func onCompleted() {
        let event = Event<E>.completed
        on(event)
    }

    /// Sends an `.error` event carrying `error`.
    ///
    /// - parameter error: Swift.Error to send to observer(s)
    public func onError(_ error: Swift.Error) {
        let event = Event<E>.error(error)
        on(event)
    }
}
/// The three kinds of event a sequence can deliver, generic over the element type.
public enum Event<Element> {
/// Next element is produced.
/// Carries the produced element.
case next(Element)
/// Sequence terminated with an error.
/// Carries the error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
当序列调用observer.onNext时,实际执行的是on(.next(element)),也就是发送了Event的.next;subscribe里创建的AnonymousObserver在闭包中匹配到.next,再回调外部传入的onNext闭包。这样整个事件流程就串联上了。