11RxJS subject和及其3个子类

2021-01-10  本文已影响0人  learninginto
单播和多播

之前我们看到的所有Observable都是单播的,即源头有值发出时,不管这个Observable被几个Observer订阅,它一次只会给一个Observer推送多播:当源头有值发出时,这个值会同一时间发给所有的Observer

简单来说,单播与多播的区别类似于concat和merge的区别

按顺序依次执行,A先走完再开始B

const source$ = range(5)
source$.subscribe(val => console.log('A' + val))
source$.subscribe(val => console.log('B' + val))
//A0
//A1
//A2
//B0
//B1
//B2

这里的A和B同时被调用

const source$ = range(2);
const subject$ = new Subject();
subject$.subscribe(val => console.log('A:' + val))
subject$.subscribe(val => console.log('B:' + val))
source$.subscribe(subject$);
//A0
//B0
//A1
//B1
Subject

subject是一种特殊的Observable,而且是多播的。

既然Observable,就可以被subscribe,只不过每个observer都会存一份list,一旦有值发出,每个observer都会同时收到值

subject还是observer,可以执行next(),error(),complete()的方法

const subject = new Subject<number>();
subject.subscribe({
  next: (v) => console.log(`observerA:${v}`)
})
subject.subscribe({
  next: (v) => console.log(`observerB:${v}`)
})
subject.next(1);
subject.next(2);
//observerA:1
//observerB:1
//observerA:2
//observerB:2

有了subject之后,我们可以直接把subject传到subscribe中

const observable = from([1, 2])
const subject = new Subject<number>();
subject.subscribe({
  next: (v) => console.log(`observerA:${v}`)
})
subject.subscribe({
  next: (v) => console.log(`observerB:${v}`)
})
observable.subscribe(subject)
//A1
//B1
//A2
//B2

上面的例子中,通过subject将单播的Observable转成了多播的,这是其中一种方式,rxjs提供了一些多播类的操作符也可以将单播的Observable转成多播的

Subject三个子类

BehaviorSubject

BehaviorSubject可以储存最近发送的一个值,只要有新的Observer订阅,就立马推送当前的最新值

const subject = new Subject()
const observerA = {
  next: val => console.log('A next:' + val),
  error: err => console.log('A error:' + err),
  complete: () => console.log('A complete')
}
const observerB = {
  next: val => console.log('B next:' + val),
  error: err => console.log('B error:' + err),
  complete: () => console.log('B complete')
}
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.subscribe(observerB)
//这里只能接收到2之后的推送
subject.next(3);
//A next:1
//A next:2
//A next:3
//B next:3

如果想要拿到2的推送,可以使用BehaviorSubject,且必须要指定一个初始值

const subject = new BehaviorSubject(0)
const observerA = {
  next: val => console.log('A next:' + val),
  error: err => console.log('A error:' + err),
  complete: () => console.log('A complete')
}
const observerB = {
  next: val => console.log('B next:' + val),
  error: err => console.log('B error:' + err),
  complete: () => console.log('B complete')
}
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.subscribe(observerB)
subject.next(3);
//A next:1
//A next:2
//B next:2
//A next:3
//B next:3
ReplaySubject

ReplaySubject可以指定推送最近的多少个值给新的Observer,而BehaviorSubject只会推最近的一个值

const subject = new ReplaySubject(2)
const observerA = {
  next: val => console.log('A next:' + val),
  error: err => console.log('A error:' + err),
  complete: () => console.log('A complete')
}
const observerB = {
  next: val => console.log('B next:' + val),
  error: err => console.log('B error:' + err),
  complete: () => console.log('B complete')
}
subject.subscribe(observerA);
subject.next(1);
subject.next(2);
subject.next(3);
subject.subscribe(observerB)
subject.next(4);
subject.next(5);
// 1 2 3 2 3 4 4 5 5

还可以指定第二个参数,设置缓存的有效期

缓存2个值,并且只在3秒内有效,超过3秒新的Observer将不会订阅任何值

const subject = new ReplaySubject(2, 3000)
subject.subscribe({
  next: (v) => console.log(`observerA:${v}`)
})
range(3).subscribe(val => subject.next(val))
setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log(`ovserverB:${v}`)
  })
}, 2000)//这里的时间不能超过3000ms
//observerA:0
//observerA:1
//observerA:2
//observerA:1
//observerA:2
AsyncSubject

只在Subject结束时,推送最后一个值。所以,在没有推送complete时,不会有任何响应

const subject = new AsyncSubject()

subject.subscribe({
  next: (v) => console.log(`observerA:${v}`)
})
subject.next(1);
subject.next(2);
subject.subscribe({
  next: (v) => console.log(`observerB:${v}`)
})
subject.next(3);
subject.complete();
//observerA:3
//observerB:3
上一篇 下一篇

猜你喜欢

热点阅读