深入浅出Rxjs笔记 四

2019-10-09  本文已影响0人  月半女那

合并数据流

将多个数据流中的数据会和到一个数据流中,途中只展示了两个上游数据流

需求 操作符
将多个数据流以首位项链方式合并 concat / concatAll
将多个数据流中的数据以先到先得方式合并 merge / mergeAll
将多个数据流中的数据以一一对应的方式合并 zip/ zipAll
持续合并多个数据流中最新产生的数据 combineLatest/ combineAll / withLatestFrom
从多个数据流中选取第一个产生内容的数据流 race
在数据流前面添加一个指定数据 startWith
从高阶数据流中切换数据源 switch / exhaust

1.concat:首尾相连

因为concat开始从下一个Observable对象抽取数据只能在前一个Observable对象完结之后,所以参与到这个concat之中的Observable对象应该都是完结的,如果Observable对象永不完结,那排在后面的Observable对象永远没有上场的机会

import { of, concat ,range } from 'rxjs';
const data = of(1, 2, 3);
const data2 = of(4, 5, 6);
concat(data, data2).subscribe(
    value => console.log(value),
    err => { },
    () => console.log('...and it is done!')
)

// 1
// 2
// 3
// 4
// 5
// 6
// ...and it is done!
concat(range(1,2),range(30,3)).subscribe(
    value => console.log(value),
    err => { },
    () => console.log('range...and it is done!')
)
// 1
// 2
// 30
// 31
// 32
// range...and it is done!

2.merge:先到先得

merge会第一时间订阅所有的上游Observable,然后对上游的数据采取‘先到先得’的策略。任何一个Observable只要数据推下来,就会立刻转给下游Observable对象
如果上游某个Observable对象不能完结,并不影响其他Observable对象的数据传给merge的下游,merge只有在上游数据d都完结的情况下才会完结自己产生Observable对象。
一般用来合并异步数据流,如果是同步的效果和concat相似,

// 异步
import { merge, interval , of  } from 'rxjs';
import { map } from 'rxjs/operators';
const timer1 = interval(1000).pipe(map(x => x * 2 + 2));
const timer2 = interval(1000).pipe(map(x => x * 3 + 3 ));
merge(timer1, timer2).subscribe(
    value => console.log(value),
    err => { },
    () => console.log('...and it is done!')
)
//2
//3
//4
//.....
const timer3 = interval(500).pipe(map(x => x * 11 +'c' ));
merge(timer1, timer2,timer3).subscribe(
    value => console.log(value),
    err => { },
    () => console.log('...and it is done!')
)
// 0c
// 0a
// 0b
// 11c
// 22c

// 同步
const of1 = of(1, 2, 3);
const of2 = of(4, 5, 6);
merge(of1, of2).subscribe(
    value => console.log(value),
    err => { },
    () => console.log('...and it is done!')
)
// 1
// 2
// 3
// 4
// 5
// 6
// ...and it is done!

3.zip:拉链式组合

一对一的合并
zip和merge/concat不同,zip会将上游的数据转化成数组形式,在zip执行时,他会把上游的数据转化为数组形式,如果对应的一个Observable么有吐出数据,则会一直等待,直到吐出数据,然后将这两个数据合并,传递给下游,
由于a3是一个定时器,所以a1,a2会一直等待,知道a3吐出数据,会造成数据挤压的问题,对于数据量比较小的Observable对象,还是可以忍受的,但是一旦数据量变大,就需要考虑潜在的内存压力问题

import { zip, of ,interval} from 'rxjs';
import {map} from 'rxjs/operators'
const a1 = of(1,2,3);
const a2 = of('a','b','c');
const a3 = interval(1000).pipe(map(x => x +'true'))
zip(a1,a2,a3).subscribe(x => console.log(x))
// [ 1, 'a', '0true' ]
// [ 2, 'b', '1true' ]
// [ 3, 'c', '2true' ]

4. combineLatest:合并最后一个数据

import { combineLatest, timer, of } from 'rxjs';
import { map } from 'rxjs/operators';
const firsttimer = timer(0, 100);
const secondtimer = timer(300, 500);
const combine = combineLatest(firsttimer, secondtimer);
combine.subscribe(value => console.log(value))
// [ 2, 0 ]
// [ 3, 0 ]
// [ 4, 0 ]
// [ 5, 0 ]
// [ 6, 0 ]
// [ 7, 0 ]
// [ 7, 1 ]
// [ 8, 1 ]
// [ 9, 1 ]

const a = of(1,2,3);
const b = of(4,5,6);
const c = combineLatest(a,b);
c.subscribe(value => console.log(value))
// [ 3, 4 ]
// [ 3, 5 ]
// [ 3, 6 ]

// 多重依赖问题
const source1 = timer(0,100);
const d = source1.pipe(map( x => x * 10))
const e = source1.pipe(map( x => x * 20))
const f = combineLatest(d ,e);
f.subscribe(value => console.log(value))
// [ 0, 0 ]
// [ 10, 0 ]
// [ 10, 20 ]
// [ 20, 20 ]

5. withLatestFrom

withLatestFrom给下游推送数据只能有一个上游Observable对象驱动。由于concat,merge,zip和combineLatest作为输入的Observable对象地位都是对等的,但是withLastetFrom却不是这样的,调用withlatestForm的Observable对象有主导数据产生节奏的作用,而作为参数的Observable对象只能贡献数据,不能控制产生数据的时机.

import { withLatestFrom, map } from 'rxjs/operators';
import { timer, of } from 'rxjs';
const a = timer(0, 100);
const c = a.pipe(map(x => x ));
const d = a.pipe(withLatestFrom(c));
d.subscribe(value => console.log(value))
// [ 0, 0 ]
// [ 1, 1 ]
// [ 2, 2 ]
// [ 3, 3 ]

const e = of(1, 2, 3);
const f = of(4, 5, 6);
e.pipe(withLatestFrom(f)).subscribe(value => console.log(value))
// [ 1, 6 ]
// [ 2, 6 ]
// [ 3, 6 ]

如果要合并完全独立的Observable对象,使用combineLatest,当要求吧一个Observable对象映射成新的数据流,并且从其他Observable对象获取最新数据就使用withLatestFrom

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.12/Rx.min.js"></script>
</head>

<body>
    <div id='test'></div>
</body>
<script>
    // import { fromEvent } from 'rxjs';
    // import { combineLatest, timer, of } from 'rxjs';
    const clicks = Rx.Observable.fromEvent(document, 'click');
    const x = clicks.map(e => e.x);
    const y = clicks.map(e => e.y);
    const res = x.combineLatest(y, (x,y) => `combineLatest--x:${x}--y:${y}`);
    res.subscribe(value => {
        console.log(value);
        document.getElementById('test').innerText += value;
    })

    x.withLatestFrom(y, (x,y) => `withLatestFrom--x:${x}--y:${y}`).subscribe(value => {
        console.log(value);
        document.getElementById('test').innerText += value;
    });
<!-- combineLatest--x:782--y:383
index.html:28 withLatestFrom--x:782--y:383
index.html:23 combineLatest--x:646--y:383
index.html:23 combineLatest--x:646--y:341
index.html:28 withLatestFrom--x:646--y:341 -->
</script>

</html>

上一篇下一篇

猜你喜欢

热点阅读