RXSwift

RxSwift---Observable创建(三)

2023-04-22  本文已影响0人  会骑车的iOSer

在上一个篇章中,我们一起探索了Rxswift的核心逻辑,对Rxswift有了更近一步的理解
正所谓光说不练,假把式。那么接下来我们来看看在我们平常开发中用到的一些序列创建方式

1:empty---空序列

使用empty函数创建一个空序列,事件是Int类型的,由于是空序列,也就是没有序列,所以只能complete

        let emptyOb = Observable<Int>.empty()
        emptyOb.subscribe(onNext: { number in
            print("订阅:", number)
        }, onError: { error in
            print("error:", error)
        }, onCompleted: {
            print("完成回调")
        }) {
            print("释放回调")
        }

//打印结果:完成回调
          释放回调

为什么直接完成回调了呢,我们点击源码进行分析

extension ObservableType {

    public static func empty() -> Observable<Element> {
        EmptyProducer<Element>()
    }
}

final private class EmptyProducer<Element>: Producer<Element> {
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        observer.on(.completed)
        return Disposables.create()
    }
}

2:just---单个信号序列

该方法通过传入一个默认值来初始化,构建一个只有一个元素的Observable队列,订阅完信息自动complete

        // 单个信号序列创建
        print("******** just ********")
        let justOb = Observable<String>.just("逸华爱Moto")
        justOb.subscribe(onNext: { number in
            print("订阅:", number)
        }, onError: { error in
            print("error:", error)
        }, onCompleted: {
            print("完成回调")
        }) {
            print("释放回调")
        }

//打印结果
******** just ********
订阅: 逸华爱Moto
完成回调
释放回调

简单来说,就是传入什么输出什么,看看源码

extension ObservableType {

    public static func just(_ element: Element) -> Observable<Element> {
        Just(element: element)
    }

    //这里省略了一些代码...
}

final private class Just<Element>: Producer<Element> {
    private let element: Element
    
    init(element: Element) {
        self.element = element
    }
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        observer.on(.next(self.element))
        observer.on(.completed)
        return Disposables.create()
    }
}

3:of

        print("******** of ********")
        //多个元素
        Observable.of(1,2,3,4)
            .subscribe { element in
                print("订阅:", element)
            }
            .disposed(by: disposeBag)
        
        //数组
        Observable.of([1,2,3,4])
            .subscribe { element in
                print("订阅:", element)
            }
            .disposed(by: disposeBag)
        
        //字典
        Observable.of(["name":"逸华","hobby":"骑摩托"])
            .subscribe { element in
                print("订阅:", element)
            }
            .disposed(by: disposeBag)

//打印结果
******** of ********
订阅: next(1)
订阅: next(2)
订阅: next(3)
订阅: next(4)
订阅: completed
订阅: next([1, 2, 3, 4])
订阅: completed
订阅: next(["name": "逸华", "hobby": "骑摩托"])
订阅: completed

话不多说来看下具体实现

extension ObservableType {
    public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
        ObservableSequence(elements: elements, scheduler: scheduler)
    }
}

final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
    fileprivate let elements: Sequence
    fileprivate let scheduler: ImmediateSchedulerType

    init(elements: Sequence, scheduler: ImmediateSchedulerType) {
        self.elements = elements
        self.scheduler = scheduler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

final private class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
    typealias Parent = ObservableSequence<Sequence>

    private let parent: Parent

    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable {
        return self.parent.scheduler.scheduleRecursive(self.parent.elements.makeIterator()) { iterator, recurse in
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                recurse(mutableIterator)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}

4.from

        print("******** from ********")
        //MARK:  from
        // 从集合中获取序列:数组,集合,set 获取序列 - 有可选项处理 - 更安全
        Observable<[String]>.from(optional: nil)
            .subscribe { event in
                print(event)
            }.disposed(by: disposeBag)
        
        Observable<[String]>.from(optional: ["逸华", "爱摩托"])
            .subscribe { event in
                print(event)
            }.disposed(by: disposeBag)

//打印结果:
******** from ********
completed
next(["逸华", "爱摩托"])
completed

看下源码

extension ObservableType {
    public static func from(optional: Element?) -> Observable<Element> {
        ObservableOptional(optional: optional)
    }
}


final private class ObservableOptional<Element>: Producer<Element> {
    private let optional: Element?
    
    init(optional: Element?) {
        self.optional = optional
    }
    
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let element = self.optional {
            observer.on(.next(element))
        }
        observer.on(.completed)
        return Disposables.create()
    }
}

5.defer

        print("******** defer ********")
        //MARK:  defer
        // 这里有一个需求:动态序列 - 根据外界的标识 - 动态输出
        // 使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
        var isOdd = false
        _ = Observable<Int>.deferred { () -> Observable<Int> in
            // 这里设计我们的序列
            isOdd = !isOdd
            if isOdd {
                return Observable.of(1)
            }
            return Observable.of(0)
        }
        .subscribe { event in
            print(event)
        }

通过sink.run进入都下面的代码

    func run() -> Disposable {
        do {
            let result = try self.observableFactory()
            return result.subscribe(self)
        }
        catch let e {
            self.forwardOn(.error(e))
            self.dispose()
            return Disposables.create()
        }
    }

6:rang

print("******** range ********")
        Observable.range(start: 0, count: 5)
            .subscribe { number in
                print(number)
            }
            .disposed(by: disposeBag)

//打印结果:
******** range ********
next(0)
next(1)
next(2)
next(3)
next(4)
completed

源码

extension ObservableType where Element: RxAbstractInteger {

    public static func range(start: Element, count: Element, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
        RangeProducer<Element>(start: start, count: count, scheduler: scheduler)
    }
}

final private class RangeProducer<Element: RxAbstractInteger>: Producer<Element> {
    fileprivate let start: Element
    fileprivate let count: Element
    fileprivate let scheduler: ImmediateSchedulerType

    init(start: Element, count: Element, scheduler: ImmediateSchedulerType) {
        guard count >= 0 else {
            rxFatalError("count can't be negative")
        }

        guard start &+ (count - 1) >= start || count == 0 else {
            rxFatalError("overflow of count")
        }

        self.start = start
        self.count = count
        self.scheduler = scheduler
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

final private class RangeSink<Observer: ObserverType>: Sink<Observer> where Observer.Element: RxAbstractInteger {
    typealias Parent = RangeProducer<Observer.Element>
    
    private let parent: Parent
    
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    
    func run() -> Disposable {
        return self.parent.scheduler.scheduleRecursive(0 as Observer.Element) { i, recurse in
            if i < self.parent.count {
                self.forwardOn(.next(self.parent.start + i))
                recurse(i + 1)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}

7: generate

        print("******** generate ********")
        Observable.generate(initialState: 0,// 初始值
                            condition: { $0 < 10}, // 条件1
                            iterate: { $0 + 2 })  // 条件2 +2
            .subscribe { event in
                print(event)
            }.disposed(by: disposeBag)
                     
        // 数组遍历
        let arr = ["e_1","e_2","e_3","e_4","e_5"]
        Observable.generate(initialState: 0,// 初始值
                            condition: { $0 < arr.count}, // 条件1
                            iterate: { $0 + 1 })  // 条件2 +2
            .subscribe(onNext: {
                print("遍历arr:", arr[$0])
            }).disposed(by: disposeBag)

8:timer

        print("******** timer ********")
        Observable<Int>.timer(.seconds(5), period: .seconds(2), scheduler: MainScheduler.instance)
            .subscribe { event in
                print(event)
            }
            .disposed(by: disposeBag)

        // 因为没有指定期限period,故认定为一次性
        Observable<Int>.timer(.seconds(1), scheduler: MainScheduler.instance)
            .subscribe { event in
                print("111111111 \(event)")
            }
            .disposed(by: disposeBag)

9:repeatElement

        print("******** repeatElement ********")
        //MARK:  repeatElement
        // 该方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)
        Observable<Int>.repeatElement(5)
            .subscribe { event in
                 print("订阅:", event)
            }
            .disposed(by: disposeBag)

10:error

        print("******** error ********")
        Observable<String>.error(NSError.init(domain: "发送错误", code: 10086, userInfo: ["reason": "unknow"]))
            .subscribe { event in
                print("订阅:", event)
            }
            .disposed(by: disposeBag)

11:never

        print("******** never ********")
        Observable<String>.never()
            .subscribe { event in
                print("走你", event)
            }
            .disposed(by: disposeBag)

序列的创建也是学习 RxSwift 的根基,当然RxSwift还有一些创建序列的方式,大家可以玩一玩

以上就是RxSwift 常用的序列创建方式,如果对你有帮助,请不要吝啬自己手里的点赞👍哦~~~

上一篇 下一篇

猜你喜欢

热点阅读