Rx.js 入门

2019-05-26  本文已影响0人  风之化身呀

基本概念

1、RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observable 与 Observer。
2、Observable 作为被观察者,是一个值或事件的流集合;Observer 则作为观察者,根据 Observable 进行处理。
3、Observable 与 Observer 之间的订阅发布关系(观察者模式) 如下:
订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。即 observeable.subscribe(observer) 订阅 observeable
发布:Observeable 内部通过回调 next 方法向 Observer 发布事件。即
observer.next() 通知 Observer
4、observer 是一个形如这样的对象,不必实现所有方法

{
  next(x) { console.log(x); },
  error(err) { console.error(err); },
  complete() { console.log('done')}
}

1、Observable

观察者模式 和 迭代器模式是 Observable 的基础,创建 Observable 的方式

// 使用 Observable 创建
$ = new Observable((observer)=>{
  // 决定何时调用 observer.next 方法
})
$.subscribe({
     next:()=>{},
     error:()=>{},
     complete:()=>{}
})
// 或者
$.subscribe(next,err,complete)

// 使用 Subject 创建,具有更多的控制权
import { Subject } from 'rxjs';
const subject = new Subject();
subject.next('missed message from Subject');
subject.subscribe(v => console.log(v));
subject.next('hello from subject!');

// 包装 ajax
let stream = Rx.Observable.create((observer) => {
   let request = new XMLHttpRequest();

   request.open( ‘GET’, ‘url’ );
   request.onload =() =>{
      if(request.status === 200) {
         observer.next( request.response );
         observer.complete();
     } else {
          observer.error('error happened');
     }
   }

   request.onerror = () => {  
       observer.error('error happened')
   }
   request.send();
})

stream.subscribe(
   (data) => console.log( data )  
)

每次调用 subscribe 方法时,都会将其参数转换成 observer 需要的对象格式传个 Observable

import { of } from "rxjs";

const source$ = of('Semlinker', 'Lolo');

source$.subscribe({
    next: function(value) {
      console.log(value);
    },
    complete: function() {
      console.log('complete!');
    },
    error: function(error) {
      console.log(error);
    }
});
// output 
Semlinker
Lolo
complete!
import { from } from "rxjs";

const source$ = from([1, 2, 3]); // 也支持字符串,比如"Angular"

source$.subscribe({
  next: function(value) {
    console.log(value);
  },
  complete: function() {
    console.log("complete!");
  },
  error: function(error) {
    console.log(error);
  }
});
// output
1
2
3
complete!

2、数据源为 Promise 对象

import { from } from "rxjs";

const promiseSource$ = from(new Promise(resolve => resolve("Hello World!")));

promiseSource$.subscribe({
  next: function(value) {
    console.log(value);
  },
  complete: function() {
    console.log("complete!");
  },
  error: function(error) {
    console.log(error);
  }
});
import { fromEvent } from "rxjs";
import { map } from "rxjs/operators";

const source$ = fromEvent(document, "click");
const example$ = source$.pipe(map(event => `Event time: ${event.timeStamp}`));
const subscribe = example$.subscribe(val => console.log(val));

import { timer } from "rxjs";

const source$ = timer(1000, 5000);
const subscribe = source$.subscribe(val => console.log(val));
// output
0 # 1s后
1 # 5s后
2 # 5s后
...

import { timer } from "rxjs";

const source$ = timer(1000);
source$.subscribe(
  val => console.log(val),
  () => console.error("error!"),
  () => console.log("complete!")
);
// output 
0
complete
import { interval } from "rxjs";

const source$ = interval(1000);
source$.subscribe(val => console.log(val));
// output
0
1
...

2、Operators

Operator 是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。多个 Operator 函数可以用 pipe 进行连接:

import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
const apiData = ajax('/api/data').pipe(
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);
 
apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});

2.1、常见操作符

Observable.of([1, 2, 3]).subscribe(x => console.log(x));  // x=[1,2,3]
Observable.from([1, 2, 3]).subscribe(x => console.log(x)); // x=1,x=2,x=3
const example = fromEvent(input, 'keyup').pipe(map(i => i.currentTarget.value));

// 在两次键盘敲击之间等待0.5秒方才发出当前值,
// 并丢弃这0.5秒内的所有其他值
const debouncedInput = example.pipe(debounceTime(500));

distinctUntilChanged 只有当当前值与之前最后一个值不同时才将其发出

const myArrayWithDuplicatesInARow = from([1, 1, 2, 2, 3, 1, 2, 3]);

const distinctSub = myArrayWithDuplicatesInARow
  .pipe(distinctUntilChanged())
  // 输出: 1,2,3,1,2,3
  .subscribe(val => console.log('DISTINCT SUB:', val));

filter 过滤掉不符合的

// 发出 (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
// 过滤掉奇数
const example = source.pipe(filter(num => num % 2 === 0));

take 在完成前发出前N个值

// 发出 1,2,3,4,5
const source = of(1, 2, 3, 4, 5);
// 取第一个发出的值然后完成
const example = source.pipe(take(1));

takeUntil 发出值,直到提供的 observable 发出值,它便完成

// 每1秒发出值
const source = interval(1000);
// 5秒后发出值
const timer$ = timer(5000);
// 当5秒后 timer 发出值时, source 则完成
const example = source.pipe(takeUntil(timer$));
// 输出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
// 发出 (1,2,3,4,5)
const source = from([1, 2, 3, 4, 5]);
// 每个数字加10
const example = source.pipe(map(val => val + 10));
// 输出: 11,12,13,14,15
const subscribe = example.subscribe(val => console.log(val));

mergeMap和 flatMap 功能一致,映射成 observable 并发出值
switchMap 映射成 observable,完成前一个内部 observable,发出值
scan 随着时间的推移进行归并

import { of } from 'rxjs';
import { scan } from 'rxjs/operators';

const source = of(1, 2, 3);
// 基础的 scan 示例,从0开始,随着时间的推移计算总数
const example = source.pipe(scan((acc, curr) => acc + curr, 0));
// 输出累加值
// 输出: 1,3,6
const subscribe = example.subscribe(val => console.log(val));
const source = of(1, 2, 3, 4, 5);
// 使用 tap 透明地打印 source 中的值
const example = source.pipe(
  tap(val => console.log(`BEFORE MAP: ${val}`)),
  map(val => val + 10),
  tap(val => console.log(`AFTER MAP: ${val}`))
);

2.2、业务中常用的操作符

this.http.get(this.apiUrl)
      .subscribe(users => {
        let username = users[6].username;
        this.http.get(`${this.apiUrl}?username=${username}`)
          .subscribe(
          user => {
            this.username = username;
            this.user = user;
          });
      });
// 改写为
this.http.get(this.apiUrl)
      .pipe(mergeMap(users => {
        let username = users[6].username;
        return this.http.get(`${this.apiUrl}?username=${username}`)
      })).subscribe(user => {
        this.user = user
      });
let post1$ = this.http.get(`${this.apiUrl}/1`);
    let post2$ = this.http.get(`${this.apiUrl}/2`);

    forkJoin([post1$, post2$])
      .subscribe(results => {
        this.post1 = results[0];
        this.post2 = results[1];
      });

对比 Promise

Promise 的缺点:
1、不能生产多值,只能消费一次
2、不能重试

3、Subscription

Subscription是一个代表可以终止资源的对象,表示一个Observable的执行过程。Subscription有一个重要的方法:unsubscribe。这个方法不需要传入参数,调用后便会终止相应的资源。

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
  // 终止所有嵌套的Subscription
  subscription.unsubscribe();
}, 1000);
// 输出
second: 0
first: 0
second: 1
first: 1
second: 2

如果不用 subscription.unsubscribe,则会一直产生数据,容易造成内存泄露

常用操作符

import { timer } from 'rxjs';
const source = timer(1000); // 1秒后发出0,然后结束,因为没有提供第二个参数
const subscribe = source.subscribe(val => console.log(val)); // 输出: 0

/*
  timer 接收第二个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第一个值,
  然后每2秒发出序列值
*/
const source = timer(1000, 2000);
const subscribe = source.subscribe(val => console.log(val)); // 输出: 0,1,2,3,4,5......
import { interval } from 'rxjs';
const source = interval(1000); // 1秒后发出0,然后每1秒发出数字序列中的其它值
const subscribe = source.subscribe(val => console.log(val)); // 数字: 0,1,2,3,4,5....
var source = Rx.Observable.interval(1000).take(6);
var example = source.takeLast(2);

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});

source : ----0----1----2----3----4----5|
                takeLast(2)
example: ------------------------------(45)|

注意:1、takeLast 必须等source完结;2、takeLast 流中的值同步发出,如上例同时发出45,而不是先4,再5。即 console.log 连续执行两次,中间不隔1s

import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// 每1秒发出值
const source = interval(1000);
// 5秒后发出值
const timer$ = timer(5000);
// 当5秒后 timer 发出值时, source 则完成
const example = source.pipe(takeUntil(timer$));
// 输出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
const source = of(1, 2, 3, 4, 5);
// 使用 tap 透明地打印 source 中的值,tap 不改变流中的值
const example = source.pipe(
  tap(val => console.log(`BEFORE MAP: ${val}`)),
  map(val => val + 10),
  tap(val => console.log(`AFTER MAP: ${val}`))
);

const button = document.querySelector('button');

const click$ = fromEvent(button, 'click');
const interval$ = interval(1000);

const observable$ = click$.pipe(mergeMap(event => {
  return interval$;
}));

observable$.subscribe(num => console.log(num));
switchMap(project: function: Observable, resultSelector: function(outerValue, innerValue, outerIndex, innerIndex): any): Observable

import { timer } from 'rxjs/observable/timer';
import { interval } from 'rxjs/observable/interval';
import { switchMap } from 'rxjs/operators';
// 立即发出值, 然后每5秒发出值
const source = timer(0, 5000);
// 当 source 发出值时切换到新的内部 observable,调用投射函数并发出值
const example = source.pipe(
  switchMap(
    _ => interval(2000),
    (outerValue, innerValue, outerIndex, innerIndex) => ({
      outerValue,
      innerValue,
      outerIndex,
      innerIndex
    })
  )
);
/*
    输出:
    {outerValue: 0, innerValue: 0, outerIndex: 0, innerIndex: 0}
    {outerValue: 0, innerValue: 1, outerIndex: 0, innerIndex: 1}
    {outerValue: 1, innerValue: 0, outerIndex: 1, innerIndex: 0}
    {outerValue: 1, innerValue: 1, outerIndex: 1, innerIndex: 1}
*/
const subscribe = example.subscribe(val => console.log(val));
// 1、参数 observable 频率比 source 高
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';
// 每5秒发出值
const source = interval(5000);
// 每1秒发出值
const secondSource = interval(1000);
const example = source.pipe(
  withLatestFrom(secondSource),
  map(([first, second]) => {
    return `First Source (5s): ${first} Second Source (1s): ${second}`;
  })
);
/*
  输出:
  "First Source (5s): 0 Second Source (1s): 4"
  "First Source (5s): 1 Second Source (1s): 9"
  "First Source (5s): 2 Second Source (1s): 14"
  ...
*/
const subscribe = example.subscribe(val => console.log(val));

// 2、参数 observable 频率比 source 底
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs/observable/interval';
// 每5秒发出值
const source = interval(5000);
// 每1秒发出值
const secondSource = interval(1000);
// withLatestFrom 的 observable 比源 observable 慢
const example = secondSource.pipe(
  // 两个 observable 在发出值前要确保至少都有1个值 (需要等待5秒)
  withLatestFrom(source),
  map(([first, second]) => {
    return `Source (1s): ${first} Latest From (5s): ${second}`;
  })
);
/*
  "Source (1s): 4 Latest From (5s): 0"
  "Source (1s): 5 Latest From (5s): 0"
  "Source (1s): 6 Latest From (5s): 0"
  ...
*/
const subscribe = example.subscribe(val => console.log(val));
import { delay, catchError } from 'rxjs/operators';
import { forkJoin } from 'rxjs/observable/forkJoin';
import { of } from 'rxjs/observable/of';
import { _throw } from 'rxjs/observable/throw';
/*
  当所有 observables 完成时,将每个 observable 
  的最新值作为数组发出
*/
const example = forkJoin(
  // 立即发出 'Hello'
  of('Hello'),
  // 1秒后发出 'World'
  of('World').pipe(delay(1000)),
  // 抛出错误
  _throw('This will error').pipe(catchError(error => of(error)))
);
// 输出: ["Hello", "World", "This will error"]
const subscribe = example.subscribe(val => console.log(val));
of(1, 2, 3)
  .pipe(
    pairwise()).subscribe(v => console.log(v))
[1,2]
[2,3]

参考

上一篇下一篇

猜你喜欢

热点阅读