RxSwift(4)-Observable序列的创建
create
根据指定的订阅方法创建一个可观察序列
func createTest() {
Observable<String>.create { (observer) -> Disposable in
observer.onNext("我发送了事件")
observer.onCompleted()
return Disposables.create()
}.subscribe { (element) in
print("==\(element)==")
}.disposed(by: disposeBag)
}
控制台输出:
==next(我发送了事件)==
==completed==
这是创建可观察序列最基本的方法。流程可以在前几篇文章看到,此处就不赘述了。
empty
创建一个空的观察序列,会直接发送一个completed
事件。
let emtyOb = Observable<Int>.empty()
_ = emtyOb.subscribe(onNext: { (num) in
print("\(num)")
}, onError: { (error) in
print("错误")
}, onCompleted: {
print("完成")
}) {
print("释放")
}
运行程序,控制台输出:
完成
释放
源码如下:
public static func empty() -> Observable<E> {
return EmptyProducer<E>()
}
final private class EmptyProducer<Element>: Producer<Element> {
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
// 发送`completed`事件
observer.on(.completed)
return Disposables.create()
}
}
just
返回一个只包含单个元素的可观察序列。我们可以使用数组测试一下,看是不是只返回一个元素。
let array = [1,2]
Observable<[Int]>.just(array)
.subscribe { (event) in
print("event=\(event)")
}.disposed(by: disposeBag)
let _ = Observable<[Int]>.just(array)
.subscribe(onNext: { (num) in
print("订阅=\(num)")
}, onError: { (error) in
print("错误")
}, onCompleted: {
print("完成")
}) {
print("释放")
}
控制台输出:
event=next([1, 2])
event=completed
订阅=[1, 2]
完成
释放
直接把数组作为一个元素返回了。源码如下:返回一个元素之后,还会立即发送一个completed
事件。
public static func just(_ element: E) -> Observable<E> {
return Just(element: element)
}
final private class Just<Element>: Producer<Element> {
private let _element: Element
init(element: Element) {
self._element = element
}
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
observer.on(.next(self._element))
observer.on(.completed)
return Disposables.create()
}
}
of
of
是创建一个新的可观察实例,该实例具有可变数量的元素(必需要是同类型的。
print("==多个元素==")
let _ = Observable<String>.of("A","B")
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
print("==字典==")
let _ = Observable<[String: Any]>.of(["name":"A","age":18])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
print("==数组==")
let _ = Observable<[String]>.of(["A","B"])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
我们只针对了多个元素、字典、数组进行了测试,输出结果如下:
==多个元素==
next(A)
next(B)
completed
==字典==
next(["age": 18, "name": "A"])
completed
==数组==
next(["A", "B"])
completed
可以看出如果是多个元素会循环发出事件,一个元素对应一个事件。源码如下:
public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return ObservableSequence(elements: elements, scheduler: scheduler)
}
final private class ObservableSequence<S: Sequence>: Producer<S.Iterator.Element> {
fileprivate let _elements: S
fileprivate let _scheduler: ImmediateSchedulerType
init(elements: S, scheduler: ImmediateSchedulerType) {
self._elements = elements
self._scheduler = scheduler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
} else {
self.forwardOn(.completed)
self.dispose()
}
}
}
可以看出of
的订阅流程是通过ObservableSequenceSink
,然后通过mutableIterator
迭代器挨个处理发送。
from
将可选内容转换为可观察序列,也就是说系统会对我们的可选类型进行处理。
let optional: Int? = 1
Observable.from(optional: optional)
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
// 控制台输出:
next(1)
completed
源码如下:
public static func from(optional: E?) -> Observable<E> {
return ObservableOptional(optional: optional)
}
final private class ObservableOptional<E>: Producer<E> {
private let _optional: E?
init(optional: E?) {
self._optional = optional
}
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
// 此处做了一次解包处理
if let element = self._optional {
observer.on(.next(element))
}
observer.on(.completed)
return Disposables.create()
}
}
deferred
直到订阅发生,才创建Observable
,并且为每位订阅者创建全新的Observable
。看上去每位订阅者都是对同一个Observable
产生订阅,实际上它们都获得了独立的序列。相当于延迟Observable
序列的初始化,可以保证拿到的数据都是最新的。
public static func deferred(_ observableFactory: @escaping () throws -> Observable<E>)
-> Observable<E> {
return Deferred(observableFactory: observableFactory)
}
final private class Deferred<S: ObservableType>: Producer<S.E> {
typealias Factory = () throws -> S
private let _observableFactory : Factory
init(observableFactory: @escaping Factory) {
self._observableFactory = observableFactory
}
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == S.E {
let sink = DeferredSink(observableFactory: self._observableFactory, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
// DeferredSink
func run() -> Disposable {
do {
let result = try self._observableFactory()
return result.subscribe(self)
} catch let e {
self.forwardOn(.error(e))
self.dispose()
return Disposables.create()
}
}
可以看出在调用DeferredSink
的run
方法的时候,才会调用订阅方法。
range
使用指定的调度者生成并发送观察者消息,生成指定范围内的可观察整数序列。下面例子就是从1开始5个整数:
Observable.range(start: 1, count: 5)
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
控制台输出
next(1)
next(2)
next(3)
next(4)
next(5)
completed
源码实现:
public static func range(start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RangeProducer<E>(start: start, count: count, scheduler: scheduler)
}
final private class RangeProducer<E: RxAbstractInteger>: Producer<E> {
fileprivate let _start: E
fileprivate let _count: E
fileprivate let _scheduler: ImmediateSchedulerType
init(start: E, count: E, scheduler: ImmediateSchedulerType) {
guard count >= 0 else {
rxFatalError("count can't be negative")
}
guard start &+ (count - 1) >= start || count == 0 else {
rxFatalError("overflow of count")
}
self._start = start
self._count = count
self._scheduler = scheduler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
final private class RangeSink<O: ObserverType>: Sink<O> where O.E: RxAbstractInteger {
typealias Parent = RangeProducer<O.E>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(0 as O.E) { i, recurse in
if i < self._parent._count {
// 递归发送事件 每次都在原来的值的基础上加1,
self.forwardOn(.next(self._parent._start + i))
recurse(i + 1)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
}
generate
通过运行产生序列元素的状态驱动循环,使用指定的调度程序运行循环,发送观察者消息,从而生成一个可观察序列。该方法创建一个只有当提供的所有的判断条件都为true
的时候,才会发出相应的事件。
初始值给定,然后判断condition
,然后再进行iterate
操作,会一直递归下去,直到条件不满足,才会停止。
-
initialState
:初始状态。 -
condition
:终止生成的条件(返回“false”时) -
iterate
:迭代步骤函数 -
scheduler
:用来运行生成循环的调度器,默认为CurrentThreadScheduler.instance
下面我们来看一个例子:
Observable.generate(initialState: 0,// 初始值
condition: { $0 < 10},
iterate: { $0 + 3 })
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
// 控制台输出
next(0)
next(3)
next(6)
next(9)
completed
也可以用来遍历数组:
let arr = ["A","B","C","D"]
Observable.generate(initialState: 0,
condition: { $0 < arr.count},
iterate: { $0 + 1 }) // 下标加1
.subscribe(onNext: {
print("遍历arr:",arr[$0])
})
.disposed(by: disposeBag)
源码如下:
public static func generate(initialState: E, condition: @escaping (E) throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: @escaping (E) throws -> E) -> Observable<E> {
return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
}
final private class Generate<S, E>: Producer<E> {
......
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = GenerateSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
// GenerateSink
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(true) { isFirst, recurse -> Void in
do {
if !isFirst {
// 赋值 迭代
self._state = try self._parent._iterate(self._state)
}
// 判断是否满足条件 _condition
if try self._parent._condition(self._state) {
// 回调结果下一次使用
let result = try self._parent._resultSelector(self._state)
self.forwardOn(.next(result))
recurse(false)
} else {
self.forwardOn(.completed)
self.dispose()
}
} catch let error {
self.forwardOn(.error(error))
self.dispose()
}
}
}
interval
返回一个Observable
,该序列在每个周期之后生成一个值,使用指定的调度者运行计时器并发送观察者消息,发出的消息是一个索引数,从0开始。
Observable<Int>.interval(1, scheduler: MainScheduler.instance)
.subscribe { (event) in
print("interval==\(event)")
}
.disposed(by: disposeBag)
控制台输出:
interval==next(0)
interval==next(1)
interval==next(2)
......
源码如下,和timer
的一样,只是初始的dueTime
传的是间隔时间。
public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
-> Observable<E> {
return Timer(
dueTime: period,
period: period,
scheduler: scheduler
)
}
repeatElement
repeatElement
创建一个Observable
,不停的发送同一个元素。
Observable<Int>.repeatElement(0)
.subscribe { (event) in
print("repeatElement=\(event)=")
}
.disposed(by: disposeBag)
// 控制台输出
repeatElement=next(0)=
repeatElement=next(0)=
repeatElement=next(0)=
源码如下:可以看到其内部实现其实是一个递归函数,在不停地调用发送事件。
public static func repeatElement(_ element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RepeatElement(element: element, scheduler: scheduler)
}
final private class RepeatElement<Element>: Producer<Element> {
......
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = RepeatElementSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
// RepeatElementSink
func run() -> Disposable {
// 递归调用
return self._parent._scheduler.scheduleRecursive(self._parent._element) { e, recurse in
self.forwardOn(.next(e))
recurse(e)
}
}