RxJS 库

The RxJS library

响应式编程是一种面向数据流和变更传播的异步编程范式(Wikipedia)。RxJS(响应式扩展的 JavaScript 版)是一个使用可观察对象进行响应式编程的库,它让组合异步代码和基于回调的代码变得更简单 (RxJS Docs)。

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change (Wikipedia). RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using observables that makes it easier to compose asynchronous or callback-based code (RxJS Docs).

RxJS 提供了一种对 Observable 类型的实现,直到 Observable 成为了 JavaScript 语言的一部分并且浏览器支持它之前,它都是必要的。这个库还提供了一些工具函数,用于创建和使用可观察对象。这些工具函数可用于:

RxJS provides an implementation of the Observable type, which is needed until the type becomes part of the language and until browsers support it. The library also provides utility functions for creating and working with observables. These utility functions can be used for:

  • 把现有的异步代码转换成可观察对象

    Converting existing code for async operations into observables

  • 迭代流中的各个值

    Iterating through the values in a stream

  • 把这些值映射成其它类型

    Mapping values to different types

  • 对流进行过滤

    Filtering streams

  • 组合多个流

    Composing multiple streams

创建可观察对象的函数

Observable creation functions

RxJS 提供了一些用来创建可观察对象的函数。这些函数可以简化根据某些东西创建可观察对象的过程,比如事件、定时器、承诺等等。比如:

RxJS offers a number of functions that can be used to create new observables. These functions can simplify the process of creating observables from things such as events, timers, promises, and so on. For example:

import { fromPromise } from 'rxjs'; // Create an Observable out of a promise const data = fromPromise(fetch('/api/endpoint')); // Subscribe to begin listening for async result data.subscribe({ next(response) { console.log(response); }, error(err) { console.error('Error: ' + err); }, complete() { console.log('Completed'); } });
Create an observable from a promise
      
      import { fromPromise } from 'rxjs';

// Create an Observable out of a promise
const data = fromPromise(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
 next(response) { console.log(response); },
 error(err) { console.error('Error: ' + err); },
 complete() { console.log('Completed'); }
});
    
import { interval } from 'rxjs'; // Create an Observable that will publish a value on an interval const secondsCounter = interval(1000); // Subscribe to begin publishing values secondsCounter.subscribe(n => console.log(`It's been ${n} seconds since subscribing!`));
Create an observable from a counter
      
      import { interval } from 'rxjs';

// Create an Observable that will publish a value on an interval
const secondsCounter = interval(1000);
// Subscribe to begin publishing values
secondsCounter.subscribe(n =>
  console.log(`It's been ${n} seconds since subscribing!`));
    
import { fromEvent } from 'rxjs'; const el = document.getElementById('my-element'); // Create an Observable that will publish mouse movements const mouseMoves = fromEvent(el, 'mousemove'); // Subscribe to start listening for mouse-move events const subscription = mouseMoves.subscribe((evt: MouseEvent) => { // Log coords of mouse movements console.log(`Coords: ${evt.clientX} X ${evt.clientY}`); // When the mouse is over the upper-left of the screen, // unsubscribe to stop listening for mouse movements if (evt.clientX < 40 && evt.clientY < 40) { subscription.unsubscribe(); } });
Create an observable from an event
      
      
  1. import { fromEvent } from 'rxjs';
  2.  
  3. const el = document.getElementById('my-element');
  4.  
  5. // Create an Observable that will publish mouse movements
  6. const mouseMoves = fromEvent(el, 'mousemove');
  7.  
  8. // Subscribe to start listening for mouse-move events
  9. const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
  10. // Log coords of mouse movements
  11. console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);
  12.  
  13. // When the mouse is over the upper-left of the screen,
  14. // unsubscribe to stop listening for mouse movements
  15. if (evt.clientX < 40 && evt.clientY < 40) {
  16. subscription.unsubscribe();
  17. }
  18. });
import { ajax } from 'rxjs/ajax'; // Create an Observable that will create an AJAX request const apiData = ajax('/api/data'); // Subscribe to create the request apiData.subscribe(res => console.log(res.status, res.response));
Create an observable that creates an AJAX request
      
      import { ajax } from 'rxjs/ajax';

// Create an Observable that will create an AJAX request
const apiData = ajax('/api/data');
// Subscribe to create the request
apiData.subscribe(res => console.log(res.status, res.response));
    

操作符

Operators

操作符是基于可观察对象构建的一些对集合进行复杂操作的函数。RxJS 定义了一些操作符,比如 map()filter()concat()flatMap()

Operators are functions that build on the observables foundation to enable sophisticated manipulation of collections. For example, RxJS defines operators such as map(), filter(), concat(), and flatMap().

操作符接受一些配置项,然后返回一个以来源可观察对象为参数的函数。当执行这个返回的函数时,这个操作符会观察来源可观察对象中发出的值,转换它们,并返回由转换后的值组成的新的可观察对象。下面是一个简单的例子:

Operators take configuration options, and they return a function that takes a source observable. When executing this returned function, the operator observes the source observable’s emitted values, transforms them, and returns a new observable of those transformed values. Here is a simple example:

import { map } from 'rxjs/operators'; const nums = of(1, 2, 3); const squareValues = map((val: number) => val * val); const squaredNums = squareValues(nums); squaredNums.subscribe(x => console.log(x)); // Logs // 1 // 4 // 9
Map operator
      
      
  1. import { map } from 'rxjs/operators';
  2.  
  3. const nums = of(1, 2, 3);
  4.  
  5. const squareValues = map((val: number) => val * val);
  6. const squaredNums = squareValues(nums);
  7.  
  8. squaredNums.subscribe(x => console.log(x));
  9.  
  10. // Logs
  11. // 1
  12. // 4
  13. // 9

你可以使用管道来把这些操作符链接起来。管道让你可以把多个由操作符返回的函数组合成一个。pipe() 函数以你要组合的这些函数作为参数,并且返回一个新的函数,当执行这个新函数时,就会顺序执行那些被组合进去的函数。

You can use pipes to link operators together. Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.

应用于某个可观察对象上的一组操作符就像一个菜谱 —— 也就是说,对你感兴趣的这些值进行处理的一组操作步骤。这个菜谱本身不会做任何事。你需要调用 subscribe() 来通过这个菜谱生成一个结果。

A set of operators applied to an observable is a recipe—that is, a set of instructions for producing the values you’re interested in. By itself, the recipe doesn’t do anything. You need to call subscribe() to produce a result through the recipe.

例子如下:

Here’s an example:

import { filter, map } from 'rxjs/operators'; const nums = of(1, 2, 3, 4, 5); // Create a function that accepts an Observable. const squareOddVals = pipe( filter((n: number) => n % 2 !== 0), map(n => n * n) ); // Create an Observable that will run the filter and map functions const squareOdd = squareOddVals(nums); // Suscribe to run the combined functions squareOdd.subscribe(x => console.log(x));
Standalone pipe function
      
      
  1. import { filter, map } from 'rxjs/operators';
  2.  
  3. const nums = of(1, 2, 3, 4, 5);
  4.  
  5. // Create a function that accepts an Observable.
  6. const squareOddVals = pipe(
  7. filter((n: number) => n % 2 !== 0),
  8. map(n => n * n)
  9. );
  10.  
  11. // Create an Observable that will run the filter and map functions
  12. const squareOdd = squareOddVals(nums);
  13.  
  14. // Suscribe to run the combined functions
  15. squareOdd.subscribe(x => console.log(x));

pipe() 函数也同时是 RxJS 的 Observable 上的一个方法,所以你可以用下列简写形式来达到同样的效果:

The pipe() function is also a method on the RxJS Observable, so you use this shorter form to define the same operation:

import { filter, map } from 'rxjs/operators'; const squareOdd = of(1, 2, 3, 4, 5) .pipe( filter(n => n % 2 !== 0), map(n => n * n) ); // Subscribe to get values squareOdd.subscribe(x => console.log(x));
Observable.pipe function
      
      import { filter, map } from 'rxjs/operators';

const squareOdd = of(1, 2, 3, 4, 5)
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );

// Subscribe to get values
squareOdd.subscribe(x => console.log(x));
    

常用操作符

Common operators

RxJS 提供了很多操作符,不过只有少数是常用的。 下面是一个常用操作符的列表和用法范例,参见 RxJS API 文档

RxJS provides many operators, but only a handful are used frequently. For a list of operators and usage samples, visit the RxJS API Documentation.

注意,对于 Angular 应用来说,我们提倡使用管道来组合操作符,而不是使用链式写法。链式写法仍然在很多 RxJS 中使用着。

Note that, for Angular apps, we prefer combining operators with pipes, rather than chaining. Chaining is used in many RxJS examples.

类别

Area

操作

Operators

创建

Creation

from, fromPromise,fromEvent, of

组合

Combination

combineLatest, concat, merge, startWith , withLatestFrom, zip

过滤

Filtering

debounceTime, distinctUntilChanged, filter, take, takeUntil

转换

Transformation

bufferTime, concatMap, map, mergeMap, scan, switchMap

工具

Utility

tap

多播

Multicasting

share

错误处理

Error handling

除了可以在订阅时提供 error() 处理器外,RxJS 还提供了 catchError 操作符,它允许你在管道中处理已知错误。

In addition to the error() handler that you provide on subscription, RxJS provides the catchError operator that lets you handle known errors in the observable recipe.

假设你有一个可观察对象,它发起 API 请求,然后对服务器返回的响应进行映射。如果服务器返回了错误或值不存在,就会生成一个错误。如果你捕获这个错误并提供了一个默认值,流就会继续处理这些值,而不会报错。

For instance, suppose you have an observable that makes an API request and maps to the response from the server. If the server returns an error or the value doesn’t exist, an error is produced. If you catch this error and supply a default value, your stream continues to process values rather than erroring out.

下面是使用 catchError 操作符实现这种效果的例子:

Here's an example of using the catchError operator to do this:

import { ajax } from 'rxjs/ajax'; import { map, catchError } from 'rxjs/operators'; // Return "response" from the API. If an error happens, // return an empty array. const apiData = ajax('/api/data').pipe( map(res => { if (!res.response) { throw new Error('Value expected!'); } return res.response; }), catchError(err => of([])) ); apiData.subscribe({ next(x) { console.log('data: ', x); }, error(err) { console.log('errors already caught... will not run'); } });
catchError operator
      
      
  1. import { ajax } from 'rxjs/ajax';
  2. import { map, catchError } from 'rxjs/operators';
  3. // Return "response" from the API. If an error happens,
  4. // return an empty array.
  5. const apiData = ajax('/api/data').pipe(
  6. map(res => {
  7. if (!res.response) {
  8. throw new Error('Value expected!');
  9. }
  10. return res.response;
  11. }),
  12. catchError(err => of([]))
  13. );
  14.  
  15. apiData.subscribe({
  16. next(x) { console.log('data: ', x); },
  17. error(err) { console.log('errors already caught... will not run'); }
  18. });

重试失败的可观察对象

Retry failed observable

catchError 提供了一种简单的方式进行恢复,而 retry 操作符让你可以尝试失败的请求。

Where the catchError operator provides a simple path of recovery, the retry operator lets you retry a failed request.

可以在 catchError 之前使用 retry 操作符。它会订阅到原始的来源可观察对象,它可以重新运行导致结果出错的动作序列。如果其中包含 HTTP 请求,它就会重新发起那个 HTTP 请求。

Use the retry operator before the catchError operator. It resubscribes to the original source observable, which can then re-run the full sequence of actions that resulted in the error. If this includes an HTTP request, it will retry that HTTP request.

下列代码把前面的例子改成了在捕获错误之前重发请求:

The following converts the previous example to retry the request before catching the error:

import { ajax } from 'rxjs/ajax'; import { map, retry, catchError } from 'rxjs/operators'; const apiData = ajax('/api/data').pipe( retry(3), // Retry up to 3 times before failing map(res => { if (!res.response) { throw new Error('Value expected!'); } return res.response; }), catchError(err => of([])) ); apiData.subscribe({ next(x) { console.log('data: ', x); }, error(err) { console.log('errors already caught... will not run'); } });
retry operator
      
      
  1. import { ajax } from 'rxjs/ajax';
  2. import { map, retry, catchError } from 'rxjs/operators';
  3.  
  4. const apiData = ajax('/api/data').pipe(
  5. retry(3), // Retry up to 3 times before failing
  6. map(res => {
  7. if (!res.response) {
  8. throw new Error('Value expected!');
  9. }
  10. return res.response;
  11. }),
  12. catchError(err => of([]))
  13. );
  14.  
  15. apiData.subscribe({
  16. next(x) { console.log('data: ', x); },
  17. error(err) { console.log('errors already caught... will not run'); }
  18. });

不要重试登录认证请求,这些请求只应该由用户操作触发。我们肯定不会希望自动重复发送登录请求导致用户的账号被锁定。

Do not retry authentication requests, since these should only be initiated by user action. We don't want to lock out user accounts with repeated login requests that the user has not initiated.

可观察对象的命名约定

Naming conventions for observables

由于 Angular 的应用几乎都是用 TypeScript 写的,你通常会希望知道某个变量是否可观察对象。虽然 Angular 框架并没有针对可观察对象的强制性命名约定,不过你经常会看到可观察对象的名字以“$”符号结尾。

Because Angular applications are mostly written in TypeScript, you will typically know when a variable is an observable. Although the Angular framework does not enforce a naming convention for observables, you will often see observables named with a trailing “$” sign.

这在快速浏览代码并查找可观察对象值时会非常有用。同样的,如果你希望用某个属性来存储来自可观察对象的最近一个值,它的命名惯例是与可观察对象同名,但不带“$”后缀。

This can be useful when scanning through code and looking for observable values. Also, if you want a property to store the most recent value from an observable, it can be convenient to simply use the same name with or without the “$”.

比如:

For example:

import { Component } from '@angular/core'; import { Observable } from 'rxjs'; @Component({ selector: 'app-stopwatch', templateUrl: './stopwatch.component.html' }) export class StopwatchComponent { stopwatchValue: number; stopwatchValue$: Observable<number>; start() { this.stopwatchValue$.subscribe(num => this.stopwatchValue = num ); } }
Naming observables
      
      
  1. import { Component } from '@angular/core';
  2. import { Observable } from 'rxjs';
  3.  
  4. @Component({
  5. selector: 'app-stopwatch',
  6. templateUrl: './stopwatch.component.html'
  7. })
  8. export class StopwatchComponent {
  9.  
  10. stopwatchValue: number;
  11. stopwatchValue$: Observable<number>;
  12.  
  13. start() {
  14. this.stopwatchValue$.subscribe(num =>
  15. this.stopwatchValue = num
  16. );
  17. }
  18. }