Angular

RxJS

2021-06-07  本文已影响0人  浅忆_0810

RxJS官网

1 概述

1.1 什么是 RxJS ?

RxJS 是一个用于处理异步编程的 JavaScript 库,目标是使编写异步和基于回调的代码更容易

1.2 为什么要学习 RxJS ?

就像 Angular 深度集成 TypeScript 一样,Angular也深度集成了 RxJS

服务、表单、事件、全局状态管理、异步请求 ...

1.3 快速入门

  1. 可观察对象 ( Observable ) :类比 Promise 对象,内部可以用于执行异步代码,通过调用内部提供的方法将异步代码执行的结果传递到可观察对象外部

  2. 观察者 ( Observer ):类比 then 方法中的回调函数,用于接收可观察对象中传递出来数据

  3. 订阅 ( subscribe ):类比 then 方法,通过订阅将可观察对象和观察者连接起来,当可观察对象发出数据时,订阅者可以接收到数据。

    import { Observable } from "rxjs"
    
    const observable = new Observable(function (observer) {
      setTimeout(function () {
        observer.next({
          name: "张三"
        })
      }, 2000)
    })
    
    const observer = {
      next: function (value) {
        console.log(value)
      }
    }
    
    observable.subscribe(observer)
    

2. 可观察对象

2.1 Observable

  1. Observable 对象内部可以多次调用 next 方法向外发送数据

    const observable = new Observable(function (observer) {
      let index = 0
      setInterval(function () {
        observer.next(index++)
      }, 1000)
    })
    
    const observer = {
      next: function (value) {
        console.log(value)
      }
    }
    
    observable.subscribe(observer)
    
  2. 当所有数据发送完成以后,可以调用 complete 方法终止数据发送

    const observable = new Observable(function (observer) {
      let index = 0
      let timer = setInterval(function () {
        observer.next(index++)
        if (index === 3) {
          observer.complete()
          clearInterval(timer)
        }
      }, 1000)
    })
    
    const observer = {
      next: function (value) {
        console.log(value)
      },
      complete: function () {
        console.log("数据发送完成")
      }
    }
    
    observable.subscribe(observer)
    
  3. Observable 内部逻辑发生错误时,可以调用 error 方法将失败信息发送给订阅者,Observable 终止

    import { Observable } from "rxjs"
    
    const observable = new Observable(function (observer) {
      let index = 0
      let timer = setInterval(function () {
        observer.next(index++)
        if (index === 3) {
          observer.error("发生错误")
          clearInterval(timer)
        }
      }, 1000)
    })
    
    const observer = {
      next: function (value) {
        console.log(value)
      },
      error: function (error) {
        console.log(error)
      }
    }
    
    observable.subscribe(observer)
    
  4. 可观察对象是惰性的,只有被订阅后才会执行

    const observable = new Observable(function () {
      console.log("Hello RxJS")
    })
    // observable.subscribe()
    
  5. 可观察对象可以有 n 多订阅者,每次被订阅时都会得到执行

    const observable = new Observable(function () {
      console.log("Hello RxJS")
    })
    
    observable.subscribe()
    observable.subscribe()
    observable.subscribe()
    observable.subscribe()
    observable.subscribe()
    
  6. 取消订阅

    import { interval } from "rxjs"
    
    const obs = interval(1000)
    const subscription = obs.subscribe(console.log)
    
    setTimeout(function () {
      subscription.unsubscribe()
    }, 2000)
    

2.2 Subject

  1. 用于创建空的可观察对象,在订阅后不会立即执行,next 方法可以在可观察对象外部调用

    import { Subject } from "rxjs"
    
    const demoSubject = new Subject()
    
    demoSubject.subscribe({next: function (value) {console.log(value)}})
    demoSubject.subscribe({next: function (value) {console.log(value)}})
    
    setTimeout(function () {
      demoSubject.next("hahaha")
    }, 3000)
    

2.3 BehaviorSubject

拥有 Subject 全部功能,但是在创建 Obervable 对象时可以传入默认值,观察者订阅后可以直接拿到默认值

import { BehaviorSubject } from "rxjs"

const demoBehavior = new BehaviorSubject("默认值")
demoBehavior.subscribe({next: function (value) {console.log(value)}})
demoBehavior.next("Hello")

2.4 ReplaySubject

功能类似 Subject,但有新订阅者时两者处理方式不同,Subject 不会广播历史结果,而 ReplaySubject 会广播所有历史结果

import { ReplaySubject } from "rxjs"

const rSubject = new ReplaySubject()

rSubject.subscribe(value => {
  console.log(value)
})

rSubject.next("Hello 1")
rSubject.next("Hello 2")

setTimeout(function () {
  rSubject.subscribe({next: function (value) {console.log(value)}})
}, 3000)

3. 辅助方法

3.1 range

range(start, length),调用方法后返回 observable 对象,被订阅后会发出指定范围的数值

import { range } from "rxjs"

range(0, 5).subscribe(n => console.log(n))

// 0
// 1
// 2
// 3
// 4

方法内部并不是一次发出 length 个数值,而是发送了 length 次,每次发送一个数值,就是说内部调用了 lengthnext 方法

3.2 of

将参数列表作为数据流返回

of("a", "b", [], {}, true, 20).subscribe(v => console.log(v))

3.3 from

ArrayPromiseIterator 转换为 observable 对象

from(["a", "b", "c"]).subscribe(v => console.log(v))
// a
// b
// c
import { from } from "rxjs"

function p() {
  return new Promise(function (resolve) {
    resolve([100, 200])
  })
}

from(p()).subscribe(v => console.log(v))
// [100, 200]

3.4 intervaltimer

Interval:每隔一段时间发出一个数值,数值递增

import { interval } from "rxjs"

interval(1000).subscribe(n => console.log(n))

timer:间隔时间过去以后发出数值,行为终止,或间隔时间发出数值后,继续按第二个参数的时间间隔继续发出值

import { timer } from "rxjs"

timer(2000).subscribe(n => console.log(n))
timer(0, 1000).subscribe(n => console.log(n))

3.5 concat

合并数据流,先让第一个数据流发出值,结束后再让第二个数据流发出值,进行整体合并

import { concat, range } from "rxjs"

concat(range(1, 5), range(6, 5)).subscribe(console.log)

3.6 merge

合并数据流,多个参数一起发出数据流,按照时间线进行交叉合并

import { merge, fromEvent, interval } from "rxjs"

const clicks = fromEvent(document, "click")
const timer = interval(1000)

merge(clicks, timer).subscribe(console.log)

3.7 combineLatest

将两个 Obserable 中最新发出的数据流进行组合成新的数据流,以数组的形式发出。和当前最新的进行组合

import { combineLatest, timer } from "rxjs"

const firstTimer = timer(0, 1000) // emit 0, 1, 2... after every second, starting from now
const secondTimer = timer(500, 1000) // emit 0, 1, 2... after every second, starting 0,5s from now
combineLatest(firstTimer, secondTimer).subscribe(console.log)

// [0, 0] after 0.5s
// [1, 0] after 1s
// [1, 1] after 1.5s
// [2, 1] after 2s

3.8 zip

将多个 Observable 中的数据流进行组合。和将来最新的进行组合

import { zip, of } from "rxjs"
import { map } from "rxjs/operators"

let age = of(27, 25, 29)
let name = of("Foo", "Bar", "Beer")
let isDev = of(true, true, false)

zip(name, age, isDev)
  .pipe(map(([name, age, isDev]) => ({ name, age, isDev })))
  .subscribe(console.log)

// { name: 'Foo', age: 27, isDev: true }
// { name: 'Bar', age: 25, isDev: true }
// { name: 'Beer', age: 29, isDev: false }

3.9 forkJoin

forkJoinRx 版本的 Promise.all(),即表示等到所有的 Observable 都完成后,才一次性返回值

import axios from "axios"
import { forkJoin, from } from "rxjs"

axios.interceptors.response.use(response => response.data)

forkJoin({
  goods: from(axios.get("http://localhost:3005/goods")),
  category: from(axios.get("http://localhost:3005/category"))
}).subscribe(console.log)

3.10 throwError

返回可观察对象并向订阅者抛出错误

import { throwError } from "rxjs"

throwError("发生了未知错误").subscribe({ error: console.log })

3.11 retry

如果 Observable 对象抛出错误,则该辅助方法会重新订阅 Observable 以获取数据流,参数为重新订阅次数

import { interval, of, throwError } from "rxjs"
import { mergeMap, retry } from "rxjs/operators"

interval(1000)
  .pipe(
    mergeMap(val => {
      if (val > 2) {
        return throwError("Error!")
      }
      return of(val)
    }),
    retry(2)
  )
  .subscribe({
    next: console.log,
    error: console.log
  })

3.12 race

接收并同时执行多个可观察对象,只将最快发出的数据流传递给订阅者

import { race, timer } from "rxjs"
import { mapTo } from "rxjs/operators"

const obs1 = timer(1000).pipe(mapTo("fast one"))
const obs2 = timer(3000).pipe(mapTo("medium one"))
const obs3 = timer(5000).pipe(mapTo("slow one"))

race(obs3, obs1, obs2).subscribe(console.log)

3.13 fromEvent

将事件转换为 Observable

import { fromEvent } from "rxjs"

const btn = document.getElementById("btn")
// 可以将 Observer 简写成一个函数,表示 next
fromEvent(btn, "click").subscribe(e => console.log(e))

4. 操作符

  1. 数据流:从可观察对象内部输出的数据就是数据流,可观察对象内部可以向外部源源不断的输出数据
  2. 操作符:用于操作数据流,可以对象数据流进行转换,过滤等操作

4.1 mapmapTo

map:对数据流进行转换,基于原有值进行转换

import { interval } from "rxjs"
import { map } from "rxjs/operators"

interval(1000)
  .pipe(map(n => n * 2))
  .subscribe(n => console.log(n))

mapTo:对数据流进行转换,不关心原有值,可以直接传入要转换后的值

import { interval } from "rxjs"
import { mapTo } from "rxjs/operators"

interval(1000)
  .pipe(mapTo({ msg: "接收到了数据流" }))
  .subscribe(msg => console.log(msg))

4.2 filter

对数据流进行过滤

import { range } from "rxjs"
import { filter } from "rxjs/operators"

range(1, 10)
  .pipe(filter(n => n % 2 === 0))
  .subscribe(even => console.log(even))

4.3 pluck

获取数据流对象中的属性值

import { interval } from "rxjs"
import { pluck, mapTo } from "rxjs/operators"

interval(1000)
  .pipe(
    mapTo({ name: "张三", a: { b: "c" } }), 
    pluck("a", "b")
    )
  .subscribe(n => console.log(n))

4.4 first

获取数据流中的第一个值或者查找数据流中第一个符合条件的值,类似数组中的 find 方法。获取到值以后终止行为

import { interval } from "rxjs"
import { first } from "rxjs/operators"

interval(1000)
  .pipe(first())
  .subscribe(n => console.log(n))

interval(1000)
  .pipe(first(n => n === 3))
  .subscribe(n => console.log(n))

4.5 startWith

创建一个新的 observable 对象并将参数值发送出去,然后再发送源 observable 对象发出的值

在异步编程中提供默认值的时候非常有用

import { interval } from "rxjs"
import { map, startWith } from "rxjs/operators"

interval(1000)
  .pipe(
    map(n => n + 100),
    startWith(505)
  )
  .subscribe(n => console.log(n))
// 505
// 100
// 101
// 102
// ...

4.6 every

查看数据流中的每个值是否都符合条件,返回布尔值。类似数组中的 every 方法

import { range } from "rxjs"
import { every, map } from "rxjs/operators"

range(1, 9)
  .pipe(
    map(n => n * 2),
    every(n => n % 2 === 0)
  )
  .subscribe(b => console.log(b))

4.7 delaydelayWhen

delay:对上一环节的操作整体进行延迟,只执行一次

import { from } from "rxjs"
import { delay, map, tap } from "rxjs/operators"

from([1, 2, 3])
  .pipe(
    delay(1000),
    tap(n => console.log("已经延迟 1s", n)),
    map(n => n * 2),
    delay(1000),
    tap(() => console.log("又延迟了 1s"))
  )
  .subscribe(console.log)

// tap 操作符不会对数据流造成影响, 它被用来执行简单的副作用, 比如输出, 但是复杂的副作用不要在这执行, 比如 Ajax

delayWhen:对上一环节的操作进行延迟,上一环节发出多少数据流,传入的回调函数就会执行多次

import { range, timer } from "rxjs"
import { delayWhen } from "rxjs/operators"

range(1, 10)
  .pipe(
    delayWhen(n => {
      console.log(n)
      return timer(n * 1000)
    })
  )
  .subscribe(console.log)

4.8 taketakeWhiletakeUtil

take:获取数据流中的前几个

import { range } from "rxjs"
import { take } from "rxjs/operators"

range(1, 10).pipe(take(5)).subscribe(console.log)

takeWhile:根据条件从数据源前面开始获取

import { range } from "rxjs"
import { takeWhile } from "rxjs/operators"

range(1, 10)
  .pipe(takeWhile(n => n < 8))
  .subscribe(console.log)

takeUntil:接收可观察对象,当可观察对象发出值时,终止主数据源

import { interval, timer } from "rxjs"
import { takeUntil } from "rxjs/operators"

interval(100)
  .pipe(takeUntil(timer(2000)))
  .subscribe(console.log)
// 结果少两个数据流的原因:第一次和最后一次,都需要延迟 100 毫秒。

4.9 skipskipWhileskipUntil

skip:跳过前几个数据流

import { range } from "rxjs"
import { skip } from "rxjs/operators"

range(1, 10).pipe(skip(5)).subscribe(console.log)

skipWhile:根据条件进行数据流的跳过

import { range } from "rxjs"
import { skipWhile } from "rxjs/operators"

range(1, 10)
  .pipe(skipWhile(n => n < 5))
  .subscribe(console.log)

skipUntil:跳过数据源中前多少时间发出的数据流,发送从这个时间以后数据源中发送的数据流

import { timer, interval } from "rxjs"
import { skipUntil } from "rxjs/operators"

interval(100)
  .pipe(skipUntil(timer(2000)))
  .subscribe(console.log)

4.10 last

获取数据流中的最后一个

import { range } from "rxjs"
import { last } from "rxjs/operators"

range(1, 10).pipe(last()).subscribe(console.log)

如果数据源不变成完成状态,则没有最后一个

import { interval } from "rxjs"
import { last, take } from "rxjs/operators"

interval(1000).pipe(take(5), last()).subscribe(console.log)

4.11 concatAllconcatMap

concatAll:有时 Observable 发出的又是一个 ObervableconcatAll 的作用就是将新的可观察对象和数据源进行合并

import { fromEvent, interval } from "rxjs"
import { map, take, concatAll } from "rxjs/operators"

fromEvent(document, "click")
  .pipe(
    map(event => interval(1000).pipe(take(2))),
    concatAll()
  )
  .subscribe(console.log)
import { map, concatAll } from "rxjs/operators"
import { of, interval } from "rxjs"

interval(1000)
  .pipe(
    map(val => of(val + 10)),
    concatAll()
  )
  .subscribe(console.log)

concatMap:合并可观察对象并处理其发出的数据流

4.12 reducescan

reduce:类似 JavaScript 数组中的 reduce,对数数据进行累计操作。reduce 会等待数据源中的数据流发送完成后再执行,执行时 reduce 内部遍历每一个数据流进行累计操作,操作完成得到结果将结果作为数据流发出

import { interval } from "rxjs"
import { take, reduce } from "rxjs/operators"

interval(500)
  .pipe(
    take(5),
    reduce((acc, value) => acc += value, 0)
  )
  .subscribe(v => console.log())

scan:类似 reduce,进行累计操作,但执行时机不同,数据源每次发出数据流 scan 都会执行。reduce 是发送出最终计算的结果,而 scan 是发出每次计算的结果

import { interval } from "rxjs"
import { take, scan } from "rxjs/operators"

interval(500)
  .pipe(
    take(5),
    scan((acc, value) => acc += value, 0)
  )
  .subscribe(v => console.log())

4.13 mergeAllmergeMap

mergeAll:交叉合并可观察对象

import { fromEvent, interval } from "rxjs"
import { map, mergeAll } from "rxjs/operators"

fromEvent(document, "click")
  .pipe(
    map(() => interval(1000)),
    mergeAll()
  )
  .subscribe(console.log)

mergeMap:交叉合并可观察对象以后对可观察对象发出的数据流进行转换

import { of, interval } from "rxjs"
import { mergeMap, map } from "rxjs/operators"

of("a", "b", "c")
  .pipe(mergeMap(x => interval(1000).pipe(map(i => x + i))))
  .subscribe(x => console.log(x))

4.14 throttleTime

节流,可观察对象高频次向外部发出数据流,通过 throttleTime 限制在规定时间内每次只向订阅者传递一次数据流

import { fromEvent } from "rxjs"
import { throttleTime } from "rxjs/operators"

fromEvent(document, "click")
  .pipe(throttleTime(2000))
  .subscribe(x => console.log(x))

4.15 debounceTime

防抖,触发高频事件,只响应最后一次

import { fromEvent } from "rxjs"
import { debounceTime } from "rxjs/operators"

fromEvent(document, "click")
  .pipe(debounceTime(1000))
  .subscribe(x => console.log(x))

4.16 distinctUntilChanged

检测数据源当前发出的数据流是否和上次发出的相同,如相同,跳过,不相同,发出

import { of } from "rxjs"
import { distinctUntilChanged } from "rxjs/operators"

of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4)
  .pipe(distinctUntilChanged())
  .subscribe(x => console.log(x)) // 1, 2, 1, 2, 3, 4

4.17 groupBy

对数据流进行分组

import { of } from "rxjs"
import { mergeMap, groupBy, toArray } from "rxjs/operators"

of(
  { name: "Sue", age: 25 },
  { name: "Joe", age: 30 },
  { name: "Frank", age: 25 },
  { name: "Sarah", age: 35 }
)
  .pipe(
    groupBy(person => person.age),
    mergeMap(group => group.pipe(toArray()))
  )
  .subscribe(console.log)

// [{name: "Sue", age: 25}, { name: "Frank", age: 25 }]
// [{ name: "Joe", age: 30 }]
// [{ name: "Sarah", age: 35 }]

4.18 withLatestFrom

主数据源发出的数据流总是和支数据源中的最新数据流进行结合,返回数组

import { fromEvent, interval } from "rxjs"
import { withLatestFrom } from "rxjs/operators"

const clicks = fromEvent(document, "click")
const timer = interval(1000)
clicks.pipe(withLatestFrom(timer)).subscribe(console.log)

4.19 switchMap

切换可观察对象

import { fromEvent, interval } from "rxjs"
import { switchMap } from "rxjs/operators"

fromEvent(document, "click")
  .pipe(switchMap(ev => interval(1000)))
  .subscribe(x => console.log(x))

5. 练习

5.1 元素拖拽

<style>
  #box {
    width: 200px;
    height: 200px;
    background: skyblue;
    position: absolute;
    left: 0;
    top: 0;
  }
</style>
<div id="box"></div>
// 原生 JavaScript
box.onmousedown = function (event) {
  let distanceX = event.clientX - event.target.offsetLeft
  let distanceY = event.clientY - event.target.offsetTop
  document.onmousemove = function (event) {
    let positionX = event.clientX - distanceX
    let positionY = event.clientY - distanceY
    box.style.left = positionX + "px"
    box.style.top = positionY + "px"
  }
  box.onmouseup = function () {
    document.onmousemove = null
  }
}
// RxJS
import { fromEvent } from "rxjs"
import { map, switchMap, takeUntil } from "rxjs/operators"

const box = document.getElementById("box")

fromEvent(box, "mousedown")
  .pipe(
    map(event => ({
      distanceX: event.clientX - event.target.offsetLeft,
      distanceY: event.clientY - event.target.offsetTop
    })),
    switchMap(({ distanceX, distanceY }) =>
      fromEvent(document, "mousemove").pipe(
        map(event => ({
          positionX: event.clientX - distanceX,
          positionY: event.clientY - distanceY
        })),
        takeUntil(fromEvent(document, "mouseup"))
      )
    )
  )
  .subscribe(({ positionX, positionY }) => {
    box.style.left = positionX + "px"
    box.style.top = positionY + "px"
  })

5.2 搜索

<input id="search" type="text" placeholder="请输入搜索内容..." />
import { fromEvent, from, throwError } from "rxjs"
import { debounceTime, distinctUntilChanged, map, switchMap, catchError } from "rxjs/operators"
import axios from "axios"

const search = document.getElementById("search")

fromEvent(search, "keyup")
  .pipe(
    debounceTime(700),
    map(event => event.target.value),
    distinctUntilChanged(),
    switchMap(keyword =>
      from(
        axios.get(`https://j1sonplaceholder.typicode.com/posts?q=${keyword}`)
      ).pipe(
        map(response => response.data),
        catchError(error => throwError(`发生了错误: ${error.message}`))
      )
    )
  )
  .subscribe({
    next: value => {
      console.log(value)
    },
    error: error => {
      console.log(error)
    }
  })

5.3 串联请求

先获取token,再根据token获取用户信息

<button id="btn">获取用户信息</button>
import axios from "axios"
import { from, fromEvent } from "rxjs"
import { pluck, concatMap } from "rxjs/operators"

const button = document.getElementById("btn")

fromEvent(button, "click")
  .pipe(
    concatMap(event =>
      from(axios.get("http://localhost:3005/token")).pipe(
        pluck("data", "token")
      )
    ),
    concatMap(token =>
      from(axios.get("http://localhost:3005/userInfo")).pipe(
        pluck("data")
        )
    )
  )
  .subscribe(console.log)
上一篇下一篇

猜你喜欢

热点阅读