rxjs 里的pipe operator

2021-02-07  本文已影响0人  _扫地僧_

源代码:

const a = of([1, 2, 3]);
    const b = map((data: number[]) => {
      for( let i = 0; i < data.length; i++){
        data[i] = data[i] + 1;
      }
      console.log('data: ' + data);
      return data;
    } );

    const c = a.pipe(b);
    c.subscribe((data) => console.log('Fairy:' + data));

注意,传入pipe的operations不是应用程序传入的fn,而是执行了map operator后,被MapOperation包裹过的fn:

this还是指向Observable对象。

因为我只传入了一个operation,所以不会调用array的reduce,而是直接返回数组里唯一的一个元素:

pipeFromArray(operations)返回operations数组里唯一的元素,即mapOperations,然后将this即当前调用pipe的Observable对象传入mapOperations函数:

此时source就是调用pipe函数的Observable对象:

调用Observable对象的lift方法,输入为新建的MapOperator:

这个MapOperator其实就是一个POJO - Plain Old JavaScript Object,就是回调project和this上下文的封装。

Observable对象的lift操作也很好理解:新建一个Observable对象,将其source指向原始Observable,然后operator设置为刚才新建的MapOperator:

注意,此时我们执行subscribe的Observable对象,实际上是pipe操作返回的新Observable:

其map operator里包含了map应该执行的回调函数:

source指向的是原始的Observable对象。

Observable.subscribe里第一个重要的操作,就是创建subscriber对象。既然是subscribe,就得有subscribe对象,这个对象就是应用程序传入的callback的封装。

至此我们已经认识了一些对象的封装了:

MapOperation和MapOperator是map操作符里传入的function(称为project)的封装,其中MapOperator包含project和thisArg,而MapOperation内部的逻辑有二:一是确保project的类型为function,二是创建MapOperator,然后调用Observable的lift生成新的Observable.

Subscriber是应用程序传入的callback的封装。

Subscriber的destination指向了SafeSubscriber,后者包含的app callback:

继续回到Observable的subscribe操作,下图的含义是,如果新的Observable对象包含mapOperator(在我们的例子里确实包含),则调用该MapOperator,同时传入的输入参数为原始的Observable对象。

这里看到MapOperator的call方法了:

把subscribe调用链式传递给source了,这也是为什么MapOperation里为什么要调用原始Observable的lift方法——建立Observable与Observable之间的链接关系。

MapSubscriber和普通的Subscriber有何区别?上图做了对比,就多了一个project字段。

原始对象(即ofType返回的Observable)调用trySubscribe:

sink即MapSubscriber,里面包含了app callback:

以及map project:

有了这些信息,可以执行map操作了。

始终记住这个语义:Observable.subscribe(subscriber)

更多Jerry的原创文章,尽在:"汪子熙":


上一篇下一篇

猜你喜欢

热点阅读