RxSwift #03 | Subjects
Observable 是 RxSwift 的基础,但它们本质上是只读(read-only)的。你只能通过订阅 observable,来获得它们产生的新事件的通知。
举个栗子:
Observable.create { observer in
cache.calculateDiskStorageSize { result in
switch result {
case .success(let size):
observer.onNext(Int64(size))
case .failure(let error):
observer.error(error)
log.error("calculate cache failed with error:\\(error)")
}
observer.onCompleted()
}
return Disposables.create()
}
从上述代码中可以看出,Observable 在创建的时候,就已经确定了通过某种固定的逻辑,去发出事件,从而产生事件流。
上面这个例子中的固定逻辑,指的就是计算 cache 大小。通过 cache 计算结果,来决定发出 onNext
, error
或者 completable
事件。
而在日常开发中,我们通常需要根据不同的逻辑,来决定发出事件。抽象来说,就是需要一个既能作为 Observable 又能作为 Observer 的东西。这种东西称为 Subject:
let subject = PublishSubject<String>()
subject.on(.next("Is anyone listening?"))
let subscriptionOne = subject
.subscribe(onNext: { string in
print(string)
})
上述代码创建了一个 PublishSubject, 它的名字很贴切:就像一个报纸出版商一样,它接收信息,然后发布给订阅者。
执行上述代码,会发现控制台中没有打印任何东西,这是因为:PublishSubject 只给当前的订阅者发出事件,如果一个 observer 是在事件发出之后才订阅的,那么将不会收到任何事件。
什么是 Subject?
Subject 既是一个 observable, 又是一个 observer。在上面的例子中,可以看到 subject 既可以接收事件,又可以被订阅。
Observable 和 Subject 的区别,除了 Subject 既可以作为 Observable,又可以作为 Observer以外,也可以这么理解:
Observable 已经把各种事件都定好了,比如发送网络请求,然后 Observer 在 subscribe 的时候,就触发这个网络请求,然后发送各种事件。
Subject 则是事件没有定好,可以灵活地根据业务需求去进行触发,比如选择相片,比如发送网络请求,然后发送各种事件。
在 RxSwift 中,有四种类型的 subject:
- PublishSubject: 初始时是空的,只向订阅者发出新元素。
- BehaviorSubject: 有一个初始值,并将其初始值和最新的元素发送给新的订阅者。
- ReplaySubject: 初始化时需要有一个缓冲区大小,并将维持一个该大小的缓冲区,缓冲区内的元素都会发送给新的订阅者。
- AsyncSubject: 只发出序列中的最后一个 next 事件,并且只在 subject 接收到 completed 事件时才发出。这是一种很少使用的 subject。
RxSwift 中还提供了一种叫做 Relay 的概念(在使用是需要 import RxRelay
),RxSwift 中提供了两种 relay:
- PublishRelay
- BehaviorRelay
这两种 relay 包含着对应的 subject, 但只能接收和转发 next 事件,不能添加 completed 或者 error 事件,所以它们对于非终止序列来说是非常友好的。
Relay 只能 accept 事件,不能发送 completed 或者 error 等终止事件,因此没有结束的概念。如果要正确释放 replay, 需要把它添加到 disposeBag 中。
使用 PublishSubject
let subject = PublishSubject<String>()
subject.on(.next("Is anyone listening?"))
let subscriptionOne = subject
.subscribe(onNext: { string in
print(string)
})
let subscriptionTwo = subject
.subscribe { event in
print("2)", event.element ?? event)
}
subject.onNext("3")
/**
output:
3
2) 3
**/
subscriptionOne.dispose()
subject.onNext("4")
/**
output:
2) 4
**/
当一个 publish subject 收到一个 completed 或 errror 事件,也就是终止事件,它将向新的订阅者发出该终止事件,它将不再发出 next 事件。
而且,它将向后来的订阅者重新发出其终止事件。(Subject 被终止后,如果还有 observer 去 subscribe 它,那么 subject 会重复给这些 observers 发送终止事件)
// 1
subject.onCompleted()
// 2
subject.onNext("5")
// 3
subscriptionTwo.dispose()
let disposeBag = DisposeBag()
// 4
subject
.subscribe {
print("3)", $0.element ?? $0)
}
.disposed(by: disposeBag)
subject.onNext("?")
/**
output:
2) completed
3) completed
**/
使用 BehaviorSubject
顶部的第一行是 subject。
第二行的第一个订阅者在 1 之后但在 2 之前订阅,所以它在订阅后立即收到 1,然后在主体发出 2 和 3 的时候收到。
同样地,第二个订阅者在 2 之后但在 3 之前订阅,所以它在订阅后立即收到 2,然后在 3 被发出时收到。
// 1
enum MyError: Error {
case anError
}
// 2
func print<T: CustomStringConvertible>(label: String, event: Event<T>) {
print(label, (event.element ?? event.error) ?? event)
}
// 3
example(of: "BehaviorSubject") {
// 4
let subject = BehaviorSubject(value: "Initial value")
let disposeBag = DisposeBag()
subject
.subscribe {
print(label: "1)", event: $0)
}
.disposed(by: disposeBag
/**
1) Initial value
**/
subject.onNext("X")
/**
1) X
**/
// 1
subject.onError(MyError.anError)
// 2
subject
.subscribe {
print(label: "2)", event: $0)
}
.disposed(by: disposeBag)
/**
1) anError
2) anError
**/
}
BehaviorSubject 向新的订阅者重放他们的最新值。这使得它们很适合用来模拟各种状态的转移,比如“请求正在加载中”→“请求完成”。
那如果想要显示比上一个值还更多的内容呢,比如在搜索框上,需要显示最近使用的五个搜索值,这个时候就要用到 ReplaySubject 了。
使用 ReplaySubject
ReplaySubject 将暂时缓存、或缓冲他们发出的最新元素,直到达到你选择的指定大小。然后,他们将向新的订阅者重新发出该缓冲区内的元素。
下面的大理石图描述了一个缓冲区大小为2的重放主体:
第一个订阅者(中间一行)已经订阅了 replay subject(最上面一行),所以它在元素被发射出来的时候得到了元素。第二个订阅者(底线)在 2 之后订阅了,所以它得到了 1 和 2 的重放。
请记住,当使用一个 replay subject 时,这个缓冲区是在内存中保存的,所以很有可能会导致太高的内存占用。比如你为某种类型的 replay subject 设置一个大的缓冲区大小,而这种类型的实例都会占用大量的内存,比如图像。
另一件需要注意的事情是创建一个数组类型的 replay subject。每个发射的元素将是一个数组,所以缓冲区的大小将缓冲那么多数组。如果不小心的话,也很容易在这里产生内存压力。
example(of: "ReplaySubject") {
// 1
let subject = ReplaySubject<String>.create(bufferSize: 2)
let disposeBag = DisposeBag()
// 2
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
// 3
subject
.subscribe {
print(label: "1)", event: $0)
}
.disposed(by: disposeBag)
subject
.subscribe {
print(label: "2)", event: $0)
}
.disposed(by: disposeBag)
/**
--- Example of: ReplaySubject ---
1) 2
1) 3
2) 2
2) 3
**/
subject.onNext("4")
subject.onError(MyError.anError)
subject
.subscribe {
print(label: "3)", event: $0)
}
.disposed(by: disposeBag)
/**
前两个订阅者将正常接收当前元素,因为当新元素被添加到主题时,他们已经被订阅了,而新的第三个订阅者将得到最后两个缓冲的元素重放给它。
虽然最后订阅流中发出了一个 error 事件,但是缓冲区还在内存中,所以它还会把缓冲区之前的元素发给订阅者。
1) 4
2) 4
1) anError
2) anError
3) 3
3) 4
3) anError
**/
}
subject.dispose()
// 因为 subject 在前面已经发出了 error 事件,所以它被终止并且释放了,这里再调用 dispose 会报错
// 3) Object `RxSwift...ReplayMany<Swift.String>` was already disposed.
使用 Relay
在前面的介绍中,我们知道:Relay 实际上是对应 Subject 的一层封装——PublishRelay 是 PublishSubject 的封装,BehaviorRelay 是 BehaviorSubject 的封装。它和 Subject 不一样的地方在于:它只能通过 accept(_:)
方法接收并发出事件,它不能使用 onNext(_:)
发出事件,也不能使用 onCompleted()
或者 onError(_:)
去终止订阅流,因此,Relay 保证了永远不会终止。
let relay = PublishRelay<String>()
let disposeBag = DisposeBag()
relay.accept("Knock knock, anyone home?")
relay
.subscribe(onNext: {
print($0)
})
.disposed(by: disposeBag)
relay.accept("1")
// output: 1
relay.accept(MyError.anError)
relay.onCompleted()
// compile error