Angular-Observable和RxJS

2019-06-13  本文已影响0人  chrisghb

参考文章:介绍RxJS在Angular中的应用

一、可观察对象(Observable)

1.1基本用法和词汇

// 在有消费者订阅它之前,这个订阅者函数并不会实际执行
const locations = new Observable((observer) => {
  const {next, error} = observer;
  let watchId;

  if ('geolocation' in navigator) {
    watchId = navigator.geolocation.watchPosition(next, error);
  } else {
    error('Geolocation not available');
  }

  return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
});

// subscribe() 调用会返回一个 Subscription 对象,该对象具有一个 unsubscribe() 方法。
// subscribe()传入一个观察者对象,定义了你收到的这些消息的处理器
const locationsSubscription = locations.subscribe({
  next(position) { console.log('Current Position: ', position); },
  error(msg) { console.log('Error Getting Location: ', msg); }
});

// 10 seconds后调用该方法时,你就会停止接收通知。
setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);

1.2定义观察者observer

通知类型 说明
next 必要。用来处理每个送达值。在开始执行后可能执行零次或多次。
error 可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。
complete 可选。用来处理执行完毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。

1.3订阅

// 创建简单的可观察对象,来发送3个值
const myObservable = of(1, 2, 3);

// 创建观察者对象
const myObserver = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

// 订阅
myObservable.subscribe(myObserver);
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

=>前面指定预定义观察者并订阅它,等同如下写法,省略了next,error,complete
myObservable.subscribe(
  // subscribe() 方法可以接收预定义在观察者中同一行的回调函数
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);

无论哪种情况,next 处理器都是必要的,而 errorcomplete 处理器是可选的。

1.4创建可观察对象

// 订阅者函数会接收一个 Observer 对象,并把值发布给观察者的 next() 方法。
function sequenceSubscriber(observer) {
  // 同步地 发布 1, 2, and 3, 然后 complete
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
 
  // 同步发布数据,所以取消订阅 不需要做任何事情
  return {unsubscribe() {}};
}
 
// 使用 Observable 构造函数,创建一个新的可观察对象,
// 当执行可观察对象的 subscribe() 方法时,这个构造函数就会把它接收到的参数sequenceSubscriber作为订阅者函数来运行。 
const sequence = new Observable(sequenceSubscriber);
 
sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});
 
// Logs:
// 1
// 2
// 3
// Finished sequence
function fromEvent(target, eventName) {
    return new Observable(
        // new Observable中传入的订阅者函数是用内联方式定义的
        // 订阅者函数会接收一个 观察者对象observer,并把值e发布给观察者的 next() 方法
        (observer) => {
            const handler = (e) => observer.next(e);

            // Add the event handler to the target
            target.addEventListener(eventName, handler);

            return () => {
                // Detach the event handler from the target
                target.removeEventListener(eventName, handler);
            };
        }

    );
}


const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;

const subscription = fromEvent(nameInput, 'keydown')//使用fromEvent函数来创建可发布 keydown 事件的可观察对象
    .subscribe(
        // subscribe() 方法接收预定义在观察者中同一行的next回调函数
        (e: KeyboardEvent) => {
            if (e.keyCode === ESC_KEY) {
                nameInput.value = '';
            }
        }
    );

1.5多播?

1.6错误处理

myObservable.subscribe({
  next: (num) => console.log('Next num: ' + num),
  error: (err) => console.log('Received an errror: ' + err)
});

二、RxJS 库

RxJS是一个使用可观察对象进行响应式编程的

2.1创建可观察对象的函数

RxJS 提供了一些用来创建可观察对象函数。这些函数可以简化根据某些东西创建可观察对象的过程,比如承诺、定时器、事件、ajax等等。

import { fromPromise } from 'rxjs';

// Create an Observable out of a promise
const data = fromPromise(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
 next(response) { console.log(response); },
 error(err) { console.error('Error: ' + err); },
 complete() { console.log('Completed'); }
});
import { interval } from 'rxjs';

// Create an Observable that will publish a value on an interval
const secondsCounter = interval(1000);
// Subscribe to begin publishing values
secondsCounter.subscribe(n =>
  console.log(`It's been ${n} seconds since subscribing!`));
import { fromEvent } from 'rxjs';

const el = document.getElementById('my-element');

// Create an Observable that will publish mouse movements
const mouseMoves = fromEvent(el, 'mousemove');

// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
  // Log coords of mouse movements
  console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);

  // When the mouse is over the upper-left of the screen,
  // unsubscribe to stop listening for mouse movements
  if (evt.clientX < 40 && evt.clientY < 40) {
    subscription.unsubscribe();
  }
});
import { ajax } from 'rxjs/ajax';

// Create an Observable that will create an AJAX request
const apiData = ajax('/api/data');
// Subscribe to create the request
apiData.subscribe(res => console.log(res.status, res.response));

2.2常用操作符

操作符会观察来源可观察对象中发出的转换它们,并返回由转换后的值组成的新的可观察对象

Observable.fromEvent(node, 'input')
  .map((event: any) => event.target.value)
  .filter(value => value.length >= 2)
  .subscribe(value => { console.log(value); });

下面是整个顺序步骤:

  1. 假设用户输入:a

  2. Observable对触发 oninput 事件作出反应,将值以参数的形式传递给observernext()。(内部实现)

  3. map() 根据 event.target.value 的内容返回一个新的 Observable,并调用 next() 传递给下一个observer

  4. filter() 如果值长度 >=2 的话,则返回一个新的 Observable,并调用 next() 传递给下一个observer

  5. 最后,将结果传递给 subscribe 订阅块。

只要记住每一次 operator 都会返回一个Observable,不管 operator 有多少个,最终只有最后一个 Observable 会被订阅

import { filter, map } from 'rxjs/operators';

const squareOdd = of(1, 2, 3, 4, 5) // 可观察对象
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );

// Subscribe to get values
squareOdd.subscribe(x => console.log(x));

如果组件有多个订阅者的话,我们需要将这些订阅者存储在数组中,当组件被销毁时再逐个取消订阅。但,我们有更好的办法:
使用 takeWhile() operator,它会在你传递一个布尔值是调用 next() 还是 complete()

private alive: boolean = true;
ngOnInit() {
  const node = document.querySelector('input[type=text]');

  this.s = Observable.fromEvent(node, 'input')
    .takeWhile(() => this.alive)
    .map((event: any) => event.target.value)
    .filter(value => value.length >= 2)
    .subscribe(value => { console.log(value) });
}

ngOnDestroy() {
  this.alive = false;
}

RxJS很火很大原因我认还是提供了丰富的API,以下是摘抄:

创建数据流:

转换操作:

组合数据流:

窃听:

操作符参考资料
Rxjs 常用操作符

2.3错误处理

除了可以在订阅时提供 error() 处理器外,RxJS 还提供了 catchError 操作符,它允许你在管道中处理已知错误。
下面是使用 catchError 操作符实现这种效果的例子:

import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('/api/data').pipe(
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  //如果你捕获这个错误并提供了一个默认值,流就会继续处理这些值,而不会报错。
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});

2.4重试失败的可观察对象

可以在 catchError 之前使用 retry 操作符。
下列代码为前面的例子加上了捕获错误前重发请求的逻辑:

import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';

const apiData = ajax('/api/data').pipe(
  retry(3), // Retry up to 3 times before failing
  map(res => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});

2.5可观察对象的命名约定

习惯上的可观察对象的名字以$符号结尾。

stopwatchValue$: Observable<number>;

三、Angular 中的可观察对象

Angular 使用可观察对象作为处理各种常用异步操作的接口。比如:

3.1事件发送器 EventEmitter

Angular 提供了一个 EventEmitter 类,它用来从组件的 @Output() 属性中发布一些值。EventEmitter 扩展Observable,并添加了一个 emit() 方法,这样它就可以发送任意值了。当你调用 emit() 时,就会把所发送的值传给订阅上来的观察者的 next() 方法。

@Output() changed = new EventEmitter<string>();

click() {
    this.changed.emit('hi~');
}
@Component({
  template: `<comp (changed)="subscribe($event)"></comp>`
})
export class HomeComponent {
  subscribe(message: string) {
     // 接收:hi~
  }
}

3.2HTTP

AngularHttpClientHTTP 方法调用中返回可观察对象。例如,http.get(‘/api’) 就会返回可观察对象。

相对于基于承诺(Promise)的 HTTP API,它有一系列优点:

3.3Async 管道

AsyncPipe 会订阅一个可观察对象或承诺,并返回其发出的最后一个值。当发出新值时,该管道就会把这个组件标记为需要进行变更检查的

3.4路由器 (router)

import { Router, NavigationStart } from '@angular/router';
import { filter } from 'rxjs/operators';

@Component({
  selector: 'app-routable',
  templateUrl: './routable.component.html',
  styleUrls: ['./routable.component.css']
})
export class Routable1Component implements OnInit {

  navStart: Observable<NavigationStart>;

  constructor(private router: Router) {
    // Create a new Observable the publishes only the NavigationStart event
    this.navStart = router.events.pipe(
      filter(evt => evt instanceof NavigationStart)
    ) as Observable<NavigationStart>;
  }

  ngOnInit() {
    this.navStart.subscribe(evt => console.log('Navigation Started!'));
  }
}
import { ActivatedRoute } from '@angular/router';

@Component({
  selector: 'app-routable',
  templateUrl: './routable.component.html',
  styleUrls: ['./routable.component.css']
})
export class Routable2Component implements OnInit {
  constructor(private activatedRoute: ActivatedRoute) {}

  ngOnInit() {
    this.activatedRoute.url
      .subscribe(url => console.log('The URL changed to: ' + url));
  }
}

3.5响应式表单 (reactive forms)

响应式表单具有一些属性,它们使用可观察对象来监听表单控件的值。 FormControlvalueChanges 属性和 statusChanges 属性包含了会发出变更事件可观察对象。订阅可观察的表单控件属性是在组件类中触发应用逻辑的途径之一。比如:

import { FormGroup } from '@angular/forms';

@Component({
  selector: 'my-component',
  template: 'MyComponent Template'
})
export class MyComponent implements OnInit {
  nameChangeLog: string[] = [];
  heroForm: FormGroup;

  ngOnInit() {
    this.logNameChange();
  }
  logNameChange() {
    const nameControl = this.heroForm.get('name');
    nameControl.valueChanges.subscribe(
      (value: string) => this.nameChangeLog.push(value)
    );
  }
}

四、可观察对象与其它技术的比较

4.1可观察对象 vs. 承诺

可观察对象 承诺 Observable优势
可观察对象是声明式的,在被订阅之前,它不会开始执行。 承诺是在创建时就立即执行的。 这让可观察对象可用于定义那些应该按需执行的情景。
可观察对象能提供多个值 承诺只提供一个 这让可观察对象可用于随着时间的推移获取多个值。
可观察对象会区分串联处理和订阅语句。 承诺只有 .then() 语句。 这让可观察对象可用于创建供系统的其它部分使用而不希望立即执行的复杂菜谱。
可观察对象的 subscribe() 会负责处理错误。 承诺会把错误推送给它的子承诺 这让可观察对象可用于进行集中式、可预测的错误处理。

4.2创建与订阅

content_copy
// declare a publishing operation
new Observable((observer) => { subscriber_fn });
// initiate execution
observable.subscribe(() => {
      // observer handles notifications
    });
content_copy
// initiate execution
new Promise((resolve, reject) => { executer_fn });
// handle return value
promise.then((value) => {
      // handle result here
    });

4.3串联

content_copy
observable.map((v) => 2*v);
content_copy
promise.then((v) => 2*v);

4.4可取消

content_copy
const sub = obs.subscribe(...);
sub.unsubscribe();

4.5错误处理

content_copy
obs.subscribe(() => {
  throw Error('my error');
});
content_copy
promise.then(() => {
      throw Error('my error');
});

4.6速查表

4.7可观察对象 vs. 事件 API

4.8可观察对象 vs. 数组

五、Subject

我们在写一个Service用于数据传递时,总是使用 new Subject

@Injectable()
export class MessageService {
    private subject = new Subject<any>();

    send(message: any) {
        this.subject.next(message);
    }

    get(): Observable<any> {
        return this.subject.asObservable();
    }
}

F组件需要向M组件传递数据时,我们可以在F组件中使用 send()

constructor(public srv: MessageService) { }

ngOnInit() {
    this.srv.send('w s k f m?')
}

M组件只需要订阅内容就行:

constructor(private srv: MessageService) {}

message: any;
ngOnInit() {
    this.srv.get().subscribe((result) => {
        this.message = result;
    })
}
上一篇 下一篇

猜你喜欢

热点阅读