让前端飞Web前端之路前端开发

rxjs-observable/observer/subject

2019-05-27  本文已影响28人  bugWriter_y

核心概念1:observable

observable是一个可观察对象,它会产生数据流,并push到观察者

import { Observable } from 'rxjs';
//创建一个可观察对象
let observable=Observable.create(x=>{
    try{
        x.next(1)//发射数据
        x.next(2)//发射数据
        x.next(3)//发射数据
        x.complete()//结束可观察对象,后面的代码将不会执行
        x.next(4)//永远不会发生
    }catch(err){
        x.error(err)//如果发生错误,发射错误数据
    }
})


核心概念2:observer

observer是一个观察者,观察来自observable(可观察对象的数据)。订阅observable就产生了一个observer。有三个参数,第一个是正常数据的执行逻辑,第二个是发生错误后的执行逻辑,第三个是可观察对象结束(complete)后的执行逻辑

//let observer=observable.subscribe(x=>console.log(x))
//订阅可观察对象产生一个观察者
let observer=observable.subscribe(x=>{
    console.log(x)
},err=>{
    console.log(err)
},()=>{
    console.log('complete')
})
  1. 取消订阅
import { Observable } from 'rxjs';
let observable=Observable.create(x=>{
    x.next(1)
    x.next(2)
    setInterval(()=>{//每秒钟发射一个数据
        x.next(1)
    },1000)
})

let observer=observable.subscribe(x=>{
    console.log(`第一个订阅者:${x}`)
})
let observer2=observable.subscribe(x=>{
    console.log(`第二个订阅者:${x}`)
})
setTimeout(()=>{//6秒后结束1订阅者结束订阅
    observer.unsubscribe();
},6000)
  1. 同时取消订阅

    上面的例子只能取消的订阅,2继续订阅数据。如果像同时取消订阅者订阅数据,可以将2加入到1中。

import { Observable } from 'rxjs';
let observable=Observable.create(x=>{
    x.next(1)
    x.next(2)
    setInterval(()=>{//每秒钟发射一个数据
        x.next(1)
    },1000)
})

let observer=observable.subscribe(x=>{
    console.log(`第一个订阅者:${x}`)
})
let observer2=observable.subscribe(x=>{
    console.log(`第二个订阅者:${x}`)
})
//将observer2加入都observer1中。
observer.add(observer2)
setTimeout(()=>{//6秒后结束1订阅者结束订阅,同时2订阅者也结束订阅
    observer.unsubscribe();
},6000)
hot observable/cold observable
  1. cold observable

上面我们创建的observable都是cold observable,它的特点是后续的订阅者能收到可观察对象之前的数据

  1. hot observable

hot observable的特点是只能接收之后的数据。我们可以通过share函数来将cold observable转换成hot observable。当然也有专门的hot observable创建方式,例如fromEvent函数

import { Observable } from 'rxjs';
import {share} from 'rxjs/operators';//导入share操作符
let observable=Observable.create(x=>{
    x.next(1)
    x.next(2)
    setInterval(()=>{
        x.next(1)
    },1000)
}).pipe(share())//调用share函数将cold observable专程hot observable

let observer=observable.subscribe(x=>{
    console.log(`第一个订阅者:${x}`)
})
setTimeOut(()=>{
    let observer2=observable.subscribe(x=>{
        console.log(`第二个订阅者:${x}`)
    })  
    observer.add(observer2)
},3000)
setTimeout(()=>{
    observer.unsubscribe();
},6000)
import {fromEvent} from 'rxjs'//导入fromEvent
let observable=fromEvent(document,"click");//通过fromEvent创建一个页面点击数据源

let observer=observable.subscribe(x=>{
    console.log(`第一个订阅者:${x}`)
})
setTimeout(()=>{
    let observer2=observable.subscribe(x=>{
        console.log(`第二个订阅者:${x}`)
    })  
},3000)

可以看到刚开始点击只会产生一条数据,3秒后2订阅开始结束点击数据,但是之前点击的数据2订阅并接受不到。所以fromEvent产色生的是一个hot observable

核心概念3:subject

subject-主题-是一类特殊的observable。subject和observable的特点就是普通observable创建出来后只能被读取数据(可能描述的不是很好,实际上是push),而subject能写入数据。

import {Subject} from 'rxjs'

const subject=new Subject()
let observer1=subject.subscribe(x=>{
    console.log(`第一个订阅者:${x}`)
})
subject.next(1)
subject.next(2)
let observer2=subject.subscribe(x=>{
    console.log(`第二个订阅者:${x}`)
})
subject.next(3)
observer1.add(observer2)
observer1.unsubscribe()
subject.next(4)

上传例子创建了一个主题,然后订阅1,然后往主题中发射了2个数据,然后订阅2,然后在发射了1个数据,然后将2加入1,然后取消订阅1同时取消订阅2,最后又发射了一个数据。

利用subject做一个搜索的案例

不考虑后台的优化,这里专注于前端。需求是在输入框中输入文字,能检测文字变化然后自动搜索相关信息展示在搜索框下面。要求,尽可能的减少重复请求。

此处设计了操作符的概念(debounceTime,distinctUntilChanged,switchMap),后面会具体展开讲

截图1558845936.png
  1. 后台接口

后台接口这里使用json-server来创建一个简单的restapi

#db.json
{
    "users":[
        {"name":"1"},
        {"name":"12"},
        {"name":"123"},
        {"name":"1234"},
        {"name":"12345"},
        {"name":"123456"},
        {"name":"1234567"},
        {"name":"12345678"}
    ]
}
#cmd
json-server --watch db.json
截图1558846216.png
  1. html
<!--搜索框,只要一输入就调用search方法,并把值传递过去-->
<input type="text" #searchBox (input)="search(searchBox.value)">
<div class='users'>
  <!--使用angular的同步管道,将后台查询到的可观察数据展示出来-->
  <div class="user" *ngFor="let user of users$ | async">
    {{user.name}}
  </div>
</div>
  1. scss
.users{
    padding: 10px;
    background-color: #ddd;
    .user{
        background-color: #fff;
        margin-bottom: 10px;
    }
}
  1. ts
import { Component } from '@angular/core';
import { Subject, Observable } from 'rxjs';
import { switchMap, debounceTime, distinctUntilChanged } from 'rxjs/operators';
import { HttpClient } from '@angular/common/http';
@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent {
  subject = new Subject()//创建主题
  users$:Observable<any[]>;//生命查询到的数据,它不是一个传统的list,而是一个obervable,订阅的操作交给了angular的async异步管道
  constructor(private http: HttpClient) {//注入http客户端
    this.users$=this.subject.pipe(//此处订阅主题,并将输入值通过switchMap转换后变成需要的数据
      debounceTime(300),//debounceTime操作标识延迟300毫秒发射数据(当用户连续输入一些文字的时候只会发射最后的一次数据。因为通常用户在输入的时候会一下子输入好多的文字,而只有最后一次他停下来的时候才是需要搜索的关键词。所以此处延迟300毫米。这个值不能太大,太大了延迟影响用户体验,太小了起不到优化的作用)
      distinctUntilChanged(),//distinctUntilChanged操作符只有值改变了才往下发射数据(假设用户快速数输入了123,然后又快速删除了刚刚输入的数据,这个时候查询结果应该是和之前是一样的,所以不需要再次查询后台了。)
      switchMap(x => {//switchMap操作符拦截输入数据字符串,然后将后台查询得到的observable对象发射出去
        return this.http.get<any>(`http://localhost:3000/users?q=${x}`)//具体查询数据使用json-server提供的全文检索q
      })
    )
  }
  search(value) {//此处对用应了html的输入调用方法。这里输入后将数据发射到主题中。有主题的订阅者(构造函数中)去处理具体的逻辑
    this.subject.next(value)
  }
}

[图片上传失败...(image-a30688-1558880758639)]

subject /behaviorSubject/
  1. subject

普通的主题的订阅者只能收到它创建后的数据

import { Subject } from 'rxjs';

const subject=new Subject()
let o1=subject.subscribe(x=>{
    console.log(`1:${x}`)
})
subject.next(1)
let o2=subject.subscribe(x=>{
    console.log(`2:${x}`)
})
subject.next(2)

  1. behaviorSubject

behaviourSubject能收到它创建之前的一个数据。它需要接受一个参数,这个参数作为第一个数据发射进主题,同时指定了数据类型,这里传入0,说明只接受number型

import { BehaviorSubject } from 'rxjs';

const subject=new BehaviorSubject(0)//这里传入0,说明只接受number型,并且发射了第一个数据0
let o1=subject.subscribe(x=>{
    console.log(`1:${x}`)
})
subject.next(1)
let o2=subject.subscribe(x=>{
    console.log(`2:${x}`)
})
subject.next(2)
  1. replaySubject

behaviorSubject能收到创建之前的一个数据,而replaySubject能指定一个数字,表示新创建的订阅者能收到之前的最多n个数据

import { ReplaySubject } from 'rxjs';
const subject=new ReplaySubject(2)//此处我们指定了数字2,代表能收到之前的2个数据
let o1=subject.subscribe(x=>{
    console.log(`1:${x}`)
})
subject.next(1)
subject.next(2)
let o2=subject.subscribe(x=>{
    console.log(`2:${x}`)
})
subject.next(3)

replaySubject创建过程还可以指定第二个参数,表示多少毫秒内的数据

import { ReplaySubject } from 'rxjs';
const subject=new ReplaySubject(10,200)//最大接受10个数据,但是只接受200毫秒内的数据
let i=0
setInterval(()=>{
    subject.next(i++)
},100)
setTimeout(()=>{
    subject.subscribe(x=>{
        console.log(x)
    })
},1000)
  1. asyncSubject

asyncSubject只接受最后一个数据,并且是在主题结束了以后才会发射。

import { AsyncSubject } from 'rxjs';
let subject=new AsyncSubject()
subject.subscribe(x=>{
  console.log(x)
})

subject.next(1)
subject.next(2)
subject.next(3)
subject.next(4)
subject.complete()//只有调用了主题结束了,订阅者才能收到消息,并且只收到最后一个消息
上一篇下一篇

猜你喜欢

热点阅读