RxJS - 使用 Subjects



Subject 是一種可以多播(即與多個觀察者通訊)的 Observable。考慮一個帶有事件監聽器的按鈕,使用 addListener 附加到事件的函式在使用者每次點選按鈕時都會被呼叫,Subject 的功能與此類似。

在本章中,我們將討論以下主題:

  • 建立 Subject
  • Observable 和 Subject 之間的區別是什麼?
  • Behaviour Subject
  • Replay Subject
  • AsyncSubject

建立 Subject

要使用 Subject,我們需要匯入 Subject,如下所示:

import { Subject } from 'rxjs';

您可以如下建立 Subject 物件:

const subject_test = new Subject();

該物件是一個 Observer,具有三個方法:

  • next(v)
  • error(e)
  • complete()

訂閱 Subject

您可以在 Subject 上建立多個訂閱,如下所示:

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});

訂閱註冊到 Subject 物件,就像我們之前討論的 addListener 一樣。

向 Subject 傳遞資料

您可以使用 next() 方法向建立的 Subject 傳遞資料。

subject_test.next("A");

資料將傳遞到新增到 Subject 上的所有訂閱。

示例

這是一個 Subject 的工作示例:

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.next("B");

subject_test 物件是透過呼叫 new Subject() 建立的。subject_test 物件引用了 next()、error() 和 complete() 方法。上面示例的輸出如下所示:

輸出

Passing Data

我們可以使用 complete() 方法停止 Subject 的執行,如下所示。

示例

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.complete();
subject_test.next("B");

一旦我們呼叫 complete,稍後呼叫的 next 方法將不會被呼叫。

輸出

Passing Data Method

現在讓我們看看如何呼叫 error() 方法。

示例

下面是一個工作示例:

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.error(new Error("There is an error"));

輸出

Passing Data Error

Observable 和 Subject 之間的區別是什麼?

Observable 將一對一地與訂閱者通訊。每當您訂閱 Observable 時,執行將從頭開始。以使用 ajax 發出的 Http 呼叫為例,以及兩個呼叫 Observable 的訂閱者。您將在瀏覽器網路選項卡中看到兩個 Http 請求。

示例

這是一個相同功能的工作示例:

import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber1 = final_val.subscribe(a => console.log(a));
let subscriber2 = final_val.subscribe(a => console.log(a));

輸出

Observable

Observable Ex

現在,問題在於,我們希望共享相同的資料,但不想為此付出發出兩個 Http 呼叫的代價。我們希望發出一個 Http 呼叫並在訂閱者之間共享資料。

這可以透過 Subjects 實現。它是一個可以多播(即與多個觀察者通訊)的 Observable。它可以在訂閱者之間共享值。

示例

這是一個使用 Subjects 的工作示例:

import { Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(v)
});
subject_test.subscribe({
   next: (v) => console.log(v)
});

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber = final_val.subscribe(subject_test);

輸出

Observable possible

現在您可以看到只有一個 Http 呼叫,並且相同的資料在呼叫的訂閱者之間共享。

Observable subscribers

Behaviour Subject

Behaviour Subject 會在呼叫時為您提供最新值。

您可以如下建立 Behaviour Subject:

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject("Testing Behaviour Subject"); 
// initialized the behaviour subject with value:Testing Behaviour Subject

示例

這是一個使用 Behaviour Subject 的工作示例:

import { BehaviorSubject } from 'rxjs';
const behavior_subject = new BehaviorSubject("Testing Behaviour Subject"); 
// 0 is the initial value

behavior_subject.subscribe({
   next: (v) => console.log(`observerA: ${v}`)
});

behavior_subject.next("Hello");
behavior_subject.subscribe({
   next: (v) => console.log(`observerB: ${v}`)
});
behavior_subject.next("Last call to Behaviour Subject");

輸出

Behaviour Subject

Replay Subject

Replay Subject 類似於 Behaviour Subject,它可以緩衝值並將這些值重播給新的訂閱者。

示例

這是一個 Replay Subject 的工作示例:

import { ReplaySubject } from 'rxjs';
const replay_subject = new ReplaySubject(2); 
// buffer 2 values but new subscribers

replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject A: ${v}`)
});

replay_subject.next(1);
replay_subject.next(2);
replay_subject.next(3);
replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject B: ${v}`)
});

replay_subject.next(5);

Replay Subject 上使用的緩衝區值為 2。因此,最後兩個值將被緩衝並用於呼叫的新訂閱者。

輸出

Replay Subject

AsyncSubject

對於 AsyncSubject,最後一個呼叫的值將傳遞給訂閱者,並且只有在呼叫 complete() 方法後才會執行此操作。

示例

這是一個相同功能的工作示例:

import { AsyncSubject } from 'rxjs';

const async_subject = new AsyncSubject();

async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject A: ${v}`)
});

async_subject.next(1);
async_subject.next(2);
async_subject.complete();
async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject B: ${v}`)
});

在這裡,在呼叫 complete 之前,傳遞給 Subject 的最後一個值為 2,並且該值也傳遞給訂閱者。

輸出

Async Subject
廣告