RxSwift #03 | Subjects

2021-12-05  本文已影响0人  JeremyTechBlog

Observable 是 RxSwift 的基础,但它们本质上是只读(read-only)的。你只能通过订阅 observable,来获得它们产生的新事件的通知。

举个栗子:

Observable.create { observer in
    cache.calculateDiskStorageSize { result in
        switch result {
        case .success(let size):
            observer.onNext(Int64(size))
    case .failure(let error):
            observer.error(error)
      log.error("calculate cache failed with error:\\(error)")
    }
        observer.onCompleted()
    }
    return Disposables.create()
}

从上述代码中可以看出,Observable 在创建的时候,就已经确定了通过某种固定的逻辑,去发出事件,从而产生事件流。

上面这个例子中的固定逻辑,指的就是计算 cache 大小。通过 cache 计算结果,来决定发出 onNext , error 或者 completable 事件。

而在日常开发中,我们通常需要根据不同的逻辑,来决定发出事件。抽象来说,就是需要一个既能作为 Observable 又能作为 Observer 的东西。这种东西称为 Subject:

let subject = PublishSubject<String>()

subject.on(.next("Is anyone listening?"))

let subscriptionOne = subject
  .subscribe(onNext: { string in
    print(string)
  })

上述代码创建了一个 PublishSubject, 它的名字很贴切:就像一个报纸出版商一样,它接收信息,然后发布给订阅者。

执行上述代码,会发现控制台中没有打印任何东西,这是因为:PublishSubject 只给当前的订阅者发出事件,如果一个 observer 是在事件发出之后才订阅的,那么将不会收到任何事件。

什么是 Subject?

Subject 既是一个 observable, 又是一个 observer。在上面的例子中,可以看到 subject 既可以接收事件,又可以被订阅。

Observable 和 Subject 的区别,除了 Subject 既可以作为 Observable,又可以作为 Observer以外,也可以这么理解:

Observable 已经把各种事件都定好了,比如发送网络请求,然后 Observer 在 subscribe 的时候,就触发这个网络请求,然后发送各种事件。

Subject 则是事件没有定好,可以灵活地根据业务需求去进行触发,比如选择相片,比如发送网络请求,然后发送各种事件。

在 RxSwift 中,有四种类型的 subject:

RxSwift 中还提供了一种叫做 Relay 的概念(在使用是需要 import RxRelay),RxSwift 中提供了两种 relay:

这两种 relay 包含着对应的 subject, 但只能接收和转发 next 事件,不能添加 completed 或者 error 事件,所以它们对于非终止序列来说是非常友好的。

Relay 只能 accept 事件,不能发送 completed 或者 error 等终止事件,因此没有结束的概念。如果要正确释放 replay, 需要把它添加到 disposeBag 中。

使用 PublishSubject

let subject = PublishSubject<String>()

subject.on(.next("Is anyone listening?"))

let subscriptionOne = subject
  .subscribe(onNext: { string in
    print(string)
  })

let subscriptionTwo = subject
  .subscribe { event in
    print("2)", event.element ?? event)
  }

subject.onNext("3")

/**
output:
3
2) 3
**/

subscriptionOne.dispose()
subject.onNext("4")

/**
output:
2) 4
**/

当一个 publish subject 收到一个 completed 或 errror 事件,也就是终止事件,它将向新的订阅者发出该终止事件,它将不再发出 next 事件。

而且,它将向后来的订阅者重新发出其终止事件。(Subject 被终止后,如果还有 observer 去 subscribe 它,那么 subject 会重复给这些 observers 发送终止事件

// 1
subject.onCompleted()

// 2
subject.onNext("5")

// 3
subscriptionTwo.dispose()

let disposeBag = DisposeBag()

// 4
subject
  .subscribe {
    print("3)", $0.element ?? $0)
  }
  .disposed(by: disposeBag)

subject.onNext("?")

/**
output:
2) completed
3) completed
**/

使用 BehaviorSubject

顶部的第一行是 subject。

第二行的第一个订阅者在 1 之后但在 2 之前订阅,所以它在订阅后立即收到 1,然后在主体发出 2 和 3 的时候收到。

同样地,第二个订阅者在 2 之后但在 3 之前订阅,所以它在订阅后立即收到 2,然后在 3 被发出时收到。

// 1
enum MyError: Error {
  case anError
}

// 2
func print<T: CustomStringConvertible>(label: String, event: Event<T>) {
  print(label, (event.element ?? event.error) ?? event)
}

// 3
example(of: "BehaviorSubject") {
  // 4
  let subject = BehaviorSubject(value: "Initial value")
  let disposeBag = DisposeBag()

    subject
      .subscribe {
        print(label: "1)", event: $0)
      }
      .disposed(by: disposeBag

    /**
    1) Initial value
    **/

    subject.onNext("X")

    /**
    1) X
    **/

    // 1
    subject.onError(MyError.anError)

    // 2
    subject
      .subscribe {
        print(label: "2)", event: $0)
      }
      .disposed(by: disposeBag)

    /**
    1) anError
    2) anError
    **/
}

BehaviorSubject 向新的订阅者重放他们的最新值。这使得它们很适合用来模拟各种状态的转移,比如“请求正在加载中”→“请求完成”。

那如果想要显示比上一个值还更多的内容呢,比如在搜索框上,需要显示最近使用的五个搜索值,这个时候就要用到 ReplaySubject 了。

使用 ReplaySubject

ReplaySubject 将暂时缓存、或缓冲他们发出的最新元素,直到达到你选择的指定大小。然后,他们将向新的订阅者重新发出该缓冲区内的元素。

下面的大理石图描述了一个缓冲区大小为2的重放主体:

第一个订阅者(中间一行)已经订阅了 replay subject(最上面一行),所以它在元素被发射出来的时候得到了元素。第二个订阅者(底线)在 2 之后订阅了,所以它得到了 1 和 2 的重放。

请记住,当使用一个 replay subject 时,这个缓冲区是在内存中保存的,所以很有可能会导致太高的内存占用。比如你为某种类型的 replay subject 设置一个大的缓冲区大小,而这种类型的实例都会占用大量的内存,比如图像。

另一件需要注意的事情是创建一个数组类型的 replay subject。每个发射的元素将是一个数组,所以缓冲区的大小将缓冲那么多数组。如果不小心的话,也很容易在这里产生内存压力。

example(of: "ReplaySubject") {
  // 1
  let subject = ReplaySubject<String>.create(bufferSize: 2)
  let disposeBag = DisposeBag()

  // 2
  subject.onNext("1")
  subject.onNext("2")
  subject.onNext("3")

  // 3
  subject
    .subscribe {
      print(label: "1)", event: $0)
    }
    .disposed(by: disposeBag)

  subject
    .subscribe {
      print(label: "2)", event: $0)
    }
    .disposed(by: disposeBag)

/**
--- Example of: ReplaySubject ---
1) 2
1) 3
2) 2
2) 3
**/

    subject.onNext("4")
    subject.onError(MyError.anError)

    subject
      .subscribe {
        print(label: "3)", event: $0)
      }
      .disposed(by: disposeBag)

/**
前两个订阅者将正常接收当前元素,因为当新元素被添加到主题时,他们已经被订阅了,而新的第三个订阅者将得到最后两个缓冲的元素重放给它。
虽然最后订阅流中发出了一个 error 事件,但是缓冲区还在内存中,所以它还会把缓冲区之前的元素发给订阅者。

1) 4
2) 4
1) anError
2) anError
3) 3
3) 4
3) anError
**/
    }

subject.dispose()

// 因为 subject 在前面已经发出了 error 事件,所以它被终止并且释放了,这里再调用 dispose 会报错
// 3) Object `RxSwift...ReplayMany<Swift.String>` was already disposed.

使用 Relay

在前面的介绍中,我们知道:Relay 实际上是对应 Subject 的一层封装——PublishRelay 是 PublishSubject 的封装,BehaviorRelay 是 BehaviorSubject 的封装。它和 Subject 不一样的地方在于:它只能通过 accept(_:) 方法接收并发出事件,它不能使用 onNext(_:) 发出事件,也不能使用 onCompleted() 或者 onError(_:) 去终止订阅流,因此,Relay 保证了永远不会终止。

let relay = PublishRelay<String>()  
let disposeBag = DisposeBag()
relay.accept("Knock knock, anyone home?")
relay
  .subscribe(onNext: {
    print($0)
  })
  .disposed(by: disposeBag)

relay.accept("1")

// output: 1

relay.accept(MyError.anError)
relay.onCompleted()
// compile error

上一篇下一篇

猜你喜欢

热点阅读