RxSwift - 操作符四
sample
不定期的对 Observable 取样
smple
sample 操作符将不定期的对源 Observable 进行取样操作。 通过第二个 Observable 来控制取样时机。一旦第二个
Observable 发出一个元素,就从源 Observable 中取出最 后产生的元素。
let source = PublishSubject<Int>()
let notifier = PublishSubject<String>()
source
.sample(notifier)
.subscribe(onNext: { print($0) })
.disposed(by: bag)
source.onNext(1)
//让源序列接收接收消息
notifier.onNext("A")
source.onNext(2)
//让源序列接收接收消息
notifier.onNext("B")
notifier.onNext("C")
source.onNext(3)
source.onNext(4)
//让源序列接收接收消息
notifier.onNext("D")
source.onNext(5)
//让源序列接收接收消息
notifier.onCompleted()
输出结果
1
2
4
5
分析
:从输出结果来看,我们会发现3不见了,为什么不见了呢,我们回到代码notifier.onNext("D")
,在这里,发出D之后呢,我们回到source源上,发现最新的是4,并不是3,也就是3会被丢弃了,也就是说,哪怕中间的再多的事件,但是我也是根据监听事件取源的最新一个事件
scan
持续的将 Observable 的每一个元素应用一个函数,然后发 出每一次函数返回的结果
scan
scan 操作符将对第一个元素应用一个函数,将结果作为第一 个元素发出。然后,将结果作为参数填入到第二个元素的应 用函数中,创建第二个元素。以此类推,直到遍历完全部的 元素。 这种操作符在其他地方有时候被称作是 accumulator 。
/// scan
Observable.of(1,2,3,4).scan(0) { a, b in
return a+b
}.subscribe { element in
print(element)
}.disposed(by: bag)
// 输出结果
// next(1)
// next(3)
// next(6)
// next(10)
// completed
/// reduce
Observable.of(1,2,3,4).reduce(0) { a, b in
return a+b
}.subscribe { element in
print(element)
}.disposed(by: bag)
// 输出结果
// next(10)
// completed
shareReplay
使观察者共享 Observable ,观察者会立即收到最新的元 素,即使这些元素是在订阅前产生的
shar eReplay 操作符将使得观察者共享源 Observable ,并且 缓存最新的 n 个元素,将这些元素直接发送给新的观察者。
let seq = PublishSubject<Int>()
let ob = seq.map { (i) -> Int in
print("map 被调用 :---\(i)")
return i * 2
}
let _ = ob.subscribe(onNext: { (num) in
print("--第一次订阅--\(num)")
}, onError: nil, onCompleted: nil, onDisposed: nil)
let _ = ob.subscribe(onNext: { (num) in
print("--第二次订阅--\(num)")
}, onError: nil, onCompleted: nil, onDisposed: nil)
seq.onNext(1)
输出结果
map 被调用 :---1
--第一次订阅--2
map 被调用 :---1
--第二次订阅--2
这里可以看出来map
被调用了两次,我们再看看加了share的情况
let seq = PublishSubject<Int>()
let ob = seq.map { (i) -> Int in
print("map 被调用 :---\(i)")
return i * 2
}
.share(replay: 0, scope: .forever)
let _ = ob.subscribe(onNext: { (num) in
print("--第一次订阅--\(num)")
}, onError: nil, onCompleted: nil, onDisposed: nil)
let _ = ob.subscribe(onNext: { (num) in
print("--第二次订阅--\(num)")
}, onError: nil, onCompleted: nil, onDisposed: nil)
seq.onNext(1)
输出结果
map 被调用 :---1
--第一次订阅--2
--第二次订阅--2
总结:
从上面例子可以知道:
发送一次onNext
事件,就对应一个Observable
,而所有的观察者
都只对应这个Observable,Observable共享给所有的观察者
share(replay:scope:)
操作符使得观察者共享Observable,并且缓存最新的n个元素,将这些元素直接发送给新的观察者
用例2
无share
let net = Observable<String>.create { observe in
print("开始网络请求")
observe.onNext("Response__Result")
observe.onCompleted()
return Disposables.create()
}
net.subscribe { element in
print("第一次订阅\(element)",Thread.current)
}.disposed(by: bag)
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
net.subscribe { element in
print("第二次订阅\(element)",Thread.current)
}.disposed(by: self.bag)
}
输出结果
开始网络请求
第一次订阅next(Response__Result) <_NSMainThread: 0x6000028dc700>{number = 1, name = main}
第一次订阅completed <_NSMainThread: 0x6000028dc700>{number = 1, name = main}
开始网络请求
第二次订阅next(Response__Result) <_NSMainThread: 0x6000028dc700>{number = 1, name = main}
第二次订阅completed <_NSMainThread: 0x6000028dc700>{number = 1, name = main}
分析:
从打印结果来看,没用share的,订阅了两次,就触发了两次next事件
有share
let net = Observable<String>.create { observe in
print("开始网络请求")
observe.onNext("Response__Result")
observe.onCompleted()
return Disposables.create()
}.share(replay: 0, scope: .forever)
net.subscribe { element in
print("第一次订阅\(element)",Thread.current)
}.disposed(by: bag)
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
net.subscribe { element in
print("第二次订阅\(element)",Thread.current)
}.disposed(by: self.bag)
}
输出结果
开始网络请求
第一次订阅next(Response__Result) <_NSMainThread: 0x600001dd4740>{number = 1, name = main}
第一次订阅completed <_NSMainThread: 0x600001dd4740>{number = 1, name = main}
第二次订阅completed <_NSMainThread: 0x600001dd4740>{number = 1, name = main}
分析:
从打印结果来看,只执行了一次Next事件,相当于多个观察者共享一个Observable,也只响应这个Observable
single
限制 Observable 只有一个元素,否出发出一个 error事件
single 操作符将限制 Observable 只产生一个元素。如果Observable 只有一个元素,它将镜像这个 Observable 。 如果 Observable 没有元素或者元素数量大于一,它将产生 一个 error 事件。
let ob = Observable<String>.create { observe in
observe.onNext("1")
observe.onNext("2")
return Disposables.create()
}.single()
ob.subscribe { element in
print(element)
}.disposed(by: bag)
ob.subscribe { element in
print(element)
}.disposed(by: bag)
输出结果
next(1)
error(Sequence contains more than one element.)
next(1)
error(Sequence contains more than one element.)
分析:
从上述代码可以看出来,我们发出了两个onNext
事件,但是后面的订阅者中可以看出打印结果,第一次是正常发出事件,当第二次还存在事件的时候,就强制发出了error,结束了该序列
skip
跳过 Observable 中头 n 个元素
Observable.of(1,2,3,4,5,6,7)
.skip(2)
.subscribe { element in
switch element {
case .next(let index):
print(index)
default:
break
}
}
.disposed(by: bag)
输出结果
3
4
5
6
7
skipUntil
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence.skip(until: referenceSequence)
.subscribe { element in
print(element)
}.disposed(by: bag)
sourceSequence.onNext("1")
sourceSequence.onNext("2")
sourceSequence.onNext("3")
referenceSequence.onNext("4")
//
sourceSequence.onNext("5")
sourceSequence.onNext("6")
sourceSequence.onNext("7")
输出结果
next(5)
next(6)
next(7)
总结
:这里有一个注意点,如果你不执行referenceSequence.onNext("4")
,那么sourceSequence
所有的onNext事件将都不会响应
,也就是不会有任何输出结果
skipWhile
Observable<Int>.of(1,2,5,3,2,4,3,2,1)
.skip { element in
return element < 4
}
.subscribe { element in
print(element)
}
.disposed(by: bag)
输出结果
next(5)
next(3)
next(2)
next(4)
next(3)
next(2)
next(1)
completed
分析
:程序中,我们设定了element < 4
的条件,第一个元素是1,小于4,成立,第二个元素是2,小于4程序,第三个是5,并不小于4,返回false,而skipWhile的功能就是,直到遇到第一个false的条件
,那么前面所有的元素都将被丢弃
,从打印结果来看,开头的1,2是没有打印的,便可印证
startWith
将一些元素插入到序列的头部
startWith
操作符会在 Observable 头部插入一些元素。 (如果你想在尾部加入一些元素可以用concat
)
Observable<Int>.of(1,2,3,4)
.startWith(6,5)
.startWith(7)
.startWith(8)
.subscribe { element in
print(element)
}
.disposed(by: bag)
输出结果
next(8)
next(7)
next(6)
next(5)
next(1)
next(2)
next(3)
next(4)
completed
subscribeOn
指定 Observable 在那个 Scheduler 执行
ReactiveX 使用 Scheduler 来让 Observable 支持多线程。你可以使用 subscribeOn
操作符,来指示 Observable 在哪个Scheduler 执行。
observeOn
操作符非常相似。它指示 Observable 在哪个Scheduler 发出通知。
PublishSubject<Int>.of(1,2,3).subscribe(on: MainScheduler.instance)
.subscribe { element in
print(Thread.current)
print(element)
}.disposed(by: bag)
输出结果
<_NSMainThread: 0x6000030a4700>{number = 1, name = main}
next(1)
<_NSMainThread: 0x6000030a4700>{number = 1, name = main}
next(2)
<_NSMainThread: 0x6000030a4700>{number = 1, name = main}
next(3)
<_NSMainThread: 0x6000030a4700>{number = 1, name = main}
completed
take
仅仅从 Observable 中发出头 n 个元素
通过 take 操作符你可以只发出头 n 个元素。并且忽略掉后面的元素,直接结束序列。
PublishSubject<Int>.of(1,2,3,4,5).subscribe(on: MainScheduler.instance)
.take(2)
.subscribe { element in
print(Thread.current)
print(element)
}.disposed(by: bag)
输出结果
<_NSMainThread: 0x6000027d0900>{number = 1, name = main}
next(1)
<_NSMainThread: 0x6000027d0900>{number = 1, name = main}
next(2)
<_NSMainThread: 0x6000027d0900>{number = 1, name = main}
completed
takeLast
仅仅从 Observable 中发出尾部 n 个元素
通过 takeLast 操作符你可以只发出尾部 n 个元素。并且忽略 掉前面的元素。
PublishSubject<Int>.of(1,2,3,4,5).subscribe(on: MainScheduler.instance)
.takeLast(2)
.subscribe { element in
print(Thread.current)
print(element)
}.disposed(by: bag)
输出结果
<_NSMainThread: 0x6000025dc440>{number = 1, name = main}
next(4)
<_NSMainThread: 0x6000025dc440>{number = 1, name = main}
next(5)
<_NSMainThread: 0x6000025dc440>{number = 1, name = main}
completed
takeUntil
忽略掉在第二个 Observable 产生事件后发出的那部分元素
takeUntil 操作符将镜像源 Observable ,它同时观测第二个
Observable 。一旦第二个 Observable 发出一个元素或者 产生一个终止事件,那个镜像的 Observable 将立即终止。
let soucecSequence = PublishSubject<Int>()
let referenceSequence = PublishSubject<Int>()
soucecSequence.take(until: referenceSequence)
.subscribe { element in
print(element)
}
soucecSequence.onNext(1)
soucecSequence.onNext(2)
soucecSequence.onNext(3)
referenceSequence.onNext(4)
soucecSequence.onNext(5)
soucecSequence.onNext(6)
soucecSequence.onNext(7)
输出结果
next(1)
next(2)
next(3)
completed
takeWhile
镜像一个 Observable 直到某个元素的判定为 false
takeWhile 操作符将镜像源 Observable 直到某个元素的判 定为 false。此时,这个镜像的 Observable 将立即终止。
PublishSubject<Int>.of(1,2,3,4,5,6,7)
.take(while: { index in
return index < 4
})
.subscribe {
print($0)
}.disposed(by: bag)
输出结果
next(1)
next(2)
next(3)
completed
timeout
设置超时时间,超过规定时间的事件将发送 error
let pub = PublishSubject<Int>()
pub.timeout(RxTimeInterval.seconds(3), scheduler: MainScheduler.instance).subscribe { element in
print(element)
}.disposed(by: bag)
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 4) {
pub.onNext(3)
}
timer
创建一个 Observable 在一段延时后,产生唯一的一个元素
timer 操作符将创建一个 Observable ,它在经过设定的一段 时间后,产生唯一的一个元素。
public static func timer( _ dueTime: RxTimeInterval, // 初始延时
period: RxTimeInterval?, // 时间间隔
scheduler: SchedulerType
) -> Observable<E>
PublishSubject<Int>
.timer(RxTimeInterval.seconds(2), period: RxTimeInterval.seconds(3), scheduler: MainScheduler.instance)
.subscribe { element in
print(element)
}.disposed(by: bag)