rxjs
rxjs使用观察者模式、迭代器模式以及函数式编程实现一种理想的、管理序列事件的方式
rxjs的基础概念
- Observable:是一个包含多个值的集合,这些值都是懒推送(lazy push)进集合中的
- Observer
- Subscription
- Operators
- Subject
- Schedulers
pull push system
先搞清楚两个角色:
- 生产者(producer): 数据产生的地方
- 消费者(consumer): 数据使用的地方
pull系统:consumer决定什么时候接受producer生产的数据,比如函数,就是一个pull system,它只生产数据,并不知道什么时候这些数据会被使用。
push系统:producer决定什么时候把生产的数据传递给消费者,如promise,promise决定什么时候把生产的值“push”给callback函数
Observable
Observable的核心概念:
- creating Observables:可以通过Rx.Observable.create创建,或者通过所谓的创建操作如:of、from、interval等创建
- Subscribing Observables:Observables的注册就像调用一个函数,这个函数提供一个回调函数,数据最终会在这个回调函数中使用
-
Executing the Observable:
Observable.create(function subscribe(observer) {...})
中的一段代码,在Excution中,如果error或者complete执行了,那么后续的observer就不会执行 - Disposing Observables:处理Observables。Executing Observables可能是循环的,需要一个unsubscribe()去终止这个无限循环
概念很不好理解,下面是一个js写的简易版Observale,仅帮助理解,注释表明了1,2,3的含义
var observerOrigin = function(nextSelf) {
this.nextSelf = nextSelf ? nextSelf : null
}
observerOrigin.closed = false
observerOrigin.prototype.next = function(val) {
if (observerOrigin.closed) {
return
}
this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
}
observerOrigin.prototype.error = function(error) {
if (observerOrigin.closed) {
return
}
observerOrigin.closed = true
console.error(error)
}
observerOrigin.prototype.complete = function(val) {
if (observerOrigin.closed) {
return
}
observerOrigin.closed = true
console.log('complete')
}
var observable = function(subscribeFun) {
this.subscribeFun = subscribeFun
}
observable.prototype.subscribe = function(observer) {
let observerInner = new observerOrigin(observer) // a observer
this.subscribeFun.call(this, observerInner)
return this
}
/* subscribeFunEx: Executing Observables
Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
*/
var subscribeFunEx = function(observer) {
observer.next(1)
observer.next(2)
// observer.error('throw a error')
// observer.complete()
observer.next(3)
}
// Creating Observables
var observableEx = new observable(subscribeFunEx)
observableEx
.subscribe() // Subscribing to Observables
.subscribe((val) => console.log('next: ', val)) // Subscribing to Observables
对于第四点,官方示例如下
function subscribe(observer) {
var intervalID = setInterval(() => {
observer.next('hi');
}, 1000);
return function unsubscribe() {
clearInterval(intervalID);
};
}
var unsubscribe = subscribe({next: (x) => console.log(x)});
// Later:
unsubscribe(); // dispose the resources
Observer
Observer是一个对象,这个对象有三个回调函数(next,error,complete),任何一个回调函数都可能调用
Subscription
var subscription = observable.subscribe(x => console.log(x))
,subscription有一个unsubscribe()
方法释放所有的资源并且取消Observable的执行,也可以通过add()
将多个subscription放在一起(个人感觉类似数组的unshift),这个时候调用一个subscription的unsubscribe()
方法可能会将多个Subscriptionunsubscribe()
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);
var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));
subscription.add(childSubscription);
setTimeout(() => {
// Unsubscribes BOTH subscription and childSubscription
subscription.unsubscribe();
}, 1000);
Subscription还有一个remove(otherSubscription)
方法
Subject
一个Subject就是一个Observable,和Observable的区别是,Subject可以多播多个observers,它就是一个注册器,订阅者将自己想要订阅的事件注册到注册中心。Subject的subscribe并不会立即执行传递过来的值,它只是将订阅的事件放到一个observers的list中,类似addListener的作用
一个Subject也是一个Observer,通过next(value)
可以将值多播至注册在Subject中的订阅事件
var subject = new Rx.Subject(); // 一个Observables
subject.subscribe({ // subscribe类似于别的语言中的addListener
next: (v) => console.log('observerA: ' + v) // 订阅的事件
});
subject.subscribe({
next: (v) => console.log('observerB: ' + v) // 订阅的事件
});
subject.next(1); // 将value值1多播至上面的订阅事件中
subject.next(2);// 将value值2多播至上面的订阅事件中
打印结果:
observerA: 1
observerB: 1
observerA: 2
observerB: 2
subject也是一个observer,所以也可以observable.subscribe(subject)
Muticasted Observables
multicast返回的的Observable的subscribe方法和Subject的subscribe方法作用相同(即类似其他语言的addListener),connect
方法调用的是observable的subscribe
var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);
// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();
BehaviorSubject
BehaviorSubject表示“随着时间变化的值”,例如人的生日是不变的时间流,使用Subject,那么人的年龄就是随着时间变化的事件流,用BehaviorSubject表示
var subject = new Rx.BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
输出
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3
如果将上述的BehaviorSubject换成Subject,输出将变为
observerA: 1
observerA: 2
observerA: 3
observerB: 3
ReplaySubject
ReplaySubject记录多个来自Observable excution的值,并将它们分配给新的subscribes
var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
输出:
observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5
设置一个windowTime来决定,分配多少个并且最近windowTime时间内来自Observable excution的值
// Rx.ReplaySubject(10, 1000),observerB记录1开始至结束的值
// Rx.ReplaySubject(10, 500),observerB记录3开始至结束(500ms~1000ms之前的buffer)的值
// Rx.ReplaySubject(3, 1000),observerB记录3开始至结束(只subscribe3个buffer)的值
var subject = new Rx.ReplaySubject(10, 1000 /* windowTime */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
var i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
setTimeout(() => {
subject.unsubscribe();
}, 5000);
AsyncSubject
AsyncSubject中,只有最后一个值会传递给observers
var subject = new Rx.AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
输出
observerA: 5
observerB: 5
Operator
Operator是函数,这个函数会根据原来的Observable创建一个新的Observable,并且不会改变原来的Observable。.map(...), .filter(...), .merge(...)
都是Operator
function multiplyByTen(input) {
var output = Rx.Observable.create(function subscribe(observer) {
input.subscribe({
next: (v) => observer.next(10 * v),
error: (err) => observer.error(err),
complete: () => observer.complete()
});
});
return output;
}
var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));
输出
10
20
30
40
转化成js(简易版,很多漏洞,望指教)
var observerOrigin = function(nextSelf) {
this.nextSelf = nextSelf ? nextSelf : null
}
observerOrigin.closed = false
observerOrigin.prototype.next = function(val) {
if (observerOrigin.closed) {
return
}
this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
}
observerOrigin.prototype.error = function(error) {
if (observerOrigin.closed) {
return
}
observerOrigin.closed = true
console.error(error)
}
observerOrigin.prototype.complete = function() {
if (observerOrigin.closed) {
return
}
observerOrigin.closed = true
console.log('complete')
}
var observable = function(subscribeFun) {
this.subscribeFun = subscribeFun
}
observable.prototype.subscribe = function(observer) {
let observerInner = new observerOrigin(observer) // a observer
this.subscribeFun.call(this, observerInner)
return this
}
/* subscribeFunEx: Executing Observables
Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
*/
var subscribeFunEx = function(observer) {
observer.next(1)
observer.next(2)
observer.next(3)
observer.next(4)
}
function multiplyByTen(input) {
var output = new observable(function subscribe(observer) {
input.subscribe(v => observer.next(10 * v))
});
return output;
}
// Creating Observables
var input = new observable(subscribeFunEx)
var output = multiplyByTen(input);
output.subscribe(v => console.log(v))
由上可以看出:output的subscribe会导致input的subscribe,这叫做“operator subscription chain”
Instance operators versus static operators(实例运算符与静态运算符)
在Instance operators中,this关键字是输入的Observable,通过input
observable创建一个observable。static operators是通过Observable对象从头开始创建一个Observable
Scheduler
一个scheduler可以定义在什么样的执行环境中,observable会把通知传递给observer
var observable = Rx.Observable.create(function (proxyObserver) {
proxyObserver.next(1);
proxyObserver.next(2);
proxyObserver.next(3);
proxyObserver.complete();
})
.observeOn(Rx.Scheduler.async);
var finalObserver = {
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');
proxyObserver是在observeOn(Rx.Scheduler.async)中创建的,scheduler在Observable.create和最终的Observer之间创建了一个proxyObserver,proxyObserver实质上通过setTimeout或者setInterval操作来实现一个延迟执行(delay)
Scheduler种类
Scheduler | 目的 |
---|---|
null | 消息同步的递归的传递, |
Rx.Scheduler.queue | 在当前时间框架的队列中按时间表传递,用于迭代操作 |
Rx.Scheduler.asap | 在微型任务队列中按时间表传递,例如NodeJs的nextTick()、Web Worker的MessageChannel、setTimeout()或者其他的,用于转换成异步操作 |
Rx.Scheduler.async | Scheduler通过setInterval工作,用于基于事件的操作 |
使用 Schedulers
Static creation operators通常有一个Scheduler作为最后一个函数参数,如 from(array, scheduler)
Scheduler.subscribeOn决定subscribe()在什么环境中执行
Scheduler.observeOn决定在什么环境中传递通知
Instance operators有一个Scheduler作为函数参数