RxSwift(三)----Operators

2018-12-27  本文已影响0人  会笑的Even

RxSwift(三)----Operators

RxSwift中,我们需要对observables发出的事件进行转换,处理和响应,这里我们可以通过操作符来完成(Operators).

过滤操作符

ignoreElements

ignoreElements会忽略所有.next事件,通过停止事件.completed.error事件,

Snip20181211_2.png
let obser = PublishSubject<String>()
let bag = DisposeBag()
obser.ignoreElements().subscribe { event in
    print(event)
}
    .disposed(by: bag)
    
obser.onNext("1")
obser.onNext("2")
obser.onNext("3")
obser.onCompleted()

输出如下:

completed

elementAt

elementAt()输出指定位置的元素.

Snip20181212_3.png
let obser = PublishSubject<String>()
let bag = DisposeBag()
obser.elementAt(2).subscribe { event in
    print(event)
    }
    .disposed(by: bag)
obser.onNext("1")
obser.onNext("2")
obser.onNext("3")
obser.onCompleted()

输出如下:

next(3)
completed

filter

filter过滤指定条件的元素.

Snip20181212_4.png
let bag = DisposeBag()
    
Observable.of(1,2,3,4,5,6)
.filter {
    $0 % 2 == 0
}
.subscribe(onNext: {
    print($0)
})
.disposed(by: bag)

输出如下:

2
4
6

skip

skip跳过指定个数元素.

Snip20181212_5.png
let bag = DisposeBag()
    
Observable.of(1,2,3,4,5,6)
.skip(3)
.subscribe(onNext: {
    print($0)
})
.disposed(by: bag)

输出如下:

4
5
6

skipWhile

skipWhile满足条件之前的元素全部跳过,之后全部通过.

Snip20181212_6.png
let bag = DisposeBag()
    
/// skipWhile直到没有元素被跳过
Observable.of(1,1,2,3,4,5,6)
.skipWhile({
    $0 % 2 == 1
})
.subscribe(onNext: {
    print($0)
})
.disposed(by: bag)

输入如下:

2
3
4
5
6

skipUntil

skipUntil在另外一个序列发出事件之前,原序列的元素被跳过.

Snip20181212_7.png
let bag = DisposeBag()
    
/// skipUntil直到另外一个observeable发出.next事件之前,元素都是被跳过
let trigger = PublishSubject<String>()
let subject = PublishSubject<String>()
subject
.skipUntil(trigger)
.subscribe(onNext: {
    print($0)
})
.disposed(by: bag)
    
subject.onNext("1")
subject.onNext("2")
    
trigger.onNext("X")
subject.onNext("3")

输出如下:

3

take

take去序列前面指定数量的元素.

Snip20181212_8.png
let bag = DisposeBag()
    

Observable.of(1,1,2,3,4,5,6)
.take(3)
.subscribe(onNext: {
    print($0)
})
.disposed(by: bag)

输出如下:

1
1
2

takeWhile

takeWhile取前面的元素直到条件不成立为止.与skipWhile相反.

Snip20181212_9.png
let bag = DisposeBag()
Observable.of(1,1,2,3,4,5,6)
.enumerated()
.takeWhile({ index, integer in
    index < 2 && integer < 3
})
.map { $0.element }
.subscribe(onNext: {
    print($0)
})
.disposed(by: bag)

输出如下:

1
1

takeUntil

takeUntil直到另外一个序列发出事件之前,取相应的元素.

Snip20181212_11.png
let disposeBag = DisposeBag()
// 1
let subject = PublishSubject<String>()
let trigger = PublishSubject<String>()
// 2
subject
    .takeUntil(trigger)
    .subscribe(onNext: {
        print($0) })
    .disposed(by: disposeBag)
// 3
subject.onNext("1")
subject.onNext("2")
trigger.onNext("X")
subject.onNext("3")

输出如下:

1
2

takeLast

takeLast取尾部特定数量的元素.

Snip20181212_10.png
let bag = DisposeBag()
Observable.of(1,1,2,3,4,5,6)
.takeLast(3)
.subscribe(onNext: {
    print($0)
})
.disposed(by: bag)

输出如下:

4
5
6

distinctUntilChanged

distinctUntilChanged过滤掉重复的元素.

Snip20181212_12.png
Observable.of(1,1,2,3,4,5,6)
.distinctUntilChanged()
.subscribe(onNext: {
    print($0)
})
.disposed(by: bag)

输出如下:

1
2
3
4
5
6

distinctUntilChanged(_:)

distinctUntilChanged(_:)可以自定义判断条件.

Snip20181212_13.png
let bag = DisposeBag()
Observable.of(1,1,2,3,4,5,6)
.distinctUntilChanged({ (a, b) -> Bool in
    a < b
})
.subscribe(onNext: {
    print($0)
})
.disposed(by: bag)

输出如下:

1
1

take(_, scheduler:)

take(_, scheduler:)取出序列开始发出元素时间段内的元素.

let pub = PublishSubject<Int>()
pub.asObservable()
    .take(5, scheduler: MainScheduler.instance)
    .subscribe { s in
        print(s)
}
    .disposed(by: bag)
pub.onNext(1)
pub.onNext(2)
    
// 五秒后执行
pub.onNext(3)

打印如下:

next(1)
next(2)

throttle(_, latest, scheduler)

throttle(_, latest, scheduler)序列发出的元素的元素和下一个发出元素必须在一定时间间隔之外,不认就会被过滤掉, latest如果为true,表示最新的元素如果在时间间隔之外序列没有发出新的元素,这个最新元素也会发出来.

pub = PublishSubject<Int>()
pub.asObservable()
    .throttle(3, latest: false, scheduler: MainScheduler.instance)
    .subscribe { s in
        print(s)
}
    .disposed(by: bag)
pub.onNext(1)
pub.onNext(2)
    
// 4秒钟之后执行
pub.onNext(3)
pub.onNext(3)
pub.onNext(3)

输出结果如下:

next(1)
next(3)

转换操作符

toArray

toArray元素以数组的形式的发出来.

Snip20181213_14.png
let disposeBag = DisposeBag()
// 1
Observable.of("A", "B", "C")
// 2
.toArray()
.subscribe(onNext: {
    print($0)
    
})
.disposed(by: disposeBag)

输出如下:

["A", "B", "C"]

map

map对,每个元素进行map操作.

Snip20181213_15.png
let disposeBag = DisposeBag()
// 1
let formatter = NumberFormatter()
formatter.numberStyle = .spellOut
// 2
Observable<NSNumber>.of(123, 4, 56)
// 3
.map {
    formatter.string(from: $0) ?? ""
}
.subscribe(onNext: {
    print($0)
})
.disposed(by: disposeBag)

输出如下:

one hundred twenty-three
four
fifty-six

flatMap

flatMap会将元素单独当做序列,最后将序列中的元素取出来,合并成一个一个序列.

Snip20181213_16.png
let bag = DisposeBag()
    
let s1 =  BehaviorSubject(value: 80)
let s2 = BehaviorSubject(value: 90)
    
let student = PublishSubject<BehaviorSubject<Int>>()
student.flatMap {
    $0
}
    .subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: bag)
   
student.onNext(s1)
s1.onNext(85)
student.onNext(s2)
s1.onNext(95)
s2.onNext(100)

输出如下:

80
85
90
95
100

flatMapLatest

flatMapLatest将元素转换成新的序列,只将最新的序列的元素合并起来组成新的序列.

Snip20181213_17.png
let bag = DisposeBag()
    
let s1 =  BehaviorSubject(value: 80)
let s2 = BehaviorSubject(value: 90)
    
let student = PublishSubject<BehaviorSubject<Int>>()
student.flatMapLatest {
    $0
}
    .subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: bag)
   
student.onNext(s1)
s1.onNext(85)
student.onNext(s2)
s1.onNext(95)
s2.onNext(100)

输出如下:

80
85
90
100

materialize

materialize将元素包装成事件发送出来.

Snip20181213_18.png
let bag = DisposeBag()
    
let s1 =  BehaviorSubject(value: 80)
let s2 = BehaviorSubject(value: 90)
    
let student = PublishSubject<BehaviorSubject<Int>>()
student.flatMapLatest {
    $0.materialize()
}
    .subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: bag)
   
student.onNext(s1)
s1.onNext(85)
student.onNext(s2)
s1.onNext(95)
s2.onNext(100)

输出如下:

next(80)
next(85)
next(90)
next(100)

dematerialize

dematerialize将被包装成事件的元素,直接以元素的形式发送出来.

Snip20181213_19.png
let bag = DisposeBag()
    
let s1 =  BehaviorSubject(value: 80)
let s2 = BehaviorSubject(value: 90)
    
let student = PublishSubject<BehaviorSubject<Int>>()
student.flatMapLatest {
    $0.materialize()
}
    .dematerialize()
    .subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: bag)
   
student.onNext(s1)
s1.onNext(85)
student.onNext(s2)
s1.onNext(95)
s2.onNext(100)

输出如下:

80
85
90
100

share(replay, scope)

share(replay, scope)共享序列,将最新缓存指定个数的元素发送给新的订阅者.

Snip20181213_20.png
let bag = DisposeBag()
    
let observable = Observable.of(1,2,3,4,5).share(replay: 2, scope: .forever)
observable.subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: bag)
    
observable.subscribe(onNext: { value in
    print(value)
})
    .disposed(by: bag)

输出结果:

1
2
3
4
5
4
5

组合操作符

startWith

startWith序列以什么元素开始.

Snip20181213_21.png
let bag = DisposeBag()
// 1
let numbers = Observable.of(2, 3, 4)
// 2
let observable = numbers.startWith(1)
observable.subscribe(onNext: { value in
    print(value)
})
    .disposed(by: bag)

输出如下:

1
2
3
4

concat

concat按顺序将序列连接起来.

Snip20181213_22.png
let bag = DisposeBag()
// 1
let first = Observable.of(1, 2, 3)
let second = Observable.of(4, 5, 6)
// 2
let observable = Observable.concat([first, second])
observable.subscribe(onNext: { value in
    print(value)
})
    .disposed(by: bag)

输出如下:

1
2
3
4
5
6

concatMap

concatMap将序列按顺序连接起来,并进行map操作.

let bag = DisposeBag()
// 1
let first = Observable.of(1, 2, 3)
let second = Observable.of(4, 5, 6)
// 2
let observable = Observable.of(first, second).concatMap { obs -> Observable<Int> in
    obs.map({ i in
        i * 2
    })
}
observable.subscribe(onNext: { value in
    print(value)
})
    .disposed(by: bag)

输出如下:

2
4
6
8
10
12

merge

merge将序列中的元素按顺序合并发出来.

Snip20181213_23.png
// 1
let left = PublishSubject<String>()
let right = PublishSubject<String>()
// 2
let source = Observable.of(left.asObservable(), right.asObservable())
    
// 3
let observable = source.merge()
let disposable = observable.subscribe(onNext: { value in
    print(value)
})
    
// 4
var leftValues = ["Berlin", "Munich", "Frankfurt"]
var rightValues = ["Madrid", "Barcelona", "Valencia"]
repeat {
    if arc4random_uniform(2) == 0 {
        if !leftValues.isEmpty {
            left.onNext("Left:  " + leftValues.removeFirst())
        }
    } else if !rightValues.isEmpty {
        right.onNext("Right: " + rightValues.removeFirst())
    }
} while !leftValues.isEmpty || !rightValues.isEmpty
// 5
disposable.dispose()

输出如下:

Left:  Berlin
Right: Madrid
Left:  Munich
Right: Barcelona
Left:  Frankfurt
Right: Valencia

combineLatest

combineLatest将每个序列中的最新元素组合发送出来.

Snip20181213_24.png
let left = PublishSubject<String>()
let right = PublishSubject<String>()
    
let observable = Observable.combineLatest(left, right, resultSelector:
{
    lastLeft, lastRight in
    "\(lastLeft) \(lastRight)"
})
let disposable = observable.subscribe(onNext: { value in
    print(value)
})
    
// 2
print("> Sending a value to Left")
left.onNext("Hello,")
print("> Sending a value to Right")
right.onNext("world")
print("> Sending another value to Right")
right.onNext("RxSwift")
print("> Sending another value to Left")
left.onNext("Have a good day,")
    
disposable.dispose()

输出如下:

> Sending a value to Left
> Sending a value to Right
Hello, world
> Sending another value to Right
Hello, RxSwift
> Sending another value to Left
Have a good day, RxSwift

zip

zip将各个序列中的元素一一对应的组合发送出来.

let left = PublishSubject<String>()
let right = PublishSubject<String>()
    
let observable = Observable.zip([left,right])
let disposable = observable.subscribe(onNext: { value in
    print(value)
})
// 2
left.onNext("Lisbon")
right.onNext("Copenhagen")
left.onNext("London")
left.onNext("Madrid")
right.onNext("Vienna")
disposable.dispose()

输出如下:

["Lisbon", "Copenhagen"]
["London", "Vienna"]

withLatestFrom

withLatestFrom当地一个序列发出元素,就将第二个序列的最新的元素发送出去.

Snip20181213_25.png
let bag = DisposeBag()
// 1
let button = PublishSubject<Void>()
let textField = PublishSubject<String>()
// 2
let observable = button.withLatestFrom(textField)
_ = observable.subscribe(onNext: { value in
    print(value)
})
    .disposed(by: bag)
// 3
textField.onNext("Par")
textField.onNext("Pari")
textField.onNext("Paris")
button.onNext(())
button.onNext(())

输出如下:

Paris
Paris

sample

samplewithLatestFrom区别就是,如果第一个没有最新发出的元素,就不会发出元素.

Snip20181213_26.png
let bag = DisposeBag()
// 1
let button = PublishSubject<Void>()
let textField = PublishSubject<String>()
// 2
let observable = textField.sample(button)
_ = observable.subscribe(onNext: { value in
    print(value)
})
    .disposed(by: bag)
// 3
textField.onNext("Par")
textField.onNext("Pari")
textField.onNext("Paris")
    
button.onNext(())
button.onNext(())

输出如下:

Paris

amb

amb取第一个发出元素的序列.

Snip20181213_27.png
let left = PublishSubject<String>()
let right = PublishSubject<String>()
let observable = left.amb(right)
let disposable = observable.subscribe(onNext: { value in
    print(value)
})
// 2
left.onNext("Lisbon")
right.onNext("Copenhagen")
left.onNext("London")
left.onNext("Madrid")
right.onNext("Vienna")
disposable.dispose()

输出如下:

Lisbon
London
Madrid

switchLatest

switchLatest取最新序列发出的元素.

Snip20181213_28.png
// 1
let one = PublishSubject<String>()
let two = PublishSubject<String>()
let three = PublishSubject<String>()
let source = PublishSubject<Observable<String>>()
    
// 2
let observable = source.switchLatest()
let disposable = observable.subscribe(onNext: { value in
    print(value)
})
    
// 3
source.onNext(one)
one.onNext("Some text from sequence one")
two.onNext("Some text from sequence two")
source.onNext(two)
two.onNext("More text from sequence two")
one.onNext("and also from sequence one")
source.onNext(three)
two.onNext("Why don't you see me?")
one.onNext("I'm alone, help me")
three.onNext("Hey it's three. I win.")
source.onNext(one)
one.onNext("Nope. It's me, one!")
    
disposable.dispose()

输出如下:

Some text from sequence one
More text from sequence two
Hey it's three. I win.
Nope. It's me, one!

reduce

reduce和集合中的reduce的用法相似.

Snip20181214_29.png
let bag = DisposeBag()
let source = Observable.of(1, 3, 5, 7, 9)
// 1
let observable = source.reduce(0, accumulator: +)
observable.subscribe(onNext: { value in
    print(value)
})
.disposed(by: bag)

输出如下:

25

scan

scanreduce相似,它会把每次执行的结果发送出来.

Snip20181214_30.png
let bag = DisposeBag()
let source = Observable.of(1, 3, 5, 7, 9)
let observable = source.scan(0, accumulator: +)
observable.subscribe(onNext: { value in
    print(value)
})
.disposed(by: bag)

输出如下:

1
4
9
16
25

基于时间的操作符

replay

replay序列被订阅后并不会马上发出元素,要序列被connect后才会发出元素,并将缓存的元素发送给新的订阅者.replayAll会将所有的元素发送给新的订阅者.

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .replay(5)
    
_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
    
DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    _ = intSequence.connect()
}
    
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
    
DispatchQueue.main.asyncAfter(deadline: .now() + 8) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

输出如下:

Subscription 1:, Event: 0
Subscription 2:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5

buffer(timeSpan:count:scheduler:)

buffer(timeSpan:count:scheduler:)将序列中,一定时间段内,一定数量的元素周期性的发送出去.

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .buffer(timeSpan: 4, count: 2, scheduler: MainScheduler.instance)
    
_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
    
    
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
    
DispatchQueue.main.asyncAfter(deadline: .now() + 8) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

输出如下:

Subscription 1:, Event: [0, 1]
Subscription 1:, Event: [2, 3]
Subscription 2:, Event: [0, 1]
Subscription 1:, Event: [4, 5]
Subscription 2:, Event: [2, 3]

window(timeSpan:count:scheduler:)

window(timeSpan:count:scheduler:)buffer(timeSpan:count:scheduler:)相似,只是结果不是以数组形式发出来,而是以序列的方式.

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .window(timeSpan: 4, count: 2, scheduler: MainScheduler.instance)
    
_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
    
    
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
    
DispatchQueue.main.asyncAfter(deadline: .now() + 8) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

结果如下:

Subscription 1:, Event: RxSwift.AddRef<Swift.Int>
Subscription 1:, Event: RxSwift.AddRef<Swift.Int>
Subscription 2:, Event: RxSwift.AddRef<Swift.Int>
Subscription 1:, Event: RxSwift.AddRef<Swift.Int>
Subscription 2:, Event: RxSwift.AddRef<Swift.Int>
Subscription 1:, Event: RxSwift.AddRef<Swift.Int>

delaySubscription

delaySubscription对序列的元素进行延迟订阅.

Snip20181214_3.png
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .delaySubscription(3, scheduler: MainScheduler.instance)
    
_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
    
    
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
    
DispatchQueue.main.asyncAfter(deadline: .now() + 8) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

输出如下:

Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 1:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 0

delay

delay将元素延迟一段时间在发送出来.

Snip20181214_5.png
let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .delay(3, scheduler: MainScheduler.instance)
    
_ = intSequence
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })
    
    
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 2:, Event: \($0)") })
}
    
DispatchQueue.main.asyncAfter(deadline: .now() + 8) {
    _ = intSequence
        .subscribe(onNext: { print("Subscription 3:, Event: \($0)") })
}

输出如下:

Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 1:, Event: 2
Subscription 1:, Event: 3

interval(_:scheduler:)

interval(_:scheduler:)周期性发出事件

let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    .delay(3, scheduler: MainScheduler.instance)
    
_ = intSequence.map { $0 * 2 }
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

输出如下:

Subscription 1:, Event: 0
Subscription 1:, Event: 2
Subscription 1:, Event: 4
Subscription 1:, Event: 6
Subscription 1:, Event: 8

timer(_:period:scheduler:)

timer(_:period:scheduler:)在延迟一段时间后,周期性的发出元素.

let intSequence = Observable<Int>.timer(2.0, period: 3.0, scheduler: MainScheduler.instance)
    
_ = intSequence.map { $0 * 2 }
    .subscribe(onNext: { print("Subscription 1:, Event: \($0)") })

输出如下:

Subscription 1:, Event: 0
Subscription 1:, Event: 2
Subscription 1:, Event: 4
Subscription 1:, Event: 6

timeout

timeout,如果 Observable 在一段时间内没有产生元素,timeout操作符将使它发出一个 error 事件。

//定义好每个事件里的值以及发送的时间
let times = [
[ "value": 1, "time": 0 ],
[ "value": 2, "time": 0.5 ],
[ "value": 3, "time": 1.5 ],
[ "value": 4, "time": 4 ],
[ "value": 5, "time": 5 ]
]
 
//生成对应的 Observable 序列并订阅
Observable.from(times)
.flatMap { item in
    return Observable.of(Int(item["value"]!))
        .delaySubscription(Double(item["time"]!),
                           scheduler: MainScheduler.instance)
}
.timeout(2, scheduler: MainScheduler.instance) //超过两秒没发出元素,则产生error事件
.subscribe(onNext: { element in
    print(element)
}, onError: { error in
    print(error)
})
.disposed(by: disposeBag)
上一篇下一篇

猜你喜欢

热点阅读