rxjs中常用的操作符

2018-12-12  本文已影响0人  nzjcnjzx

Rx.Observable.forkJoin(...args [resultSelector])

并行运行所有可观察序列并收集它们的最后元素。

  var source = Rx.Observable.forkJoin(
      Rx.Observable.of(1, 2, 3),
      Rx.Observable.range(0, 10),
    );
    var subscription = source.subscribe(
      x => console.log(`onNext: ${x}`),
      e => console.log(`onError: ${e}`),
      () => console.log('onCompleted'));
   // onNext: 3,9 
   // onCompleted 

Rx.Observable.of(...args)

将参数转换为可观察序列。

var source = Rx.Observable.of(1,2,3);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });

// => Next: 1
// => Next: 2
// => Next: 3
// => Completed

Rx.Observable.range(start, count, [scheduler])

使用指定的调度程序发出观察者消息,在指定范围内生成可观察的整数数列。

var source = Rx.Observable.range(0, 3);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });

// => Next: 0 
// => Next: 1
// => Next: 2 
// => Completed 

Rx.Observable.from(iterable, [mapFn], [thisArg], [scheduler])

此方法从类似数组或可迭代对象创建新的Observable序列。

Rx.Observable.from([1, 2, 3], x => x + x).subscribe(
  x => console.log(`onNext: ${x}`),
  e => console.log(`onError: ${e}`),
  () => console.log('onCompleted'));

// => onNext: 2
// => onNext: 4
// => onNext: 6
// => onCompleted

Rx.Observable.concat(...args)

只要先前的可观察序列成功终止,就连接所有指定的可观察序列。

/* Using Observable sequences */
var source1 = Rx.Observable.return(42);
var source2 = Rx.Observable.return(56);

var source = Rx.Observable.concat(source1, source2);

var subscription = source.subscribe(
  x => console.log(`onNext: ${x}`),
  e => console.log(`onError: ${e}`),
  () => console.log('onCompleted'));

// => onNext: 42
// => onNext: 56
// => onCompleted

Rx.Observable.merge([scheduler], ...args)

*将所有可观察序列合并为单个可观察序列。
*调度程序是可选的,如果未指定,则使用立即调度程序。

var source1 = Rx.Observable.interval(100)
    .timeInterval()
    .pluck('interval');
var source2 = Rx.Observable.interval(150)
    .timeInterval()
    .pluck('interval');

var source = Rx.Observable.merge(
    source1,
    source2).take(10);


var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });

// => Next: 100
// => Next: 150
// => Next: 100
// => Next: 150
// => Next: 100 
// => Completed

Rx.Observable.create(subscribe)

从指定的订阅方法实现创建可观察序列。这是该createWithDisposable方法的别名

/* Using a function */
var source = Rx.Observable.create(observer => {
    observer.onNext(42);
    observer.onCompleted();

    // Note that this is optional, you do not have to return this if you require no cleanup
    return () => console.log('disposed')
});

var subscription = source.subscribe(
  x => console.log(`onNext: ${x}`),
  e => console.log(`onError: ${e}`),
  () => console.log('onCompleted'));

// => onNext: 42
// => onCompleted

subscription.dispose();

// => disposed

Rx.Observable.interval(period, [scheduler])

返回一个可观察的序列,该序列在每个句点后生成一个值。

var source = Rx.Observable
    .interval(500 /* ms */)
    .timeInterval()
    .take(3);

var subscription = source.subscribe(
    function (x) {
        console.log('Next:', x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });

// => Next: {value: 0, interval: 500}
// => Next: {value: 1, interval: 500}
// => Next: {value: 2, interval: 500} 
// => Completed

Rx.Observable.start(func, [scheduler], [context])

在指定的调度程序上异步调用指定的函数,通过可观察的序列显示结果。

var context = { value: 42 };

var source = Rx.Observable.start(
    function () {
        return this.value; 
    }, 
    Rx.Scheduler.timeout, 
    context
);

var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });

// => Next: 42 
// => Completed 

Rx.Observable.prototype.timeInterval([scheduler])

记录可观察序列中连续值之间的时间间隔。

var source = Rx.Observable.timer(0, 1000)
    .timeInterval()
    .map(function (x) { return x.value + ':' + x.interval; })
    .take(5);
    
var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });

// => Next: 0:0
// => Next: 1:1000
// => Next: 2:1000
// => Next: 3:1000
// => Next: 4:1000
// => Completed

Rx.Observable.timer(dueTime, [period], [scheduler])

上一篇下一篇

猜你喜欢

热点阅读