RxJS - 快速指南
RxJS - 概述
本章介紹了關於 RxJS 的功能、優點和缺點的資訊。在這裡,我們還將學習何時使用 RxJS。
RxJS 的全稱是 Reactive Extension for Javascript。它是一個 JavaScript 庫,使用 Observables 來處理響應式程式設計,處理非同步資料呼叫、回撥和基於事件的程式。RxJS 可以與其他 JavaScript 庫和框架一起使用。它支援 JavaScript,也支援 TypeScript。
什麼是 RxJS?
根據 RxJS 官方網站的定義,它是一個用於使用可觀察序列組合非同步和基於事件的程式的庫。它提供了一個核心型別 Observable、衛星型別(Observer、Schedulers、Subjects)和受 Array#extras(map、filter、reduce、every 等)啟發的運算子,以便將非同步事件作為集合進行處理。
RxJS 的特性
在 RxJS 中,以下概念負責處理非同步任務:
Observable
Observable 是一個函式,它建立一個 Observer 並將其附加到期望獲取值的源,例如,DOM 元素的點選、滑鼠事件或 HTTP 請求等。
Observer
它是一個包含 next()、error() 和 complete() 方法的物件,當與 Observable 互動時,這些方法將被呼叫,例如,按鈕點選、HTTP 請求等。
Subscription
建立 Observable 後,要執行 Observable,我們需要訂閱它。它還可以用於取消執行。
運算子
運算子是一個純函式,它接收 Observable 作為輸入,輸出也是一個 Observable。
Subject
Subject 是一個可以多播(即與多個 Observer 通訊)的 Observable。考慮一個帶有事件監聽器的按鈕,使用 addListener 附加到事件的函式在使用者每次點選按鈕時都會被呼叫,Subject 的功能與此類似。
Schedulers
排程器控制訂閱何時開始和通知。
何時使用 RxJS?
如果您的專案包含大量非同步任務處理,那麼 RxJS 是一個不錯的選擇。它預設情況下與 Angular 專案一起載入。
使用 RxJS 的優點
以下是使用 RxJS 的優點:
RxJS 可以與其他 JavaScript 庫和框架一起使用。它支援 JavaScript,也支援 TypeScript。一些示例包括 Angular、ReactJS、Vuejs、nodejs 等。
在處理非同步任務方面,RxJS 是一個很棒的庫。RxJS 使用 Observables 來處理響應式程式設計,處理非同步資料呼叫、回撥和基於事件的程式。
RxJS 提供了大量運算子,包括數學、轉換、過濾、實用程式、條件、錯誤處理、連線等類別,這使得在使用響應式程式設計時生活變得更加輕鬆。
使用 RxJS 的缺點
以下是使用 RxJS 的缺點:
使用 Observables 除錯程式碼有點困難。
當您開始使用 Observables 時,最終可能會將您的整個程式碼都包裝在 Observables 中。
RxJS - 環境搭建
在本章中,我們將安裝 RxJS。要使用 RxJS,我們需要以下設定:
- NodeJS
- Npm
- RxJS 包安裝
NODEJS 和 NPM 安裝
使用 npm 安裝 RxJS 非常容易。您需要在系統上安裝 nodejs 和 npm。要驗證 NodeJS 和 npm 是否已安裝在您的系統上,請嘗試在命令提示符中執行以下命令。
E:\>node -v && npm -v v10.15.1 6.4.1
如果您獲取了版本號,則表示 nodejs 和 npm 已安裝在您的系統上,並且系統上的當前版本分別為 10 和 6。
如果它沒有列印任何內容,請在您的系統上安裝 nodejs。要安裝 nodejs,請訪問 nodejs 的主頁 https://nodejs.com.tw/en/download/ 並根據您的作業系統安裝軟體包。
nodejs 的下載頁面如下所示:
根據您的作業系統,安裝所需的軟體包。安裝 nodejs 後,npm 也會隨之安裝。要檢查 npm 是否已安裝,請在終端中鍵入 npm –v。它應該顯示 npm 的版本。
RxJS 包安裝
要開始 RxJS 安裝,首先建立一個名為 rxjsproj/ 的資料夾,我們將在其中練習所有 RxJS 示例。
建立 rxjsproj/ 資料夾後,執行命令 npm init,進行專案設定,如下所示
E:\>mkdir rxjsproj E:\>cd rxjsproj E:\rxjsproj>npm init
Npm init 命令在執行期間會詢問一些問題,只需按 Enter 鍵並繼續即可。npm init 執行完成後,它將在 rxjsproj/ 中建立 package.json,如下所示:
rxjsproj/ package.json
現在您可以使用以下命令安裝 rxjs:
npm install ---save-dev rxjs
E:\rxjsproj>npm install --save-dev rxjs npm notice created a lockfile as package-lock.json. You should commit this file. npm WARN rxjsproj@1.0.0 No description npm WARN rxjsproj@1.0.0 No repository field. + rxjs@6.5.3 added 2 packages from 7 contributors and audited 2 packages in 21.89s found 0 vulnerabilities
我們完成了 RxJS 的安裝。現在讓我們嘗試使用 RxJS,為此,在 rxjsproj/ 中建立一個名為 src/ 的資料夾
因此,現在我們的資料夾結構如下所示:
rxjsproj/ node_modules/ src/ package.json
在 src/ 中建立一個名為 testrx.js 的檔案,並編寫以下程式碼:
testrx.js
import { of } from 'rxjs;
import { map } from 'rxjs/operators';
map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`Output is: ${v}`));
當我們使用命令 node testrx.js 在命令提示符中執行上述程式碼時,它將顯示匯入錯誤,因為 nodejs 不知道如何處理匯入。
要使匯入與 nodejs 一起工作,我們需要使用 npm 安裝 ES6 模組包,如下所示:
E:\rxjsproj\src>npm install --save-dev esm npm WARN rxjsproj@1.0.0 No description npm WARN rxjsproj@1.0.0 No repository field. + esm@3.2.25 added 1 package from 1 contributor and audited 3 packages in 9.32s found 0 vulnerabilities
安裝軟體包後,我們現在可以執行 testrx.js 檔案,如下所示:
E:\rxjsproj\src>node -r esm testrx.js Output is: 1 Output is: 4 Output is: 9
現在我們可以看到輸出,它顯示 RxJS 已安裝並可以使用。上述方法將幫助我們在命令列中測試 RxJS。如果您想在瀏覽器中測試 RxJS,我們將需要一些額外的軟體包。
在瀏覽器中測試 RxJS
在 rxjsproj/ 資料夾中安裝以下軟體包:
npm install --save-dev babel-loader @babel/core @babel/preset-env webpack webpack-cli webpack-dev-server
E:\rxjsproj>npm install --save-dev babel-loader
@babel/core @babel/preset-env webpack webpack-cli webpack-dev-server
npm WARN rxjsproj@1.0.0 No description
npm WARN rxjsproj@1.0.0 No repository field.
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: fsevents@1.2.9
(node_modules\fsevents):
npm WARN notsup SKIPPING OPTIONAL DEPENDENCY: Unsupported platform for fsevents@
1.2.9: wanted {"os":"darwin","arch":"any"} (current: {"os":"win32","arch":"x64"})
+ webpack-dev-server@3.8.0
+ babel-loader@8.0.6
+ @babel/preset-env@7.6.0
+ @babel/core@7.6.0
+ webpack-cli@3.3.8
+ webpack@4.39.3
added 675 packages from 373 contributors and audited 10225 packages in 255.567s
found 0 vulnerabilities
要啟動伺服器以執行我們的 Html 檔案,我們將使用 webpack-server。package.json 中的 "publish" 命令將幫助我們啟動並使用 webpack 打包所有 js 檔案。最終用於的打包後的 js 檔案儲存在路徑 /dev 資料夾中。
要使用 webpack,我們需要執行 npm run publish 命令,該命令已新增到 package.json 中,如下所示:
Package.json
{
"name": "rxjsproj",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"publish":"webpack && webpack-dev-server --output-public=/dev/",
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"devDependencies": {
"@babel/core": "^7.6.0",
"@babel/preset-env": "^7.6.0",
"babel-loader": "^8.0.6",
"esm": "^3.2.25",
"rxjs": "^6.5.3",
"webpack": "^4.39.3",
"webpack-cli": "^3.3.8",
"webpack-dev-server": "^3.8.0"
}
}
要使用 webpack,我們必須首先建立一個名為 webpack.config.js 的檔案,其中包含 webpack 工作的配置詳細資訊。
檔案中的詳細資訊如下:
var path = require('path');
module.exports = {
entry: {
app: './src/testrx.js'
},
output: {
path: path.resolve(__dirname, 'dev'),
filename: 'main_bundle.js'
},
mode:'development',
module: {
rules: [
{
test:/\.(js)$/,
include: path.resolve(__dirname, 'src'),
loader: 'babel-loader',
query: {
presets: ['@babel/preset-env']
}
}
]
}
};
檔案的結構如上所示。它以一個提供當前路徑詳細資訊的路徑開頭。
var path = require('path'); //gives the current path
接下來是 module.exports 物件,它具有 entry、output 和 module 屬性。Entry 是起點。在這裡,我們需要提供我們想要編譯的起始 js 檔案。
entry: {
app: './src/testrx.js'
},
path.resolve(_dirname, ‘src/testrx.js’) -- 將在目錄中查詢 src 資料夾,並在該資料夾中查詢 testrx.js。
Output
output: {
path: path.resolve(__dirname, 'dev'),
filename: 'main_bundle.js'
},
輸出是一個包含 path 和 filename 屬性的物件。path 將儲存編譯檔案將儲存在其中的資料夾,filename 將告訴最終檔案在您的 .html 檔案中使用的名稱。
Module
module: {
rules: [
{
test:/\.(js)$/,
include: path.resolve(__dirname, 'src'),
loader: 'babel-loader',
query: {
presets: ['@babel/preset-env']
}
}
]
}
Module 是一個包含 rules 屬性的物件,該屬性具有 test、include、loader、query 屬性。test 將儲存所有以 .js 和 .jsx 結尾的 js 檔案的詳細資訊。它具有將在給定的入口點中查詢 .js 結尾的模式。
Include 指示要用於查詢檔案的資料夾。
Loader 使用 babel-loader 編譯程式碼。
Query 具有 presets 屬性,它是一個數組,其值為 '@babel/preset-env'。它將根據您需要的 ES 環境轉換程式碼。
最終的資料夾結構如下所示:
rxjsproj/
node_modules/
src/
testrx.js
index.html
package.json
webpack.config.js
執行命令
npm run publish 將在其中建立 dev/ 資料夾和 main_bundle.js 檔案。伺服器將啟動,您可以在瀏覽器中測試您的 index.html,如下所示。
開啟瀏覽器並訪問 url: https://:8080/
輸出顯示在控制檯中。
RxJS - 最新更新
在本教程中,我們使用的是 RxJS 版本 6。RxJS 通常用於處理響應式程式設計,並且更常與 Angular、ReactJS 一起使用。Angular 6 預設載入 rxjs6。
與版本 6 相比,RxJS 版本 5 的處理方式有所不同。如果您將 RxJS 5 更新到 6,程式碼將中斷。在本章中,我們將瞭解處理版本更新的不同方法。
如果您正在將 RxJS 更新到 6 並且不想更改程式碼,您也可以這樣做,並且需要安裝以下軟體包。
npm install --save-dev rxjs-compact
此軟體包將負責提供向後相容性,舊程式碼將可以與 RxJS 版本 6 正常工作。如果您想更改與 RxJS 6 相容的程式碼,以下是要進行的更改。
運算子、Observables、主題的軟體包已重新構建,因此匯入方面的主要更改如下所述。
運算子匯入
根據版本 5,對於運算子,應包含以下匯入語句:
import 'rxjs/add/operator/mapTo' import 'rxjs/add/operator/take' import 'rxjs/add/operator/tap' import 'rxjs/add/operator/map'
在 RxJS 版本 6 中,匯入將如下所示:
import {mapTo, take, tap, map} from "rxjs/operators"
建立 Observables 的方法匯入
根據版本 5,在使用 Observables 時,應包含以下匯入方法:
import "rxjs/add/observable/from"; import "rxjs/add/observable/of"; import "rxjs/add/observable/fromEvent"; import "rxjs/add/observable/interval";
在 RxJS 版本 6 中,匯入將如下所示:
import {from, of, fromEvent, interval} from 'rxjs';
Observables 匯入
在 RxJS 版本 5 中,在使用 Observables 時,應包含以下匯入語句:
import { Observable } from 'rxjs/Observable'
在 RxJS 版本 6 中,匯入將如下所示:
import { Observable } from 'rxjs'
Subject 匯入
在 RxJS 版本 5 中,Subject 應包含如下所示:
import { Subject} from 'rxjs/Subject'
在 RxJS 版本 6 中,匯入將如下所示:
import { Subject } from 'rxjs'
如何在 RxJS 6 中使用運算子?
pipe() 方法 可用於建立的 Observable。它從版本 5.5 開始新增到 RxJS 中。現在使用 pipe(),您可以按順序一起處理多個運算子。以下是 RxJS 版本 5 中運算子的使用方式。
示例
import "rxjs/add/observable/from";
import 'rxjs/add/operator/max'
let list1 = [1, 6, 15, 10, 58, 2, 40];
from(list1).max((a,b)=>a-b).subscribe(x => console.log("The Max value is "+x));
從 RxJS 版本 5.5 開始,我們必須使用 pipe() 來執行運算子:
示例
import { from } from 'rxjs';
import { max } from 'rxjs/operators';
from(list1).pipe(max((a,b)=>a-b)).subscribe(x => console.log(
"The Max value is "+x)
);
運算子重新命名
在軟體包重構期間,一些運算子被重新命名,因為它們與 JavaScript 關鍵字衝突或匹配。列表如下所示:
| 運算子 | 重新命名為 |
|---|---|
| do() | tap() |
| catch() | catchError() |
| switch() | switchAll() |
| finally() | finalize() |
| throw() | throwError() |
RxJS - Observables
Observable 是一個函式,它建立一個 Observer 並將其附加到期望獲取值的源,例如,DOM 元素的點選、滑鼠事件或 HTTP 請求等。
觀察者是一個帶有回撥函式的物件,當 Observable 發生互動時,它將被呼叫,即資料來源發生了互動,例如按鈕點選、Http 請求等。
本章我們將討論以下主題:
- 建立 Observable
- 訂閱 Observable
- 執行 Observable
建立 Observable
Observable 可以使用 Observable 建構函式建立,也可以使用 Observable create 方法建立,並將 subscribe 函式作為引數傳遞給它,如下所示:
testrx.js
import { Observable } from 'rxjs';
var observable = new Observable(
function subscribe(subscriber) {
subscriber.next("My First Observable")
}
);
我們建立了一個 Observable,並使用 Observable 內部的 subscriber.next 方法添加了一條訊息“My First Observable”。
我們也可以使用 Observable.create() 方法建立 Observable,如下所示:
testrx.js
import { Observable } from 'rxjs';
var observer = Observable.create(
function subscribe(subscriber) {
subscriber.next("My First Observable")
}
);
訂閱 Observable
您可以如下訂閱 Observable:
testrx.js
import { Observable } from 'rxjs';
var observer = new Observable(
function subscribe(subscriber) {
subscriber.next("My First Observable")
}
);
observer.subscribe(x => console.log(x));
當觀察者訂閱時,它將開始執行 Observable。
這是我們在瀏覽器控制檯中看到的:
執行 Observable
Observable 在被訂閱時執行。觀察者是一個帶有三個方法的物件,這些方法會收到通知:
next() - 此方法將傳送值,例如數字、字串、物件等。
complete() - 此方法不會發送任何值,並指示 Observable 已完成。
error() - 如果有任何錯誤,此方法將傳送錯誤。
讓我們建立包含所有三個通知的 Observable 並執行它。
testrx.js
import { Observable } from 'rxjs';
var observer = new Observable(
function subscribe(subscriber) {
try {
subscriber.next("My First Observable");
subscriber.next("Testing Observable");
subscriber.complete();
} catch(e){
subscriber.error(e);
}
}
);
observer.subscribe(x => console.log(x), (e)=>console.log(e),
()=>console.log("Observable is complete"));
在上面的程式碼中,我們添加了 next、complete 和 error 方法。
try{
subscriber.next("My First Observable");
subscriber.next("Testing Observable");
subscriber.complete();
} catch(e){
subscriber.error(e);
}
要執行 next、complete 和 error,我們必須呼叫 subscribe 方法,如下所示:
observer.subscribe(x => console.log(x), (e)=>console.log(e),
()=>console.log("Observable is complete"));
只有在發生錯誤時才會呼叫 error 方法。
這是在瀏覽器中看到的輸出:
RxJS - 運算子
運算子是 RxJS 的重要組成部分。運算子是一個純函式,它接收 Observable 作為輸入,輸出也是一個 Observable。
使用運算子
運算子是一個純函式,它接收 Observable 作為輸入,輸出也是一個 Observable。
要使用運算子,我們需要 pipe() 方法。
使用 pipe() 的示例
let obs = of(1,2,3); // an observable obs.pipe( operator1(), operator2(), operator3(), operator3(), )
在上面的示例中,我們使用 of() 方法建立了一個 Observable,該方法接收值 1、2 和 3。現在,在這個 Observable 上,您可以使用 pipe() 方法使用任意數量的運算子執行不同的操作,如上所示。運算子的執行將按順序在給定的 Observable 上進行。
下面是一個工作示例:
import { of } from 'rxjs';
import { map, reduce, filter } from 'rxjs/operators';
let test1 = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
let case1 = test1.pipe(
filter(x => x % 2 === 0),
reduce((acc, one) => acc + one, 0)
)
case1.subscribe(x => console.log(x));
Output
30
在上面的示例中,我們使用了 filter 運算子,它過濾偶數,接下來我們使用了 reduce() 運算子,它將新增偶數值並在訂閱時給出結果。
以下是我們將要討論的 Observable 列表。
- 建立
- 數學
- 連線
- 轉換
- 過濾
- 實用程式
- 條件
- 多播
- 錯誤處理
建立運算子
以下是我們將在建立運算子類別中討論的運算子:
| 序號 | 運算子和描述 |
|---|---|
| 1 |
ajax
此運算子將為給定的 URL 發起一個 ajax 請求。 |
| 2 |
from
此運算子將從陣列、類陣列物件、Promise、可迭代物件或類 Observable 物件建立 Observable。 |
| 3 |
fromEvent
此運算子將輸出一個 Observable,該 Observable 將用於發出事件的元素,例如按鈕、點選等。 |
| 4 |
fromEventPattern
此運算子將從用於註冊事件處理程式的輸入函式建立 Observable。 |
| 5 |
interval
此運算子將為給定時間內的每次建立 Observable。 |
| 6 |
of
此運算子將接收傳遞的引數並將其轉換為 Observable。 |
| 7 |
range
此運算子將建立一個 Observable,該 Observable 將根據提供的範圍為您提供一系列數字。 |
| 8 |
throwError
此運算子將建立一個 Observable,該 Observable 將通知錯誤。 |
| 9 |
timer
此運算子將建立一個 Observable,該 Observable 將在超時後發出值,並且每個呼叫後值將持續增加。 |
| 10 |
iif
此運算子將決定訂閱哪個 Observable。 |
數學運算子
以下是我們將在數學運算子類別中討論的運算子:
| 序號 | 運算子和描述 |
|---|---|
| 1 |
Count
count() 運算子接收一個包含值的 Observable,並將其轉換為一個將發出單個值的 Observable。 |
| 2 |
Max
Max 方法將接收一個包含所有值的 Observable,並返回一個包含最大值的 Observable。 |
| 3 |
Min
Min 方法將接收一個包含所有值的 Observable,並返回一個包含最小值的 Observable。 |
| 4 |
Reduce
在 reduce 運算子中,累加器函式用於輸入 Observable,累加器函式將以 Observable 的形式返回累加值,並可選地將種子值傳遞給累加器函式。 reduce() 函式將接收兩個引數,一個累加器函式,第二個是種子值。 |
連線運算子
以下是我們將在連線運算子類別中討論的運算子。
| 序號 | 運算子和描述 |
|---|---|
| 1 |
concat
此運算子將按順序發出作為輸入給出的 Observable,然後繼續下一個。 |
| 2 |
forkJoin
此運算子將接收陣列或字典物件作為輸入,並將等待 Observable 完成,然後返回從給定 Observable 發出的最後一個值。 |
| 3 |
merge
此運算子將接收輸入 Observable,並將發出來自 Observable 的所有值,併發出一個輸出 Observable。 |
| 4 |
race
它將返回一個 Observable,該 Observable 將是第一個源 Observable 的映象副本。 |
轉換運算子
以下是我們將在轉換運算子類別中討論的運算子。
| 序號 | 運算子和描述 |
|---|---|
| 1 |
buffer
buffer 操作在一個 Observable 上,並接收一個 Observable 作為引數。它將開始將其原始 Observable 上發出的值緩衝到陣列中,並在作為引數提供的 Observable 發出時發出這些值。一旦作為引數提供的 Observable 發出,緩衝區將重置並再次開始在原始 Observable 上緩衝,直到輸入 Observable 發出,然後相同的場景重複。 |
| 2 |
bufferCount
在 buffercount() 運算子的情況下,它將收集在其呼叫的 Observable 上的值,並在給定給 buffercount 的緩衝區大小匹配時發出這些值。 |
| 3 |
bufferTime
這類似於 bufferCount,因此在這裡,它將收集在其呼叫的 Observable 上的值,並在完成 bufferTimeSpan 時發出這些值。它接收一個引數,即 bufferTimeSpan。 |
| 4 |
bufferToggle
在 bufferToggle() 的情況下,它接收兩個引數,openings 和 closingSelector。opening 引數是可訂閱的或 Promise 以啟動緩衝區,第二個引數 closingSelector 又是可訂閱的或 Promise,指示關閉緩衝區併發出收集的值。 |
| 5 |
bufferWhen
此運算子將以陣列形式給出值,它接收一個函式作為引數,該函式將決定何時關閉、發出和重置緩衝區。 |
| 6 |
expand
expand 運算子接收一個函式作為引數,該函式遞迴地應用於源 Observable 以及輸出 Observable。最終值為一個 Observable。 |
| 7 |
groupBy
在 groupBy 運算子中,輸出根據特定條件進行分組,這些組專案作為 GroupedObservable 發出。 |
| 8 |
map
在 map 運算子的情況下,投影函式應用於源 Observable 上的每個值,並將相同的輸出作為 Observable 發出。 |
| 9 |
mapTo
每次源 Observable 發出值時,都會輸出一個常數值以及 Observable。 |
| 10 |
mergeMap
在 mergeMap 運算子的情況下,投影函式應用於每個源值,其輸出與輸出 Observable 合併。 |
| 11 |
switchMap
在 switchMap 運算子的情況下,投影函式應用於每個源值,其輸出與輸出 Observable 合併,並且給定的值為最新的投影 Observable。 |
| 12 |
window
它接收一個引數 windowboundaries,它是一個 Observable,並在給定的 windowboundaries 發出時返回一個巢狀的 Observable。 |
過濾運算子
以下是我們將在過濾運算子類別中討論的運算子。
| 序號 | 運算子和描述 |
|---|---|
| 1 |
debounce
源 Observable 在一段時間後發出的值,並且發射由作為 Observable 或 Promise 給出的另一個輸入確定。 |
| 2 |
debounceTime
它將僅在時間完成後從源 Observable 發出值。 |
| 3 |
distinct
此運算子將給出來自源 Observable 的所有與前一個值相比不同的值。 |
| 4 |
elementAt
此運算子將根據給定的索引從源 Observable 給出一個值。 |
| 5 |
filter
此運算子將根據給定的謂詞函式過濾來自源 Observable 的值。 |
| 6 |
first
此運算子將給出源 Observable 發出的第一個值。 |
| 7 |
last
此運算子將給出源 Observable 發出的最後一個值。 |
| 8 |
ignoreElements
此運算子將忽略來自源 Observable 的所有值,並且僅執行對 complete 或 error 回撥函式的呼叫。 |
| 9 |
sample
此運算子將給出來自源 Observable 的最新值,並且輸出將取決於傳遞給它的引數的發射。 |
| 10 |
skip
此運算子將返回一個 Observable,該 Observable 將跳過作為輸入接收的前 count 個專案的第一個出現。 |
| 11 |
throttle
此運算子將輸出以及忽略來自源 Observable 的值,持續時間由作為引數接收的輸入函式確定,並且將重複相同的過程。 |
實用程式運算子
以下是我們將在實用程式運算子類別中討論的運算子。
| 序號 | 運算子和描述 |
|---|---|
| 1 |
tap
此運算子將輸出與源 Observable 相同,並且可用於從 Observable 向用戶記錄值。主要值、任何錯誤或任務是否完成。 |
| 2 |
delay
此運算子根據給定的超時延遲源 Observable 發出的值。 |
| 3 |
delayWhen
此運算子根據來自作為輸入接收的另一個 Observable 的超時延遲源 Observable 發出的值。 |
| 4 |
observeOn
此運算子基於輸入排程程式將重新發出來自源 Observable 的通知。 |
| 5 |
subscribeOn
此運算子有助於根據作為輸入接收的排程程式非同步訂閱源 Observable。 |
| 6 |
timeInterval
此運算子將返回一個物件,該物件包含當前值以及使用接收的排程程式輸入計算的當前值和前一個值之間經過的時間。 |
| 7 |
timestamp
返回與來自源 Observable 的值一起的時間戳,該時間戳說明了值發出時間。 |
| 8 |
timeout
如果源 Observable 在給定的超時後未發出值,則此運算子將丟擲錯誤。 |
| 9 |
toArray
累積來自 Observable 的所有源值,並在源完成時將其作為陣列輸出。 |
條件運算子
下面我們將討論條件運算子類別中的運算子。
| 序號 | 運算子和描述 |
|---|---|
| 1 |
defaultIfEmpty
如果源 Observable 為空,此運算子將返回一個預設值。 |
| 2 |
every
它將根據輸入函式是否滿足源 Observable 上每個值的條件返回一個 Observable。 |
| 3 |
find
當源 Observable 的第一個值滿足作為輸入的謂詞函式的條件時,這將返回該 Observable。 |
| 4 |
findIndex
此運算子基於輸入排程程式將重新發出來自源 Observable 的通知。 |
| 5 |
isEmpty
如果輸入 Observable 在不發出任何值的情況下完成回撥,則此運算子將輸出 true;如果輸入 Observable 發出任何值,則輸出 false。 |
多播運算子
下面我們將討論多播運算子類別中的運算子。
| 序號 | 運算子和描述 |
|---|---|
| 1 |
multicast
multicast 運算子共享與其他訂閱者建立的單個訂閱。multicast 接收的引數是一個主題或一個返回 ConnectableObservable 的工廠方法,該方法具有 connect() 方法。要訂閱,必須呼叫 connect() 方法。 |
| 2 |
publish
此運算子返回 ConnectableObservable,需要使用 connect() 方法訂閱 Observable。 |
| 3 |
publishBehavior
publishBehavior 使用 BehaviourSubject,並返回 ConnectableObservable。必須使用 connect() 方法訂閱建立的 Observable。 |
| 4 |
publishLast
publishBehaviour 使用 AsyncSubject,並返回 ConnectableObservable。必須使用 connect() 方法訂閱建立的 Observable。 |
| 5 |
publishReplay
publishReplay 使用行為主題,其中它可以緩衝值並將相同的值重播給新的訂閱者,並返回 ConnectableObservable。必須使用 connect() 方法訂閱建立的 Observable。 |
| 6 |
share
它是 mutlicast() 運算子的別名,唯一的區別是您不必手動呼叫 connect() 方法來啟動訂閱。 |
錯誤處理運算子
下面我們將討論錯誤處理運算子類別中的運算子。
| 序號 | 運算子和描述 |
|---|---|
| 1 |
catchError
此運算子負責透過返回新的 Observable 或錯誤來捕獲源 Observable 上的錯誤。 |
| 2 |
retry
如果源 Observable 發生錯誤,此運算子將負責重新嘗試源 Observable,並且重試將根據給定的輸入計數進行。 |
RxJS - 使用訂閱
建立 Observable 後,要執行 Observable,我們需要訂閱它。
count() 運算子
這是一個關於如何訂閱 Observable 的簡單示例。
示例 1
import { of } from 'rxjs';
import { count } from 'rxjs/operators';
let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
final_val.subscribe(x => console.log("The count is "+x));
Output
The count is 6
訂閱有一個名為 unsubscribe() 的方法。呼叫 unsubscribe() 方法將刪除用於該 Observable 的所有資源,即 Observable 將被取消。這是一個使用 unsubscribe() 方法的工作示例。
示例 2
import { of } from 'rxjs';
import { count } from 'rxjs/operators';
let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
let test = final_val.subscribe(x => console.log("The count is "+x));
test.unsubscribe();
訂閱儲存在變數 test 中。我們使用了 test.unsubscribe() 取消訂閱 Observable。
Output
The count is 6
RxJS - 使用主題
主題是一個可以多播(即與許多觀察者通訊)的 Observable。考慮一個帶有事件偵聽器的按鈕,使用 addListener 附加到事件的函式在使用者每次單擊按鈕時都會被呼叫,主題的功能也類似。
我們將在本章中討論以下主題:
- 建立主題
- Observable 和 Subject 之間有什麼區別?
- Behaviour Subject
- Replay Subject
- AsyncSubject
建立主題
要使用主題,我們需要匯入 Subject,如下所示:
import { Subject } from 'rxjs';
您可以如下建立主題物件:
const subject_test = new Subject();
該物件是一個觀察者,它具有三個方法:
- next(v)
- error(e)
- complete()
訂閱主題
您可以像下面這樣在主題上建立多個訂閱:
subject_test.subscribe({
next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
next: (v) => console.log(`From Subject: ${v}`)
});
訂閱註冊到主題物件,就像我們之前討論過的 addListener 一樣。
將資料傳遞到主題
您可以使用 next() 方法將資料傳遞到建立的主題。
subject_test.next("A");
資料將傳遞到主題上新增的所有訂閱。
示例
這是一個主題的工作示例:
import { Subject } from 'rxjs';
const subject_test = new Subject();
subject_test.subscribe({
next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.next("B");
主題_test 物件是透過呼叫 new Subject() 建立的。主題_test 物件引用 next()、error() 和 complete() 方法。上面示例的輸出如下所示:
Output
我們可以使用 complete() 方法停止主題執行,如下所示。
示例
import { Subject } from 'rxjs';
const subject_test = new Subject();
subject_test.subscribe({
next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.complete();
subject_test.next("B");
一旦我們呼叫 complete,稍後呼叫的 next 方法將不會被呼叫。
Output
現在讓我們看看如何呼叫 error() 方法。
示例
下面是一個工作示例:
import { Subject } from 'rxjs';
const subject_test = new Subject();
subject_test.subscribe({
error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.subscribe({
error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.error(new Error("There is an error"));
Output
Observable 和 Subject 之間有什麼區別?
一個 Observable 將一對一地與訂閱者通訊。任何時候您訂閱 Observable,執行都將從頭開始。以使用 ajax 發出的 Http 呼叫為例,以及兩個呼叫 Observable 的訂閱者。您將在瀏覽器網路選項卡中看到 2 個 Http 請求。
示例
這是一個相同的工作示例:
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber1 = final_val.subscribe(a => console.log(a));
let subscriber2 = final_val.subscribe(a => console.log(a));
Output
現在,這裡的問題是,我們希望共享相同的資料,但不是以 2 個 Http 呼叫的代價。我們希望發出一個 Http 呼叫並在訂閱者之間共享資料。
這可以透過主題來實現。它是一個可以多播(即與許多觀察者通訊)的 Observable。它可以在訂閱者之間共享值。
示例
這是一個使用主題的工作示例:
import { Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
const subject_test = new Subject();
subject_test.subscribe({
next: (v) => console.log(v)
});
subject_test.subscribe({
next: (v) => console.log(v)
});
let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber = final_val.subscribe(subject_test);
Output
現在您可以看到只有一個 Http 呼叫,並且相同的資料在呼叫的訂閱者之間共享。
Behaviour Subject
Behaviour Subject 在被呼叫時將為您提供最新值。
您可以如下建立行為主題:
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject("Testing Behaviour Subject");
// initialized the behaviour subject with value:Testing Behaviour Subject
示例
這是一個使用 Behaviour Subject 的工作示例:
import { BehaviorSubject } from 'rxjs';
const behavior_subject = new BehaviorSubject("Testing Behaviour Subject");
// 0 is the initial value
behavior_subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
behavior_subject.next("Hello");
behavior_subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
behavior_subject.next("Last call to Behaviour Subject");
Output
Replay Subject
ReplaySubject 類似於 Behaviour Subject,它可以緩衝值並將相同的值重播給新的訂閱者。
示例
這是一個 Replay Subject 的工作示例:
import { ReplaySubject } from 'rxjs';
const replay_subject = new ReplaySubject(2);
// buffer 2 values but new subscribers
replay_subject.subscribe({
next: (v) => console.log(`Testing Replay Subject A: ${v}`)
});
replay_subject.next(1);
replay_subject.next(2);
replay_subject.next(3);
replay_subject.subscribe({
next: (v) => console.log(`Testing Replay Subject B: ${v}`)
});
replay_subject.next(5);
Replay Subject 上使用的緩衝區值為 2。因此,最後兩個值將被緩衝並用於呼叫的新訂閱者。
Output
AsyncSubject
在 AsyncSubject 的情況下,最後一個呼叫的值將傳遞給訂閱者,並且只有在呼叫 complete() 方法後才會完成。
示例
這是一個相同的工作示例:
import { AsyncSubject } from 'rxjs';
const async_subject = new AsyncSubject();
async_subject.subscribe({
next: (v) => console.log(`Testing Async Subject A: ${v}`)
});
async_subject.next(1);
async_subject.next(2);
async_subject.complete();
async_subject.subscribe({
next: (v) => console.log(`Testing Async Subject B: ${v}`)
});
這裡,在呼叫 complete 之前,傳遞給主題的最後一個值為 2,並且將其提供給訂閱者。
Output
RxJS - 使用排程器
排程器控制訂閱何時開始和通知。
要使用排程程式,我們需要以下內容:
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
這是一個工作示例,其中我們將使用排程程式來決定執行。
示例
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
var observable = new Observable(function subscribe(subscriber) {
subscriber.next("My First Observable");
subscriber.next("Testing Observable");
subscriber.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log("Observable Created");
observable.subscribe(
x => console.log(x),
(e)=>console.log(e),
()=>console.log("Observable is complete")
);
console.log('Observable Subscribed');
Output
如果沒有排程程式,輸出將如下所示:
使用 RxJS 和 Angular
在本章中,我們將瞭解如何在 Angular 中使用 RxJs。我們這裡不會介紹 Angular 的安裝過程,要了解 Angular 安裝,請參考此連結:https://tutorialspoint.tw/angular7/angular7_environment_setup.htm
我們將直接在一個示例上進行操作,其中將使用來自 RxJS 的 Ajax 載入資料。
示例
app.component.ts
import { Component } from '@angular/core';
import { environment } from './../environments/environment';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators'
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent {
title = '';
data;
constructor() {
this.data = "";
this.title = "Using RxJs with Angular";
let a = this.getData();
}
getData() {
const response =
ajax('https://jsonplaceholder.typicode.com/users')
.pipe(map(e => e.response));
response.subscribe(res => {
console.log(res);
this.data = res;
});
}
}
app.component.html
<div>
<h3>{{title}}</h3>
<ul *ngFor="let i of data">
<li>{{i.id}}: {{i.name}}</li>
</ul>
</div>
<router-outlet></router-outlet>
我們使用了來自 RxJS 的 ajax,它將從以下 url 載入資料: https://jsonplaceholder.typicode.com/users。
編譯後,顯示如下:
使用 RxJS 和 ReactJS
在本章中,我們將瞭解如何在 ReactJS 中使用 RxJs。我們這裡不會介紹 Reactjs 的安裝過程,要了解 ReactJS 安裝,請參考此連結:https://tutorialspoint.tw/reactjs/reactjs_environment_setup.htm
示例
我們將直接在下面的示例上進行操作,其中將使用來自 RxJS 的 Ajax 載入資料。
index.js
import React, { Component } from "react";
import ReactDOM from "react-dom";
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
class App extends Component {
constructor() {
super();
this.state = { data: [] };
}
componentDidMount() {
const response = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
response.subscribe(res => {
this.setState({ data: res });
});
}
render() {
return (
<div>
<h3>Using RxJS with ReactJS</h3>
<ul>
{this.state.data.map(el => (
<li>
{el.id}: {el.name}
</li>
))}
</ul>
</div>
);
}
}
ReactDOM.render(<App />, document.getElementById("root"));
index.html
<!DOCTYPE html>
<html>
<head>
<meta charset = "UTF-8" />
<title>ReactJS Demo</title>
<head>
<body>
<div id = "root"></div>
</body>
</html>
我們使用了來自 RxJS 的 ajax,它將從以下 Url 載入資料: https://jsonplaceholder.typicode.com/users。
編譯後,顯示如下: