rxjs

2017-09-05  本文已影响10人  pixels

rxjs使用观察者模式、迭代器模式以及函数式编程实现一种理想的、管理序列事件的方式
rxjs的基础概念

  1. Observable:是一个包含多个值的集合,这些值都是懒推送(lazy push)进集合中的
  2. Observer
  3. Subscription
  4. Operators
  5. Subject
  6. Schedulers

pull push system

先搞清楚两个角色:

  1. 生产者(producer): 数据产生的地方
  2. 消费者(consumer): 数据使用的地方

pull系统:consumer决定什么时候接受producer生产的数据,比如函数,就是一个pull system,它只生产数据,并不知道什么时候这些数据会被使用。
push系统:producer决定什么时候把生产的数据传递给消费者,如promise,promise决定什么时候把生产的值“push”给callback函数

Observable

Observable的核心概念:

  1. creating Observables:可以通过Rx.Observable.create创建,或者通过所谓的创建操作如:of、from、interval等创建
  2. Subscribing Observables:Observables的注册就像调用一个函数,这个函数提供一个回调函数,数据最终会在这个回调函数中使用
  3. Executing the Observable:Observable.create(function subscribe(observer) {...})中的一段代码,在Excution中,如果error或者complete执行了,那么后续的observer就不会执行
  4. Disposing Observables:处理Observables。Executing Observables可能是循环的,需要一个unsubscribe()去终止这个无限循环

概念很不好理解,下面是一个js写的简易版Observale,仅帮助理解,注释表明了1,2,3的含义


var observerOrigin = function(nextSelf) {
  this.nextSelf = nextSelf ? nextSelf : null
}
observerOrigin.closed = false
observerOrigin.prototype.next = function(val) {
  if (observerOrigin.closed) {
    return
  }
  this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
}
observerOrigin.prototype.error = function(error) {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.error(error)
}
observerOrigin.prototype.complete = function(val) {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.log('complete')
}

var observable = function(subscribeFun) {
  this.subscribeFun = subscribeFun
}
observable.prototype.subscribe = function(observer) {
  let observerInner = new observerOrigin(observer) // a observer
  this.subscribeFun.call(this, observerInner)
  return this
}
/* subscribeFunEx: Executing Observables
   Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
*/
var subscribeFunEx = function(observer) {
  observer.next(1)
  observer.next(2)
  // observer.error('throw a error')
  // observer.complete()
  observer.next(3)
}
// Creating Observables
var observableEx = new observable(subscribeFunEx)
observableEx
  .subscribe()   // Subscribing to Observables
  .subscribe((val) => console.log('next: ', val))   // Subscribing to Observables

对于第四点,官方示例如下

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next('hi');
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// Later:
unsubscribe(); // dispose the resources

Observer

Observer是一个对象,这个对象有三个回调函数(next,error,complete),任何一个回调函数都可能调用

Subscription

var subscription = observable.subscribe(x => console.log(x)),subscription有一个unsubscribe()方法释放所有的资源并且取消Observable的执行,也可以通过add()将多个subscription放在一起(个人感觉类似数组的unshift),这个时候调用一个subscription的unsubscribe()方法可能会将多个Subscriptionunsubscribe()

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes BOTH subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

Subscription还有一个remove(otherSubscription)方法

Subject

一个Subject就是一个Observable,和Observable的区别是,Subject可以多播多个observers,它就是一个注册器,订阅者将自己想要订阅的事件注册到注册中心。Subject的subscribe并不会立即执行传递过来的值,它只是将订阅的事件放到一个observers的list中,类似addListener的作用
一个Subject也是一个Observer,通过next(value)可以将值多播至注册在Subject中的订阅事件

var subject = new Rx.Subject();  // 一个Observables

subject.subscribe({  // subscribe类似于别的语言中的addListener
  next: (v) => console.log('observerA: ' + v)  // 订阅的事件
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v) // 订阅的事件
});

subject.next(1); // 将value值1多播至上面的订阅事件中
subject.next(2);// 将value值2多播至上面的订阅事件中

打印结果:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

subject也是一个observer,所以也可以observable.subscribe(subject)

Muticasted Observables

multicast返回的的Observable的subscribe方法和Subject的subscribe方法作用相同(即类似其他语言的addListener),connect方法调用的是observable的subscribe

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// These are, under the hood, `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// This is, under the hood, `source.subscribe(subject)`:
multicasted.connect();

BehaviorSubject

BehaviorSubject表示“随着时间变化的值”,例如人的生日是不变的时间流,使用Subject,那么人的年龄就是随着时间变化的事件流,用BehaviorSubject表示

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(3);

输出

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

如果将上述的BehaviorSubject换成Subject,输出将变为

observerA: 1
observerA: 2
observerA: 3
observerB: 3

ReplaySubject

ReplaySubject记录多个来自Observable excution的值,并将它们分配给新的subscribes

var subject = new Rx.ReplaySubject(3); // buffer 3 values for new subscribers

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);

输出:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

设置一个windowTime来决定,分配多少个并且最近windowTime时间内来自Observable excution的值

// Rx.ReplaySubject(10, 1000),observerB记录1开始至结束的值
// Rx.ReplaySubject(10, 500),observerB记录3开始至结束(500ms~1000ms之前的buffer)的值
// Rx.ReplaySubject(3, 1000),observerB记录3开始至结束(只subscribe3个buffer)的值
var subject = new Rx.ReplaySubject(10, 1000 /* windowTime */);

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
  subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
  });
}, 1000);

setTimeout(() => {
  subject.unsubscribe();
}, 5000);

AsyncSubject

AsyncSubject中,只有最后一个值会传递给observers

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();

输出

observerA: 5
observerB: 5

Operator

Operator是函数,这个函数会根据原来的Observable创建一个新的Observable,并且不会改变原来的Observable。.map(...), .filter(...), .merge(...)都是Operator

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

输出

10
20
30
40

转化成js(简易版,很多漏洞,望指教)


var observerOrigin = function(nextSelf) {
  this.nextSelf = nextSelf ? nextSelf : null
}
observerOrigin.closed = false
observerOrigin.prototype.next = function(val) {
  if (observerOrigin.closed) {
    return
  }
  this.nextSelf ? this.nextSelf.call(this, val) : console.log(val)
}
observerOrigin.prototype.error = function(error) {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.error(error)
}
observerOrigin.prototype.complete = function() {
  if (observerOrigin.closed) {
    return
  }
  observerOrigin.closed = true
  console.log('complete')
}

var observable = function(subscribeFun) {
  this.subscribeFun = subscribeFun
}
observable.prototype.subscribe = function(observer) {
  let observerInner = new observerOrigin(observer) // a observer
  this.subscribeFun.call(this, observerInner)
  return this
}
/* subscribeFunEx: Executing Observables
   Executing Observables执行的过程中,如果执行了observer的error或者complete,后续的其他操作就不会执行
*/
var subscribeFunEx = function(observer) {
  observer.next(1)
  observer.next(2)
  observer.next(3)
  observer.next(4)
}

function multiplyByTen(input) {
  var output = new observable(function subscribe(observer) {
    input.subscribe(v => observer.next(10 * v))
  });
  return output;
}
// Creating Observables
var input = new observable(subscribeFunEx)
var output = multiplyByTen(input);
output.subscribe(v => console.log(v))

由上可以看出:output的subscribe会导致input的subscribe,这叫做“operator subscription chain”

Instance operators versus static operators(实例运算符与静态运算符)

在Instance operators中,this关键字是输入的Observable,通过input
observable创建一个observable。static operators是通过Observable对象从头开始创建一个Observable

Scheduler

一个scheduler可以定义在什么样的执行环境中,observable会把通知传递给observer

var observable = Rx.Observable.create(function (proxyObserver) {
  proxyObserver.next(1);
  proxyObserver.next(2);
  proxyObserver.next(3);
  proxyObserver.complete();
})
.observeOn(Rx.Scheduler.async);

var finalObserver = {
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
};
console.log('just before subscribe');
observable.subscribe(finalObserver);
console.log('just after subscribe');

proxyObserver是在observeOn(Rx.Scheduler.async)中创建的,scheduler在Observable.create和最终的Observer之间创建了一个proxyObserver,proxyObserver实质上通过setTimeout或者setInterval操作来实现一个延迟执行(delay)

Scheduler种类

Scheduler 目的
null 消息同步的递归的传递,
Rx.Scheduler.queue 在当前时间框架的队列中按时间表传递,用于迭代操作
Rx.Scheduler.asap 在微型任务队列中按时间表传递,例如NodeJs的nextTick()、Web Worker的MessageChannel、setTimeout()或者其他的,用于转换成异步操作
Rx.Scheduler.async Scheduler通过setInterval工作,用于基于事件的操作

使用 Schedulers

Static creation operators通常有一个Scheduler作为最后一个函数参数,如 from(array, scheduler)
Scheduler.subscribeOn决定subscribe()在什么环境中执行
Scheduler.observeOn决定在什么环境中传递通知
Instance operators有一个Scheduler作为函数参数

上一篇下一篇

猜你喜欢

热点阅读