RXSwift:Observable介绍

2021-10-27  本文已影响0人  yyggzc521

Observable 这个类是 Rx 框架的基础,可以称它为可观察序列。
它的作用,可以异步地产生一系列的 Event(事件),即一个 Observable 对象会随着时间推移不定期地发出 event(element : T) 这样一个东西。
而且这些 Event 还可以携带数据,它的泛型就是用来指定这个 Event 携带的数据的类型。

有了可观察序列,还需要有一个 Observer(订阅者)来订阅它,这样订阅者才能收到 Observable 发出的 Event

Event

public enum Event<Element> {
    /// Next element is produced.
    case next(Element)
 
    /// Sequence terminated with an error.
    case error(Swift.Error)
 
    /// Sequence completed successfully.
    case completed
}

可以看出 Event 就是个枚举,也就是说一个 Observable 是可以发出 3 种不同类型的事件:

Observable的创建

  1. just() 方法

该方法通过传入一个默认值来初始化。指定了这个 Observable 所发出的事件携带的数据类型必须是 Int 类型的。

let observable = Observable<Int>.just(5)
  1. of() 方法
    该方法可以接受可变数量的参数(必需要是同类型的)
let observable = Observable.of("A", "B", "C")
  1. from() 方法
    该方法需要一个数组参数。下面样例中数据的元素会被当做这个 Observable 所发出 event 携带的数据内容,最终效果同上面饿 of() 样例是一样的。
let observable = Observable.from(["A", "B", "C"])
  1. empty() 方法
    该方法创建一个空内容的 Observable 序列
  2. never() 方法
    该方法创建一个永远不会发出 Event(也不会终止)的 Observable 序列
let observable = Observable<Int>.never()
  1. error() 方法
    该方法创建一个不做任何操作,而是直接发送一个错误的 Observable 序列
enum MyError: Error {
    case A
    case B
}
         
let observable = Observable<Int>.error(MyError.A)
  1. create() 方法
    该方法接受一个 block 形式的参数,任务是对每一个过来的订阅进行处理
//这个block有一个回调参数observer就是订阅这个Observable对象的订阅者
//当一个订阅者订阅这个Observable对象的时候,就会将订阅者作为参数传入这个block来执行一些内容
let observable = Observable<String>.create{observer in
    //对订阅者发出了.next事件,且携带了一个数据"hangge.com"
    observer.onNext("hangge.com")
    //对订阅者发出了.completed事件
    observer.onCompleted()
    //在结尾 returen 一个Disposable 来取消订阅
    return Disposables.create()
}
 
//订阅测试
observable.subscribe {
    print($0)
}

8. deferred() 方法
该方法相当于创建一个 Observable 工厂,通过传入一个 block 来执行延迟 Observable 序列创建的行为,而这个 block 里就是真正的实例化序列对象的地方
  1. interval() 方法
    这个方法创建的 Observable 序列每隔一段设定的时间,会发出一个索引数的元素。而且它会一直发送下去。
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}
  1. timer() 方法
    这个方法有两种用法,一种是创建的 Observable 序列在经过设定的一段时间后,产生唯一的一个元素
//5秒种后发出唯一的一个元素0
let observable = Observable<Int>.timer(5, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

另一种是创建的 Observable 序列在经过设定的一段时间后,每隔一段时间产生一个元素

//延时5秒种后,每隔1秒钟发出一个元素
let observable = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
observable.subscribe { event in
    print(event)
}

Observable的订阅、监听

有了 Observable,我们还要订阅它,接收它发出的 Event

let observable = Observable.of("A", "B", "C")
observable.subscribe { event in
    print(event)
}

通过不同的 block 回调处理不同类型的 event。(其中 onDisposed 表示订阅行为被 dispose 后的回调)
同时会把 event 携带的数据直接解包出来作为参数,方便我们使用。

let observable = Observable.of("A", "B", "C")
         
observable.subscribe(onNext: { element in
    print(element)
}, onError: { error in
    print(error)
}, onCompleted: {
    print("completed")
}, onDisposed: {
    print("disposed")
})

subscribe() 方法的 onNext、onError、onCompleted 和 onDisposed 这四个回调 block 参数都是有默认值的,即它们都是可选的。所以我们也可以只处理 onNext 而不管其他的情况。

let observable = Observable.of("A", "B", "C")
         
observable.subscribe(onNext: { element in
    print(element)
})

事件监听

let observable = Observable.of("A", "B", "C")
 
observable
    .do(onNext: { element in
        print("Intercepted Next:", element)
    }, onError: { error in
        print("Intercepted Error:", error)
    }, onCompleted: {
        print("Intercepted Completed")
    }, onDispose: {
        print("Intercepted Disposed")
    })
    .subscribe(onNext: { element in
        print(element)
    }, onError: { error in
        print(error)
    }, onCompleted: {
        print("completed")
    }, onDisposed: {
        print("disposed")
    })

订阅销毁

  1. Observable 从创建到终结流程
    Observable 序列被创建出来后不会马上就被激活发出 Event,而是要等到被订阅了才会激活
    而 Observable 序列激活后,要一直等到它发出了 .error 或 .completed 的 event 后,才被终结。

  2. dispose()
    该方法可以手动取消一个订阅行为。
    如果订阅结束了不再需要了,可以调用 dispose() 方法,把订阅销毁掉,防止内存泄漏。
    当一个订阅被 dispose 了,之后 observable 如果再发出 event,这个已经 dispose 的订阅就收不到消息了

let observable = Observable.of("A", "B", "C")
         
//使用subscription常量存储这个订阅方法
let subscription = observable.subscribe { event in
    print(event)
}
         
//调用这个订阅的dispose()方法
subscription.dispose()
  1. DisposeBag
    除dispose() 方法外,我们更经常用到的是一个叫 DisposeBag 的对象来管理多个订阅行为的销毁
    我们可以把DisposeBag 对象看成一个垃圾袋,把用过的订阅行为都放进去。
    而这个 DisposeBag 会在自己快要 dealloc 时,对它里面的所有订阅行为调用 dispose() 方法
let disposeBag = DisposeBag()
         
//第1个Observable,及其订阅
let observable1 = Observable.of("A", "B", "C")
observable1.subscribe { event in
    print(event)
}.disposed(by: disposeBag)
 
//第2个Observable,及其订阅
let observable2 = Observable.of(1, 2, 3)
observable2.subscribe { event in
    print(event)
}.disposed(by: disposeBag)

观察者(Observer)

观察者的作用就是监听事件,然后对事件做出响应。或者说任何响应事件的行为都是观察者
如:

  1. 当我们点击按钮,弹出一个提示框。那么这个“弹出一个提示框”就是观察者
  2. 当我们请求一个远程的 json 数据后,将其打印出来。那么这个“打印 json 数据”就是观察者
1. 创建观察者
let observable = Observable.of("A", "B", "C")
          
observable.subscribe(onNext: { element in
    print(element)
}, onError: { error in
    print(error)
}, onCompleted: {
    print("completed")
})

//Observable序列(每隔1秒钟发出一个索引数)
        let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
 
        observable
            .map { "当前索引数:\($0 )"}
            .bind { [weak self](text) in
                //收到发出的索引数后显示到label上
                self?.label.text = text
            }
            .disposed(by: disposeBag)
  1. AnyObserver 可以用来描叙任意一种观察者,配合 subscribe 方法使用
//观察者
let observer: AnyObserver<String> = AnyObserver { (event) in
    switch event {
    case .next(let data):
        print(data)
    case .error(let error):
        print(error)
    case .completed:
        print("completed")
    }
}
 
let observable = Observable.of("A", "B", "C")
observable.subscribe(observer)
  1. 配合 bindTo 方法使用
 //观察者
        let observer: AnyObserver<String> = AnyObserver { [weak self] (event) in
            switch event {
            case .next(let text):
                //收到发出的索引数后显示到label上
                self?.label.text = text
            default:
                break
            }
        }
         
        //Observable序列(每隔1秒钟发出一个索引数)
        let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        observable
            .map { "当前索引数:\($0 )"}
            .bind(to: observer)
            .disposed(by: disposeBag)
    }
  1. 不会处理错误事件,确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler)
  2. 一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。

Subjects

既是订阅者,也是 Observable
说它是订阅者,因为能够动态地接收新的值
说它是Observable,是因为当 Subjects 有了新的值后,就会通过 Event 将新值发出给所有的订阅者
一共有四种 Subjects,分别为:PublishSubject、BehaviorSubject、ReplaySubject、Variable

  1. PublishSubject
    PublishSubject 是最普通的 Subject,它不需要初始值就能创建。
    PublishSubject 的订阅者从他们开始订阅的时间点起,可以收到订阅后 Subject 发出的新 Event,而不会收到他们在订阅前已发出的 Event
let disposeBag = DisposeBag()
 
//创建一个PublishSubject
let subject = PublishSubject<String>()
 
//第1次订阅subject
subject.subscribe(onNext: { string in
    print("第1次订阅:", string)
}, onCompleted:{
    print("第1次订阅:onCompleted")
}).disposed(by: disposeBag)

//由于当前没有任何订阅者,所以这条信息不会输出到控制台
subject.onNext("111")
  1. BehaviorSubject
    BehaviorSubject 需要通过一个默认初始值来创建。
    当一个订阅者来订阅它的时候,这个订阅者会立即收到 BehaviorSubjects 上一个发出的 event。之后就跟正常的情况一样,它也会接收到 BehaviorSubject 之后发出的新的 event
let disposeBag = DisposeBag()
 
//创建一个BehaviorSubject
let subject = BehaviorSubject(value: "111")
 
//第1次订阅subject
subject.subscribe { event in
    print("第1次订阅:", event)
}.disposed(by: disposeBag)

变换操作符

变换操作指的是对原始的 Observable 序列进行一些转换

let disposeBag = DisposeBag()
 
Observable.of(1, 2, 3)
    .map { $0 * 10}
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

https://juejin.cn/post/7020417608721629215#heading-6

上一篇 下一篇

猜你喜欢

热点阅读