填寫這份《一分鐘調查》,幫我們(開發組)做得更好!去填寫Home

RxJS 函式庫

The RxJS library

響應式程式設計是一種面向資料流和變更傳播的非同步程式設計正規化(Wikipedia)。RxJS(響應式擴充套件的 JavaScript 版)是一個使用可觀察物件進行響應式程式設計的函式庫,它讓組合非同步程式碼和基於回呼(Callback)的程式碼變得更簡單。參閱 RxJS 官方文件

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change (Wikipedia). RxJS (Reactive Extensions for JavaScript) is a library for reactive programming using observables that makes it easier to compose asynchronous or callback-based code. See (RxJS Docs).

RxJS 提供了一種對 Observable 型別的實現,直到 Observable 成為了 JavaScript 語言的一部分並且瀏覽器支援它之前,它都是必要的。這個函式庫還提供了一些工具函式,用於建立和使用可觀察物件。這些工具函式可用於:

RxJS provides an implementation of the Observable type, which is needed until the type becomes part of the language and until browsers support it. The library also provides utility functions for creating and working with observables. These utility functions can be used for:

  • 把現有的非同步程式碼轉換成可觀察物件

    Converting existing code for async operations into observables

  • 迭代流中的各個值

    Iterating through the values in a stream

  • 把這些值對映成其它型別

    Mapping values to different types

  • 對流進行過濾

    Filtering streams

  • 組合多個流

    Composing multiple streams

建立可觀察物件的函式

Observable creation functions

RxJS 提供了一些用來建立可觀察物件的函式。這些函式可以簡化根據某些東西建立可觀察物件的過程,比如事件、定時器、承諾等等。比如:

RxJS offers a number of functions that can be used to create new observables. These functions can simplify the process of creating observables from things such as events, timers, promises, and so on. For example:

import { from } from 'rxjs'; // Create an Observable out of a promise const data = from(fetch('/api/endpoint')); // Subscribe to begin listening for async result data.subscribe({ next(response) { console.log(response); }, error(err) { console.error('Error: ' + err); }, complete() { console.log('Completed'); } });
Create an observable from a promise
      
      import { from } from 'rxjs';

// Create an Observable out of a promise
const data = from(fetch('/api/endpoint'));
// Subscribe to begin listening for async result
data.subscribe({
  next(response) { console.log(response); },
  error(err) { console.error('Error: ' + err); },
  complete() { console.log('Completed'); }
});
    
import { interval } from 'rxjs'; // Create an Observable that will publish a value on an interval const secondsCounter = interval(1000); // Subscribe to begin publishing values const subscription = secondsCounter.subscribe(n => console.log(`It's been ${n + 1} seconds since subscribing!`));
Create an observable from a counter
      
      import { interval } from 'rxjs';

// Create an Observable that will publish a value on an interval
const secondsCounter = interval(1000);
// Subscribe to begin publishing values
const subscription = secondsCounter.subscribe(n =>
  console.log(`It's been ${n + 1} seconds since subscribing!`));
    
import { fromEvent } from 'rxjs'; const el = document.getElementById('my-element'); // Create an Observable that will publish mouse movements const mouseMoves = fromEvent(el, 'mousemove'); // Subscribe to start listening for mouse-move events const subscription = mouseMoves.subscribe((evt: MouseEvent) => { // Log coords of mouse movements console.log(`Coords: ${evt.clientX} X ${evt.clientY}`); // When the mouse is over the upper-left of the screen, // unsubscribe to stop listening for mouse movements if (evt.clientX < 40 && evt.clientY < 40) { subscription.unsubscribe(); } });
Create an observable from an event
      
      import { fromEvent } from 'rxjs';

const el = document.getElementById('my-element');

// Create an Observable that will publish mouse movements
const mouseMoves = fromEvent(el, 'mousemove');

// Subscribe to start listening for mouse-move events
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
  // Log coords of mouse movements
  console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);

  // When the mouse is over the upper-left of the screen,
  // unsubscribe to stop listening for mouse movements
  if (evt.clientX < 40 && evt.clientY < 40) {
    subscription.unsubscribe();
  }
});
    
import { ajax } from 'rxjs/ajax'; // Create an Observable that will create an AJAX request const apiData = ajax('/api/data'); // Subscribe to create the request apiData.subscribe(res => console.log(res.status, res.response));
Create an observable that creates an AJAX request
      
      import { ajax } from 'rxjs/ajax';

// Create an Observable that will create an AJAX request
  const apiData = ajax('/api/data');
  // Subscribe to create the request
  apiData.subscribe(res => console.log(res.status, res.response));
    

運算子

Operators

運算子是基於可觀察物件建構的一些對集合進行復雜操作的函式。RxJS 定義了一些運算子,比如 map()filter()concat()flatMap()

Operators are functions that build on the observables foundation to enable sophisticated manipulation of collections. For example, RxJS defines operators such as map(), filter(), concat(), and flatMap().

運算子接受一些配置項,然後返回一個以來源可觀察物件為引數的函式。當執行這個返回的函式時,這個運算子會觀察來源可觀察物件中發出的值,轉換它們,並返回由轉換後的值組成的新的可觀察物件。下面是一個簡單的例子:

Operators take configuration options, and they return a function that takes a source observable. When executing this returned function, the operator observes the source observable’s emitted values, transforms them, and returns a new observable of those transformed values. Here is a simple example:

import { of } from 'rxjs'; import { map } from 'rxjs/operators'; const nums = of(1, 2, 3); const squareValues = map((val: number) => val * val); const squaredNums = squareValues(nums); squaredNums.subscribe(x => console.log(x)); // Logs // 1 // 4 // 9
Map operator
      
      import { of } from 'rxjs';
import { map } from 'rxjs/operators';

const nums = of(1, 2, 3);

const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);

squaredNums.subscribe(x => console.log(x));

// Logs
// 1
// 4
// 9
    

你可以使用管道來把這些運算子連結起來。管道讓你可以把多個由運算子返回的函式組合成一個。pipe() 函式以你要組合的這些函式作為引數,並且返回一個新的函式,當執行這個新函式時,就會順序執行那些被組合進去的函式。

You can use pipes to link operators together. Pipes let you combine multiple functions into a single function. The pipe() function takes as its arguments the functions you want to combine, and returns a new function that, when executed, runs the composed functions in sequence.

應用於某個可觀察物件上的一組運算子就像一個處理流程 —— 也就是說,對你感興趣的這些值進行處理的一組操作步驟。這個處理流程本身不會做任何事。你需要呼叫 subscribe() 來透過處理流程得出並產生一個結果。

A set of operators applied to an observable is a recipe—that is, a set of instructions for producing the values you’re interested in. By itself, the recipe doesn’t do anything. You need to call subscribe() to produce a result through the recipe.

例子如下:

Here’s an example:

import { of, pipe } from 'rxjs'; import { filter, map } from 'rxjs/operators'; const nums = of(1, 2, 3, 4, 5); // Create a function that accepts an Observable. const squareOddVals = pipe( filter((n: number) => n % 2 !== 0), map(n => n * n) ); // Create an Observable that will run the filter and map functions const squareOdd = squareOddVals(nums); // Subscribe to run the combined functions squareOdd.subscribe(x => console.log(x));
Standalone pipe function
      
      import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const nums = of(1, 2, 3, 4, 5);

// Create a function that accepts an Observable.
const squareOddVals = pipe(
  filter((n: number) => n % 2 !== 0),
  map(n => n * n)
);

// Create an Observable that will run the filter and map functions
const squareOdd = squareOddVals(nums);

// Subscribe to run the combined functions
squareOdd.subscribe(x => console.log(x));
    

pipe() 函式也同時是 RxJS 的 Observable 上的一個方法,所以你可以用下列簡寫形式來達到同樣的效果:

The pipe() function is also a method on the RxJS Observable, so you use this shorter form to define the same operation:

import { of } from 'rxjs'; import { filter, map } from 'rxjs/operators'; const squareOdd = of(1, 2, 3, 4, 5) .pipe( filter(n => n % 2 !== 0), map(n => n * n) ); // Subscribe to get values squareOdd.subscribe(x => console.log(x));
Observable.pipe function
      
      import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const squareOdd = of(1, 2, 3, 4, 5)
  .pipe(
    filter(n => n % 2 !== 0),
    map(n => n * n)
  );

// Subscribe to get values
squareOdd.subscribe(x => console.log(x));
    

常用運算子

Common operators

RxJS 提供了很多運算子,不過只有少數是常用的。 下面是一個常用運算子的列表和用法範例,參閱 RxJS API 文件

RxJS provides many operators, but only a handful are used frequently. For a list of operators and usage samples, visit the RxJS API Documentation.

注意,對於 Angular 應用來說,我們提倡使用管道來組合運算子,而不是使用鏈式寫法。鏈式寫法仍然在很多 RxJS 中使用著。

Note that, for Angular apps, we prefer combining operators with pipes, rather than chaining. Chaining is used in many RxJS examples.

類別

Area

操作

Operators

建立

Creation

from, fromEvent, of

from, fromPromise,fromEvent, of

組合

Combination

combineLatest, concat, merge, startWith , withLatestFrom, zip

過濾

Filtering

debounceTime, distinctUntilChanged, filter, take, takeUntil

轉換

Transformation

bufferTime, concatMap, map, mergeMap, scan, switchMap

工具

Utility

tap

多播

Multicasting

share

錯誤處理

Error handling

除了可以在訂閱時提供 error() 處理器外,RxJS 還提供了 catchError 運算子,它允許你在管道中處理已知錯誤。

In addition to the error() handler that you provide on subscription, RxJS provides the catchError operator that lets you handle known errors in the observable recipe.

假設你有一個可觀察物件,它發起 API 請求,然後對伺服器返回的響應進行對映。如果伺服器返回了錯誤或值不存在,就會產生一個錯誤。如果你捕獲這個錯誤並提供了一個預設值,流就會繼續處理這些值,而不會報錯。

For instance, suppose you have an observable that makes an API request and maps to the response from the server. If the server returns an error or the value doesn’t exist, an error is produced. If you catch this error and supply a default value, your stream continues to process values rather than erroring out.

下面是使用 catchError 運算子實現這種效果的例子:

Here's an example of using the catchError operator to do this:

import { of } from 'rxjs'; import { ajax } from 'rxjs/ajax'; import { map, catchError } from 'rxjs/operators'; // Return "response" from the API. If an error happens, // return an empty array. const apiData = ajax('/api/data').pipe( map((res: any) => { if (!res.response) { throw new Error('Value expected!'); } return res.response; }), catchError(err => of([])) ); apiData.subscribe({ next(x) { console.log('data: ', x); }, error(err) { console.log('errors already caught... will not run'); } });
catchError operator
      
      import { of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';

// Return "response" from the API. If an error happens,
// return an empty array.
const apiData = ajax('/api/data').pipe(
  map((res: any) => {
    if (!res.response) {
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
    

重試失敗的可觀察物件

Retry failed observable

catchError 提供了一種簡單的方式進行恢復,而 retry 運算子讓你可以嘗試失敗的請求。

Where the catchError operator provides a simple path of recovery, the retry operator lets you retry a failed request.

可以在 catchError 之前使用 retry 運算子。它會訂閱到原始的來源可觀察物件,它可以重新執行導致結果出錯的動作序列。如果其中包含 HTTP 請求,它就會重新發起那個 HTTP 請求。

Use the retry operator before the catchError operator. It resubscribes to the original source observable, which can then re-run the full sequence of actions that resulted in the error. If this includes an HTTP request, it will retry that HTTP request.

下列程式碼把前面的例子改成了在捕獲錯誤之前重發請求:

The following converts the previous example to retry the request before catching the error:

import { of } from 'rxjs'; import { ajax } from 'rxjs/ajax'; import { map, retry, catchError } from 'rxjs/operators'; const apiData = ajax('/api/data').pipe( map((res: any) => { if (!res.response) { console.log('Error occurred.'); throw new Error('Value expected!'); } return res.response; }), retry(3), // Retry up to 3 times before failing catchError(err => of([])) ); apiData.subscribe({ next(x) { console.log('data: ', x); }, error(err) { console.log('errors already caught... will not run'); } });
retry operator
      
      import { of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';

const apiData = ajax('/api/data').pipe(
  map((res: any) => {
    if (!res.response) {
      console.log('Error occurred.');
      throw new Error('Value expected!');
    }
    return res.response;
  }),
  retry(3), // Retry up to 3 times before failing
  catchError(err => of([]))
);

apiData.subscribe({
  next(x) { console.log('data: ', x); },
  error(err) { console.log('errors already caught... will not run'); }
});
    

不要重試登入認證請求,這些請求只應該由使用者操作觸發。我們肯定不會希望自動重複傳送登入請求導致使用者的賬號被鎖定。

Do not retry authentication requests, since these should only be initiated by user action. We don't want to lock out user accounts with repeated login requests that the user has not initiated.

可觀察物件的命名約定

Naming conventions for observables

由於 Angular 的應用幾乎都是用 TypeScript 寫的,你通常會希望知道某個變數是否可觀察物件。雖然 Angular 框架並沒有針對可觀察物件的強制性命名約定,不過你經常會看到可觀察物件的名字以“$”符號結尾。

Because Angular applications are mostly written in TypeScript, you will typically know when a variable is an observable. Although the Angular framework does not enforce a naming convention for observables, you will often see observables named with a trailing “$” sign.

這在快速瀏覽程式碼並查詢可觀察物件值時會非常有用。同樣的,如果你希望用某個屬性來儲存來自可觀察物件的最近一個值,它的命名慣例是與可觀察物件同名,但不帶“$”字尾。

This can be useful when scanning through code and looking for observable values. Also, if you want a property to store the most recent value from an observable, it can be convenient to simply use the same name with or without the “$”.

比如:

For example:

import { Component } from '@angular/core'; import { Observable } from 'rxjs'; @Component({ selector: 'app-stopwatch', templateUrl: './stopwatch.component.html' }) export class StopwatchComponent { stopwatchValue: number; stopwatchValue$: Observable<number>; start() { this.stopwatchValue$.subscribe(num => this.stopwatchValue = num ); } }
Naming observables
      
      import { Component } from '@angular/core';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-stopwatch',
  templateUrl: './stopwatch.component.html'
})
export class StopwatchComponent {

  stopwatchValue: number;
  stopwatchValue$: Observable<number>;

  start() {
    this.stopwatchValue$.subscribe(num =>
      this.stopwatchValue = num
    );
  }
}