填寫這份《一分鐘調查》,幫我們(開發組)做得更好!去填寫Home

使用可觀察物件(Observable)來傳遞值

Using observables to pass values

可觀察物件對在應用的各個部分之間傳遞訊息提供了支援。 它們在 Angular 中頻繁使用,並且推薦把它們用於事件處理、非同步程式設計以及處理多個值等場景。

Observables provide support for passing messages between parts of your application. They are used frequently in Angular and are a technique for event handling, asynchronous programming, and handling multiple values.

觀察者(Observer)模式是一個軟體設計模式,它有一個物件,稱之為主體 Subject,負責維護一個依賴項(稱之為觀察者 Observer)的列表,並且在狀態變化時自動通知它們。 該模式和發佈/訂閱模式非常相似(但不完全一樣)。

The observer pattern is a software design pattern in which an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of state changes. This pattern is similar (but not identical) to the publish/subscribe design pattern.

可觀察物件是宣告式的 —— 也就是說,雖然你定義了一個用於發佈值的函式,但是在有消費者訂閱它之前,這個函式並不會實際執行。 訂閱之後,當這個函式執行完或取消訂閱時,訂閱者就會收到通知。

Observables are declarative—that is, you define a function for publishing values, but it is not executed until a consumer subscribes to it. The subscribed consumer then receives notifications until the function completes, or until they unsubscribe.

可觀察物件可以傳送多個任意型別的值 —— 字面量、訊息、事件。無論這些值是同步傳送的還是非同步傳送的,接收這些值的 API 都是一樣的。 由於準備(setup)和清場(teardown)的邏輯都是由可觀察物件自己處理的,因此你的應用程式碼只管訂閱並消費這些值就可以了,做完之後,取消訂閱。無論這個流是按鍵流、HTTP 響應流還是定時器,對這些值進行監聽和停止監聽的介面都是一樣的。

An observable can deliver multiple values of any type—literals, messages, or events, depending on the context. The API for receiving values is the same whether the values are delivered synchronously or asynchronously. Because setup and teardown logic are both handled by the observable, your application code only needs to worry about subscribing to consume values, and when done, unsubscribing. Whether the stream was keystrokes, an HTTP response, or an interval timer, the interface for listening to values and stopping listening is the same.

由於這些優點,可觀察物件在 Angular 中得到廣泛使用,應用開發者也同樣如此。

Because of these advantages, observables are used extensively within Angular, and for app development as well.

基本用法和詞彙

Basic usage and terms

作為發佈者,你建立一個 Observable 的實例,其中定義了一個訂閱者(subscriber)函式。 當有消費者呼叫 subscribe() 方法時,這個函式就會執行。 訂閱者函式用於定義“如何獲取或產生那些要發佈的值或訊息”。

As a publisher, you create an Observable instance that defines a subscriber function. This is the function that is executed when a consumer calls the subscribe() method. The subscriber function defines how to obtain or generate values or messages to be published.

要執行所建立的可觀察物件,並開始從中接收通知,你就要呼叫它的 subscribe() 方法,並傳入一個觀察者(observer)。 這是一個 JavaScript 物件,它定義了你收到的這些訊息的處理器(handler)。 subscribe() 呼叫會返回一個 Subscription 物件,該物件具有一個 unsubscribe() 方法。 當呼叫該方法時,你就會停止接收通知。

To execute the observable you have created and begin receiving notifications, you call its subscribe() method, passing an observer. This is a JavaScript object that defines the handlers for the notifications you receive. The subscribe() call returns a Subscription object that has an unsubscribe() method, which you call to stop receiving notifications.

下面這個例子中示範了這種基本用法,它展示了如何使用可觀察物件來對當前地理位置進行更新。

Here's an example that demonstrates the basic usage model by showing how an observable could be used to provide geolocation updates.

Observe geolocation updates
      
      // Create an Observable that will start listening to geolocation updates
// when a consumer subscribes.
const locations = new Observable((observer) => {
  let watchId: number;

  // Simple geolocation API check provides values to publish
  if ('geolocation' in navigator) {
    watchId = navigator.geolocation.watchPosition((position: GeolocationPosition) => {
      observer.next(position);
    }, (error: GeolocationPositionError) => {
      observer.error(error);
    });
  } else {
    observer.error('Geolocation not available');
  }

  // When the consumer unsubscribes, clean up data ready for next subscription.
  return {
    unsubscribe() {
      navigator.geolocation.clearWatch(watchId);
    }
  };
});

// Call subscribe() to start listening for updates.
const locationsSubscription = locations.subscribe({
  next(position) {
    console.log('Current Position: ', position);
  },
  error(msg) {
    console.log('Error Getting Location: ', msg);
  }
});

// Stop listening for location after 10 seconds
setTimeout(() => {
  locationsSubscription.unsubscribe();
}, 10000);
    

定義觀察者

Defining observers

用於接收可觀察物件通知的處理器要實現 Observer 介面。這個物件定義了一些回呼(Callback)函式來處理可觀察物件可能會發來的三種通知:

A handler for receiving observable notifications implements the Observer interface. It is an object that defines callback methods to handle the three types of notifications that an observable can send:

通知型別

Notification type

說明

Description

next

必要。用來處理每個送達值。在開始執行後可能執行零次或多次。

Required. A handler for each delivered value. Called zero or more times after execution starts.

error

可選。用來處理錯誤通知。錯誤會中斷這個可觀察物件實例的執行過程。

Optional. A handler for an error notification. An error halts execution of the observable instance.

complete

可選。用來處理執行完畢(complete)通知。當執行完畢後,這些值就會繼續傳給下一個處理器。

Optional. A handler for the execution-complete notification. Delayed values can continue to be delivered to the next handler after execution is complete.

觀察者物件可以定義這三種處理器的任意組合。如果你不為某種通知型別提供處理器,這個觀察者就會忽略相應型別的通知。

An observer object can define any combination of these handlers. If you don't supply a handler for a notification type, the observer ignores notifications of that type.

訂閱

Subscribing

只有當有人訂閱 Observable 的實例時,它才會開始發佈值。 訂閱時要先呼叫該實例的 subscribe() 方法,並把一個觀察者物件傳給它,用來接收通知。

An Observable instance begins publishing values only when someone subscribes to it. You subscribe by calling the subscribe() method of the instance, passing an observer object to receive the notifications.

為了展示訂閱的原理,我們需要建立新的可觀察物件。它有一個建構函式可以用來建立新實例,但是為了更簡明,也可以使用 Observable 上定義的一些靜態方法來建立一些常用的簡單可觀察物件:

In order to show how subscribing works, we need to create a new observable. There is a constructor that you use to create new instances, but for illustration, we can use some methods from the RxJS library that create simple observables of frequently used types:

  • of(...items) —— 返回一個 Observable 實例,它用同步的方式把引數中提供的這些值傳送出來。

    of(...items)—Returns an Observable instance that synchronously delivers the values provided as arguments.

  • from(iterable) —— 把它的引數轉換成一個 Observable 實例。 該方法通常用於把一個數組轉換成一個(傳送多個值的)可觀察物件。

    from(iterable)—Converts its argument to an Observable instance. This method is commonly used to convert an array to an observable.

下面的例子會建立並訂閱一個簡單的可觀察物件,它的觀察者會把接收到的訊息記錄到控制檯中:

Here's an example of creating and subscribing to a simple observable, with an observer that logs the received message to the console:

Subscribe using observer
      
      // Create simple observable that emits three values
const myObservable = of(1, 2, 3);

// Create observer object
const myObserver = {
  next: (x: number) => console.log('Observer got a next value: ' + x),
  error: (err: Error) => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

// Execute with the observer object
myObservable.subscribe(myObserver);

// Logs:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification
    

另外,subscribe() 方法還可以接收定義在同一行中的回呼(Callback)函式,無論 nexterror 還是 complete 處理器。比如,下面的 subscribe() 呼叫和前面指定預定義觀察者的例子是等價的。

Alternatively, the subscribe() method can accept callback function definitions in line, for next, error, and complete handlers. For example, the following subscribe() call is the same as the one that specifies the predefined observer:

Subscribe with positional arguments
      
      myObservable.subscribe(
  x => console.log('Observer got a next value: ' + x),
  err => console.error('Observer got an error: ' + err),
  () => console.log('Observer got a complete notification')
);
    

無論哪種情況,next 處理器都是必要的,而 errorcomplete 處理器是可選的。

In either case, a next handler is required. The error and complete handlers are optional.

注意,next() 函式可以接受訊息字串、事件物件、數字值或各種結構,具體型別取決於上下文。 為了更通用一點,我們把由可觀察物件發佈出來的資料統稱為。任何型別的值都可以表示為可觀察物件,而這些值會被發佈為一個流。

Note that a next() function could receive, for instance, message strings, or event objects, numeric values, or structures, depending on context. As a general term, we refer to data published by an observable as a stream. Any type of value can be represented with an observable, and the values are published as a stream.

建立可觀察物件

Creating observables

使用 Observable 建構函式可以建立任何型別的可觀察流。 當執行可觀察物件的 subscribe() 方法時,這個建構函式就會把它接收到的引數作為訂閱函式來執行。 訂閱函式會接收一個 Observer 物件,並把值發佈給觀察者的 next() 方法。

Use the Observable constructor to create an observable stream of any type. The constructor takes as its argument the subscriber function to run when the observable’s subscribe() method executes. A subscriber function receives an Observer object, and can publish values to the observer's next() method.

比如,要建立一個與前面的 of(1, 2, 3) 等價的可觀察物件,你可以這樣做:

For example, to create an observable equivalent to the of(1, 2, 3) above, you could do something like this:

Create observable with constructor
      
      // This function runs when subscribe() is called
function sequenceSubscriber(observer: Observer<number>) {
  // synchronously deliver 1, 2, and 3, then complete
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();

  // unsubscribe function doesn't need to do anything in this
  // because values are delivered synchronously
  return {unsubscribe() {}};
}

// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);

// execute the Observable and print the result of each notification
sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});

// Logs:
// 1
// 2
// 3
// Finished sequence
    

如果要略微加強這個例子,我們可以建立一個用來發布事件的可觀察物件。在這個例子中,訂閱函式是用內聯方式定義的。

To take this example a little further, we can create an observable that publishes events. In this example, the subscriber function is defined inline.

Create with custom fromEvent function
      
      function fromEvent<T extends keyof HTMLElementEventMap>(target: HTMLElement, eventName: T) {
  return new Observable<HTMLElementEventMap[T]>((observer) => {
    const handler = (e: HTMLElementEventMap[T]) => observer.next(e);

    // Add the event handler to the target
    target.addEventListener(eventName, handler);

    return () => {
      // Detach the event handler from the target
      target.removeEventListener(eventName, handler);
    };
  });
}
    

現在,你就可以使用這個函式來建立可發佈 keydown 事件的可觀察物件了:

Now you can use this function to create an observable that publishes keydown events:

Use custom fromEvent function
      
      const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;

const subscription = fromEvent(nameInput, 'keydown').subscribe((e: KeyboardEvent) => {
  if (e.keyCode === ESC_KEY) {
    nameInput.value = '';
  }
});
    

多播

Multicasting

典型的可觀察物件會為每一個觀察者建立一次新的、獨立的執行。 當觀察者進行訂閱時,該可觀察物件會連上一個事件處理器,並且向那個觀察者傳送一些值。當第二個觀察者訂閱時,這個可觀察物件就會連上一個新的事件處理器,並獨立執行一次,把這些值傳送給第二個可觀察物件。

A typical observable creates a new, independent execution for each subscribed observer. When an observer subscribes, the observable wires up an event handler and delivers values to that observer. When a second observer subscribes, the observable then wires up a new event handler and delivers values to that second observer in a separate execution.

有時候,不應該對每一個訂閱者都獨立執行一次,你可能會希望每次訂閱都得到同一批值 —— 即使是那些你已經發送過的。這在某些情況下有用,比如用來發送 document 上的點選事件的可觀察物件。

Sometimes, instead of starting an independent execution for each subscriber, you want each subscription to get the same values—even if values have already started emitting. This might be the case with something like an observable of clicks on the document object.

多播用來讓可觀察物件在一次執行中同時廣播給多個訂閱者。藉助支援多播的可觀察物件,你不必註冊多個監聽器,而是複用第一個(next)監聽器,並且把值傳送給各個訂閱者。

Multicasting is the practice of broadcasting to a list of multiple subscribers in a single execution. With a multicasting observable, you don't register multiple listeners on the document, but instead re-use the first listener and send values out to each subscriber.

當建立可觀察物件時,你要決定你希望別人怎麼用這個物件以及是否對它的值進行多播。

When creating an observable you should determine how you want that observable to be used and whether or not you want to multicast its values.

來看一個從 1 到 3 進行計數的例子,它每發出一個數字就會等待 1 秒。

Let’s look at an example that counts from 1 to 3, with a one-second delay after each number emitted.

Create a delayed sequence
      
      function sequenceSubscriber(observer: Observer<number>) {
  const seq = [1, 2, 3];
  let timeoutId: any;

  // Will run through an array of numbers, emitting one value
  // per second until it gets to the end of the array.
  function doInSequence(arr: number[], idx: number) {
    timeoutId = setTimeout(() => {
      observer.next(arr[idx]);
      if (idx === arr.length - 1) {
        observer.complete();
      } else {
        doInSequence(arr, ++idx);
      }
    }, 1000);
  }

  doInSequence(seq, 0);

  // Unsubscribe should clear the timeout to stop execution
  return {
    unsubscribe() {
      clearTimeout(timeoutId);
    }
  };
}

// Create a new Observable that will deliver the above sequence
const sequence = new Observable(sequenceSubscriber);

sequence.subscribe({
  next(num) { console.log(num); },
  complete() { console.log('Finished sequence'); }
});

// Logs:
// (at 1 second): 1
// (at 2 seconds): 2
// (at 3 seconds): 3
// (at 3 seconds): Finished sequence
    

注意,如果你訂閱了兩次,就會有兩個獨立的流,每個流都會每秒發出一個數字。程式碼如下:

Notice that if you subscribe twice, there will be two separate streams, each emitting values every second. It looks something like this:

Two subscriptions
      
      // Subscribe starts the clock, and will emit after 1 second
sequence.subscribe({
  next(num) { console.log('1st subscribe: ' + num); },
  complete() { console.log('1st sequence finished.'); }
});

// After 1/2 second, subscribe again.
setTimeout(() => {
  sequence.subscribe({
    next(num) { console.log('2nd subscribe: ' + num); },
    complete() { console.log('2nd sequence finished.'); }
  });
}, 500);

// Logs:
// (at 1 second): 1st subscribe: 1
// (at 1.5 seconds): 2nd subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2.5 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3.5 seconds): 2nd subscribe: 3
// (at 3.5 seconds): 2nd sequence finished
    

修改這個可觀察物件以支援多播,程式碼如下:

Changing the observable to be multicasting could look something like this:

Create a multicast subscriber
      
      function multicastSequenceSubscriber() {
  const seq = [1, 2, 3];
  // Keep track of each observer (one for every active subscription)
  const observers: Observer<unknown>[] = [];
  // Still a single timeoutId because there will only ever be one
  // set of values being generated, multicasted to each subscriber
  let timeoutId: any;

  // Return the subscriber function (runs when subscribe()
  // function is invoked)
  return (observer: Observer<unknown>) => {
    observers.push(observer);
    // When this is the first subscription, start the sequence
    if (observers.length === 1) {
      timeoutId = doSequence({
        next(val) {
          // Iterate through observers and notify all subscriptions
          observers.forEach(obs => obs.next(val));
        },
        error() { /* Handle the error... */ },
        complete() {
          // Notify all complete callbacks
          observers.slice(0).forEach(obs => obs.complete());
        }
      }, seq, 0);
    }

    return {
      unsubscribe() {
        // Remove from the observers array so it's no longer notified
        observers.splice(observers.indexOf(observer), 1);
        // If there's no more listeners, do cleanup
        if (observers.length === 0) {
          clearTimeout(timeoutId);
        }
      }
    };
  };
}

// Run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(observer: Observer<number>, arr: number[], idx: number) {
  return setTimeout(() => {
    observer.next(arr[idx]);
    if (idx === arr.length - 1) {
      observer.complete();
    } else {
      doSequence(observer, arr, ++idx);
    }
  }, 1000);
}

// Create a new Observable that will deliver the above sequence
const multicastSequence = new Observable(multicastSequenceSubscriber());

// Subscribe starts the clock, and begins to emit after 1 second
multicastSequence.subscribe({
  next(num) { console.log('1st subscribe: ' + num); },
  complete() { console.log('1st sequence finished.'); }
});

// After 1 1/2 seconds, subscribe again (should "miss" the first value).
setTimeout(() => {
  multicastSequence.subscribe({
    next(num) { console.log('2nd subscribe: ' + num); },
    complete() { console.log('2nd sequence finished.'); }
  });
}, 1500);

// Logs:
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished
    

雖然支援多播的可觀察物件需要做更多的準備工作,但對某些應用來說,這非常有用。稍後我們會介紹一些簡化多播的工具,它們讓你能接收任何可觀察物件,並把它變成支援多播的。

Multicasting observables take a bit more setup, but they can be useful for certain applications. Later we will look at tools that simplify the process of multicasting, allowing you to take any observable and make it multicasting.

錯誤處理

Error handling

由於可觀察物件會非同步產生值,所以用 try/catch 是無法捕獲錯誤的。你應該在觀察者中指定一個 error 回呼(Callback)來處理錯誤。發生錯誤時還會導致可觀察物件清理現有的訂閱,並且停止產生值。可觀察物件可以產生值(呼叫 next 回呼(Callback)),也可以呼叫 completeerror 回呼(Callback)來主動結束。

Because observables produce values asynchronously, try/catch will not effectively catch errors. Instead, you handle errors by specifying an error callback on the observer. Producing an error also causes the observable to clean up subscriptions and stop producing values. An observable can either produce values (calling the next callback), or it can complete, calling either the complete or error callback.

      
      myObservable.subscribe({
  next(num) { console.log('Next num: ' + num)},
  error(err) { console.log('Received an error: ' + err)}
});
    

在稍後的小節中會對錯誤處理(特別是從錯誤中的恢復)做更詳細的講解。

Error handling (and specifically recovering from an error) is covered in more detail in a later section.