ReactiveX

RxSwift(4)-Observable序列的创建

2020-04-19  本文已影响0人  xxxxxxxx_123

create

根据指定的订阅方法创建一个可观察序列

func createTest() {
    Observable<String>.create { (observer) -> Disposable in
        observer.onNext("我发送了事件")
        observer.onCompleted()
        return Disposables.create()
    }.subscribe { (element) in
        print("==\(element)==")
    }.disposed(by: disposeBag)
}

控制台输出:

==next(我发送了事件)==
==completed==

这是创建可观察序列最基本的方法。流程可以在前几篇文章看到,此处就不赘述了。

empty

创建一个空的观察序列,会直接发送一个completed事件。

let emtyOb = Observable<Int>.empty()
_ = emtyOb.subscribe(onNext: { (num) in
    print("\(num)")
}, onError: { (error) in
    print("错误")
}, onCompleted: {
    print("完成")
}) {
    print("释放")
}

运行程序,控制台输出:

完成
释放

源码如下:

public static func empty() -> Observable<E> {
    return EmptyProducer<E>()
}

final private class EmptyProducer<Element>: Producer<Element> {
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        // 发送`completed`事件
        observer.on(.completed)
        return Disposables.create()
    }
}

just

返回一个只包含单个元素的可观察序列。我们可以使用数组测试一下,看是不是只返回一个元素。

let array = [1,2]
Observable<[Int]>.just(array)
    .subscribe { (event) in
        print("event=\(event)")
    }.disposed(by: disposeBag)
        
let _ = Observable<[Int]>.just(array)
    .subscribe(onNext: { (num) in
        print("订阅=\(num)")
    }, onError: { (error) in
        print("错误")
    }, onCompleted: {
        print("完成")
    }) {
        print("释放")
    }

控制台输出:

event=next([1, 2])
event=completed
订阅=[1, 2]
完成
释放

直接把数组作为一个元素返回了。源码如下:返回一个元素之后,还会立即发送一个completed事件。

public static func just(_ element: E) -> Observable<E> {
    return Just(element: element)
}

final private class Just<Element>: Producer<Element> {
    private let _element: Element
    
    init(element: Element) {
        self._element = element
    }
    
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.next(self._element))
        observer.on(.completed)
        return Disposables.create()
    }
}

of

of是创建一个新的可观察实例,该实例具有可变数量的元素(必需要是同类型的。

print("==多个元素==")
let _ = Observable<String>.of("A","B")
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)
        
print("==字典==")
let _ = Observable<[String: Any]>.of(["name":"A","age":18])
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)
        
print("==数组==")
let _ = Observable<[String]>.of(["A","B"])
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)

我们只针对了多个元素、字典、数组进行了测试,输出结果如下:

==多个元素==
next(A)
next(B)
completed
==字典==
next(["age": 18, "name": "A"])
completed
==数组==
next(["A", "B"])
completed

可以看出如果是多个元素会循环发出事件,一个元素对应一个事件。源码如下:

public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
       return ObservableSequence(elements: elements, scheduler: scheduler)
}

final private class ObservableSequence<S: Sequence>: Producer<S.Iterator.Element> {
    fileprivate let _elements: S
    fileprivate let _scheduler: ImmediateSchedulerType

    init(elements: S, scheduler: ImmediateSchedulerType) {
        self._elements = elements
        self._scheduler = scheduler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

func run() -> Disposable {
    return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
        var mutableIterator = iterator
        if let next = mutableIterator.next() {
            self.forwardOn(.next(next))
            recurse(mutableIterator)
        } else {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

可以看出of的订阅流程是通过ObservableSequenceSink,然后通过mutableIterator迭代器挨个处理发送。

from

将可选内容转换为可观察序列,也就是说系统会对我们的可选类型进行处理。

let optional: Int? = 1
Observable.from(optional: optional)
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)
    
// 控制台输出:
next(1)
completed

源码如下:

public static func from(optional: E?) -> Observable<E> {
    return ObservableOptional(optional: optional)
}

final private class ObservableOptional<E>: Producer<E> {
    private let _optional: E?
    
    init(optional: E?) {
        self._optional = optional
    }
    
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        // 此处做了一次解包处理
        if let element = self._optional {
            observer.on(.next(element))
        }
        observer.on(.completed)
        return Disposables.create()
    }
}

deferred

直到订阅发生,才创建Observable,并且为每位订阅者创建全新的Observable。看上去每位订阅者都是对同一个Observable产生订阅,实际上它们都获得了独立的序列。相当于延迟Observable序列的初始化,可以保证拿到的数据都是最新的。

public static func deferred(_ observableFactory: @escaping () throws -> Observable<E>)
    -> Observable<E> {
    return Deferred(observableFactory: observableFactory)
}

final private class Deferred<S: ObservableType>: Producer<S.E> {
    typealias Factory = () throws -> S
    
    private let _observableFactory : Factory
    
    init(observableFactory: @escaping Factory) {
        self._observableFactory = observableFactory
    }
    
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == S.E {
        let sink = DeferredSink(observableFactory: self._observableFactory, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

// DeferredSink
func run() -> Disposable {
    do {
        let result = try self._observableFactory()
        return result.subscribe(self)
    } catch let e {
        self.forwardOn(.error(e))
        self.dispose()
        return Disposables.create()
    }
}

可以看出在调用DeferredSinkrun方法的时候,才会调用订阅方法。

range

使用指定的调度者生成并发送观察者消息,生成指定范围内的可观察整数序列。下面例子就是从1开始5个整数:

Observable.range(start: 1, count: 5)
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)
    
控制台输出
next(1)
next(2)
next(3)
next(4)
next(5)
completed

源码实现:

public static func range(start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
    return RangeProducer<E>(start: start, count: count, scheduler: scheduler)
}

final private class RangeProducer<E: RxAbstractInteger>: Producer<E> {
    fileprivate let _start: E
    fileprivate let _count: E
    fileprivate let _scheduler: ImmediateSchedulerType

    init(start: E, count: E, scheduler: ImmediateSchedulerType) {
        guard count >= 0 else {
            rxFatalError("count can't be negative")
        }

        guard start &+ (count - 1) >= start || count == 0 else {
            rxFatalError("overflow of count")
        }

        self._start = start
        self._count = count
        self._scheduler = scheduler
    }
    
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

final private class RangeSink<O: ObserverType>: Sink<O> where O.E: RxAbstractInteger {
    typealias Parent = RangeProducer<O.E>
    
    private let _parent: Parent
    
    init(parent: Parent, observer: O, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    
    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(0 as O.E) { i, recurse in
            if i < self._parent._count {
                // 递归发送事件 每次都在原来的值的基础上加1,
                self.forwardOn(.next(self._parent._start + i))
                recurse(i + 1)
            }
            else {
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}

generate

通过运行产生序列元素的状态驱动循环,使用指定的调度程序运行循环,发送观察者消息,从而生成一个可观察序列。该方法创建一个只有当提供的所有的判断条件都为true的时候,才会发出相应的事件。
初始值给定,然后判断condition,然后再进行iterate操作,会一直递归下去,直到条件不满足,才会停止。

下面我们来看一个例子:

Observable.generate(initialState: 0,// 初始值
    condition: { $0 < 10}, 
    iterate: { $0 + 3 })  
    .subscribe { (event) in
        print(event)
    }.disposed(by: disposeBag)
        
// 控制台输出
next(0)
next(3)
next(6)
next(9)
completed

也可以用来遍历数组:

let arr = ["A","B","C","D"]
Observable.generate(initialState: 0,
    condition: { $0 < arr.count}, 
    iterate: { $0 + 1 }) // 下标加1
    .subscribe(onNext: {
        print("遍历arr:",arr[$0])
    })
    .disposed(by: disposeBag)

源码如下:

public static func generate(initialState: E, condition: @escaping (E) throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: @escaping (E) throws -> E) -> Observable<E> {
    return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
}
    
final private class Generate<S, E>: Producer<E> {
    ......
    
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = GenerateSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

// GenerateSink
func run() -> Disposable {
    return self._parent._scheduler.scheduleRecursive(true) { isFirst, recurse -> Void in
        do {
            if !isFirst {
                // 赋值 迭代
                self._state = try self._parent._iterate(self._state)
            }
                
            // 判断是否满足条件 _condition
            if try self._parent._condition(self._state) {
                // 回调结果下一次使用
                let result = try self._parent._resultSelector(self._state)
                self.forwardOn(.next(result))
                    
                recurse(false)
            } else {
                self.forwardOn(.completed)
                self.dispose()
            }
        } catch let error {
            self.forwardOn(.error(error))
            self.dispose()
        }
    }
}

interval

返回一个Observable,该序列在每个周期之后生成一个值,使用指定的调度者运行计时器并发送观察者消息,发出的消息是一个索引数,从0开始。

Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .subscribe { (event) in
        print("interval==\(event)")
    }
    .disposed(by: disposeBag)
    
控制台输出:
interval==next(0)
interval==next(1)
interval==next(2)
......

源码如下,和timer的一样,只是初始的dueTime传的是间隔时间。

public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
    -> Observable<E> {
    return Timer(
        dueTime: period,
        period: period,
        scheduler: scheduler
    )
}

repeatElement

repeatElement创建一个Observable,不停的发送同一个元素。

Observable<Int>.repeatElement(0)
    .subscribe { (event) in
        print("repeatElement=\(event)=")
    }
    .disposed(by: disposeBag)

// 控制台输出
repeatElement=next(0)=
repeatElement=next(0)=
repeatElement=next(0)=

源码如下:可以看到其内部实现其实是一个递归函数,在不停地调用发送事件。

public static func repeatElement(_ element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
    return RepeatElement(element: element, scheduler: scheduler)
}

final private class RepeatElement<Element>: Producer<Element> {
    ......
    
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = RepeatElementSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()

        return (sink: sink, subscription: subscription)
    }
}

// RepeatElementSink
func run() -> Disposable {
    // 递归调用
    return self._parent._scheduler.scheduleRecursive(self._parent._element) { e, recurse in
        self.forwardOn(.next(e))
        recurse(e)
    }
}
上一篇 下一篇

猜你喜欢

热点阅读