RxPY 快速指南



RxPY - 概述

本章解釋什麼是響應式程式設計,什麼是 RxPY,它的運算子、特性、優點和缺點。

什麼是響應式程式設計?

響應式程式設計是一種程式設計正規化,它處理資料流和變化的傳播。這意味著,當一個元件發出資料流時,變化將透過響應式程式設計庫傳播到其他元件。變化的傳播將持續到到達最終接收者。

使用 RxPY,您可以很好地控制非同步資料流,例如,可以使用 Observable 追蹤對 URL 的請求,並使用 Observer 監聽請求何時完成以獲取響應或錯誤。

RxPY 允許您使用**Observables** 處理非同步資料流,使用**Operators**(例如 filter、sum、concat、map)查詢資料流,並使用**Schedulers**利用資料流的併發性。建立一個 Observable 會得到一個帶有 on_next(v)、on_error(e) 和 on_completed() 方法的 Observer 物件,需要對其進行**訂閱**才能在事件發生時收到通知。

Observable

可以使用管道運算子以鏈式格式使用多個運算子查詢 Observable。

RxPY 提供各種類別的運算子:−

  • 數學運算子

  • 轉換運算子

  • 過濾運算子

  • 錯誤處理運算子

  • 實用程式運算子

  • 條件運算子

  • 建立運算子

  • 可連線運算子

本教程將詳細解釋這些運算子。

什麼是 RxPy?

根據 RxPy 的官方網站 https://rxpy.readthedocs.io/en/latest/. 的定義,RxPY 被定義為**一個用於使用可觀察集合和可管道查詢運算子在 Python 中組合非同步和基於事件的程式的庫**。

RxPY 是一個支援響應式程式設計的 Python 庫。RxPY 代表**Python 的響應式擴充套件**。它是一個使用 Observable 來處理響應式程式設計的庫,該庫處理非同步資料呼叫、回撥和基於事件的程式。

RxPy 的特性

在 RxPy 中,以下概念負責處理非同步任務 −

Observable

Observable 是一個建立 Observer 並將其附加到資料來源的函式,該資料來源包含預期的的資料流,例如推文、計算機相關事件等。

Observer

它是一個具有 on_next()、on_error() 和 on_completed() 方法的物件,當與 Observable 互動時(例如,傳入推文等),這些方法將被呼叫。

Subscription (訂閱)

建立 Observable 後,需要訂閱它才能執行。

Operators (運算子)

運算子是一個純函式,它以 Observable 作為輸入,輸出也是一個 Observable。您可以使用管道運算子在 Observable 資料上使用多個運算子。

Subject

Subject 既是一個 Observable 序列,也是一個 Observer,可以進行多播,即與許多已訂閱的 Observer 通訊。Subject 是一個冷 Observable,即值將在已訂閱的 Observer 之間共享。

Schedulers (排程器)

RxPy 的一個重要特性是併發性,即允許任務並行執行。為此,RxPy 有兩個運算子 subscribe_on() 和 observe_on() 與排程器一起工作,並決定訂閱任務的執行。

使用 RxPY 的優點

以下是 RxPy 的優點 −

  • 在處理非同步資料流和事件方面,RxPY 是一個很棒的庫。RxPY 使用 Observable 來處理響應式程式設計,該程式設計處理非同步資料呼叫、回撥和基於事件的程式。

  • RxPy 提供了大量的運算子,包括數學、轉換、過濾、實用程式、條件、錯誤處理、連線類別,這使得在使用響應式程式設計時更容易。

  • 在 RxPY 中使用排程器可以實現併發,即多個任務一起工作。

  • 使用 RxPY 可以提高效能,因為非同步任務和並行處理更容易實現。

使用 RxPY 的缺點

  • 使用 Observable 除錯程式碼有點困難。

RxPY - 環境搭建

本章將介紹 RxPy 的安裝。要開始使用 RxPy,首先需要安裝 Python。因此,我們將進行以下操作 −

  • 安裝 Python
  • 安裝 RxPy

安裝 Python

訪問 Python 官方網站:https://python.club.tw/downloads/. 如下所示,點選適用於 Windows、Linux/Unix 和 macOS 的最新版本。根據您的 64 位或 32 位作業系統下載 Python。

Python

下載完成後,點選**.exe 檔案**並按照步驟在您的系統上安裝 python。

Python Install

python 包管理器 pip 也將隨上述安裝預設安裝。為了使其在您的系統上全域性工作,直接將 python 的位置新增到 PATH 變數中,安裝開始時也會顯示相同的內容,請記住選中“新增到 PATH”複選框。如果您忘記選中它,請按照以下步驟新增到 PATH。

要新增到 PATH,請按照以下步驟操作 −

右鍵單擊您的計算機圖示,然後單擊屬性→高階系統設定。

將顯示如下螢幕 −

System Properties

單擊上面顯示的環境變數。將顯示如下螢幕 −

Environment Variable

選擇 Path 並單擊“編輯”按鈕,在末尾新增 python 的位置路徑。現在,讓我們檢查 python 版本。

檢查 python 版本

E:\pyrx>python --version
Python 3.7.3

安裝 RxPY

現在,我們已經安裝了 python,我們將安裝 RxPy。

安裝 python 後,python 包管理器 pip 也將安裝。以下是檢查 pip 版本的命令 −

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

我們已經安裝了 pip,版本為**19.1.1**。現在,我們將使用 pip 安裝 RxPy

命令如下 −

pip install rx
Pip Install Rx

RxPY - 最新版本更新

在本教程中,我們使用的是 RxPY 版本 3 和 python 版本 3.7.3。RxPY 版本 3 的工作方式與早期版本(即 RxPY 版本 1)略有不同。

本章將討論這兩個版本之間的區別以及在更新 Python 和 RxPY 版本時需要進行的更改。

RxPY 中的 Observable

在 RxPY 版本 1 中,Observable 是一個單獨的類 −

from rx import Observable

要使用 Observable,您必須按如下方式使用它 −

Observable.of(1,2,3,4,5,6,7,8,9,10)

在 RxPY 版本 3 中,Observable 直接是 rx 包的一部分。

示例

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

RxPY 中的運算子

在版本 1 中,運算子是 Observable 類中的方法。例如,要使用運算子,我們必須匯入 Observable,如下所示 −

from rx import Observable

運算子用作 Observable.operator,例如,如下所示 −

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

對於 RxPY 版本 3,運算子是函式,匯入和使用方式如下 −

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

使用 pipe() 方法連結運算子

在 RxPY 版本 1 中,如果您必須在 Observable 上使用多個運算子,則必須按如下方式進行 −

示例

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

但是,對於 RxPY 版本 3,您可以使用 pipe() 方法和多個運算子,如下所示 −

示例

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

RxPY - 使用 Observables

Observable 是一個建立 Observer 並將其附加到預期值的源的函式,例如,來自 DOM 元素的點選、滑鼠事件等。

本章將詳細介紹以下主題。

  • 建立 Observables

  • 訂閱和執行 Observable

建立 Observables

要建立一個 Observable,我們將使用**create()** 方法並將函式傳遞給它,該函式包含以下專案。

  • **on_next()** − 當 Observable 發出專案時,此函式將被呼叫。

  • **on_completed()** − 當 Observable 完成時,此函式將被呼叫。

  • **on_error()** − 當 Observable 發生錯誤時,此函式將被呼叫。

要使用 create() 方法,首先匯入該方法,如下所示 −

from rx import create

這是一個建立 Observable 的工作示例 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

訂閱和執行 Observable

要訂閱 Observable,我們需要使用 subscribe() 函式並將回撥函式 on_next、on_error 和 on_completed 傳遞給它。

這是一個工作示例 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe() 方法負責執行 Observable。回撥函式**on_next**、**on_error** 和**on_completed** 必須傳遞給 subscribe 方法。對 subscribe 方法的呼叫反過來又會執行 test_observable() 函式。

並非必須將所有三個回撥函式都傳遞給 subscribe() 方法。您可以根據您的需求傳遞 on_next()、on_error() 和 on_completed()。

lambda 函式用於 on_next、on_error 和 on_completed。它將接收引數並執行給定的表示式。

以下是建立的 Observable 的輸出 −

E:\pyrx>python testrx.py
Got - Hello
Job Done!

RxPY - 運算子

本章詳細解釋了 RxPY 中的運算子。這些運算子包括 −

  • 使用運算子
  • 數學運算子
  • 轉換運算子
  • 過濾運算子
  • 錯誤處理運算子
  • 實用程式運算子
  • 條件運算子
  • 建立運算子
  • 可連線運算子
  • 組合運算子

Reactive (Rx) python 幾乎有很多運算子,這使得 python 編碼更容易。您可以將這些多個運算子一起使用,例如,在處理字串時,您可以使用 map、filter、merge 運算子。

使用運算子

您可以使用 pipe() 方法一起使用多個運算子。此方法允許將多個運算子連結在一起。

這是一個使用運算子的工作示例 −

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

在上例中,我們使用 `of()` 方法建立了一個 Observable,該方法接收值 1、2 和 3。現在,在這個 Observable 上,您可以使用任意數量的運算子執行不同的操作,如上所示使用 `pipe()` 方法。運算子的執行將在給定的 Observable 上順序進行。

要使用運算子,首先需要匯入它,如下所示:

from rx import of, operators as op

這是一個可執行的例子:

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

在上例中,有一個數字列表,我們使用 `filter` 運算子過濾偶數,然後使用 `reduce` 運算子對其進行求和。

輸出

E:\pyrx>python testrx.py
Sum of Even numbers is 30

以下是一些我們將要討論的運算子:

  • 建立 Observable
  • 數學運算子
  • 轉換運算子
  • 過濾運算子
  • 錯誤處理運算子
  • 實用程式運算子
  • 條件運算子
  • 可連線運算子
  • 組合運算子

建立 Observable

以下是我們將在建立類別中討論的 Observable:

顯示示例

Observable 描述
create 此方法用於建立 Observable。
empty 此 Observable 不會輸出任何內容,並直接發出完成狀態。
never 此方法建立一個永遠不會到達完成狀態的 Observable。
throw 此方法將建立一個丟擲錯誤的 Observable。
from_ 此方法將給定的陣列或物件轉換為 Observable。
interval 此方法將在超時後產生一系列值。
just 此方法將給定值轉換為 Observable。
range 此方法將根據給定的輸入生成一系列整數。
repeat_value 此方法將建立一個 Observable,根據給定的計數重複給定的值。
start 此方法接收一個函式作為輸入,並返回一個 Observable,該 Observable 將返回輸入函式的值。
timer 此方法將在超時後順序發出值。

數學運算子

我們將在數學運算子類別中討論的運算子如下:

顯示示例

運算子 描述
average 此運算子將計算給定源 Observable 的平均值,並輸出一個包含平均值的 Observable。
concat 此運算子將接收兩個或多個 Observable,並生成一個包含所有值的單個 Observable。
count

此運算子接收一個包含值的 Observable,並將其轉換為一個包含單個值的 Observable。count 函式可選地接收一個謂詞函式。

該函式的型別為布林值,只有在滿足條件時才會將值新增到輸出中。

max 此運算子將返回一個包含源 Observable 中最大值的 Observable。
min 此運算子將返回一個包含源 Observable 中最小值的 Observable。
reduce 此運算子接收一個名為累加器函式的函式,該函式用於處理來自源 Observable 的值,並以 Observable 的形式返回累加值,並可以選擇向累加器函式傳遞種子值。
sum 此運算子將返回一個包含所有源 Observable 值之和的 Observable。

轉換運算子

我們將要討論的轉換運算子類別中的運算子如下:

顯示示例

運算子 類別
buffer 此運算子將收集來自源 Observable 的所有值,並在滿足給定的邊界條件後定期發出它們。
group_by 此運算子將根據給定的 key_mapper 函式對來自源 Observable 的值進行分組。
map 此運算子將根據給定的 mapper_func 的輸出,將源 Observable 中的每個值更改為一個新值。
scan 此運算子將對來自源 Observable 的值應用累加器函式,並返回一個包含新值的新 Observable。

過濾運算子

我們將要討論的過濾運算子類別中的運算子如下:

顯示示例

運算子 類別
debounce 此運算子將發出來自源 Observable 的值,直到給定的時間跨度,並忽略其餘時間段。
distinct 此運算子將發出所有與源 Observable 中不同的值。
element_at 此運算子將根據給定的索引發出源 Observable 中的一個元素。
filter 此運算子將根據給定的謂詞函式過濾源 Observable 中的值。
first 此運算子將發出源 Observable 中的第一個元素。
ignore_elements 此運算子將忽略源 Observable 中的所有值,只執行對 complete 或 error 回撥函式的呼叫。
last 此運算子將發出源 Observable 中的最後一個元素。
skip 此運算子將返回一個 Observable,該 Observable 將跳過作為輸入的 count 個專案的第一次出現。
skip_last 此運算子將返回一個 Observable,該 Observable 將跳過作為輸入的 count 個專案的最後一次出現。
take 此運算子將根據給定的計數返回連續順序的源值列表。
take_last 此運算子將根據給定的計數從最後返回連續順序的源值列表。

錯誤處理運算子

我們將要討論的錯誤處理運算子類別中的運算子如下:

顯示示例

運算子 描述
catch 當出現異常時,此運算子將終止源 Observable。
retry 當出現錯誤時,此運算子將重試源 Observable,並在重試次數完成後終止。

實用程式運算子

以下是我們將在實用程式運算子類別中討論的運算子。

顯示示例

運算子 描述
delay 此運算子將根據給定的時間或日期延遲源 Observable 的發射。
materialize 此運算子將源 Observable 中的值轉換為以顯式通知值的形式發出的值。
time_interval 此運算子將給出源 Observable 中的值之間經過的時間。
timeout 此運算子將在經過時間後發出源 Observable 中的所有值,否則將觸發錯誤。
timestamp 此運算子將時間戳附加到源 Observable 中的所有值。

條件和布林運算子

我們將要討論的條件和布林運算子類別中的運算子如下:

顯示示例

運算子 描述
all 此運算子將檢查源 Observable 中的所有值是否都滿足給定的條件。
contains 如果給定值存在且是源 Observable 的值,則此運算子將返回一個值為 true 或 false 的 Observable。
default_if_empty 如果源 Observable 為空,則此運算子將返回一個預設值。
sequence_equal 此運算子將比較兩個 Observable 序列或一個值陣列,並返回一個值為 true 或 false 的 Observable。
skip_until 此運算子將丟棄源 Observable 中的值,直到第二個 Observable 發出一個值。
skip_while 此運算子將返回一個包含滿足所傳遞條件的源 Observable 中值的 Observable。
take_until 此運算子將在第二個 Observable 發出一個值或終止後丟棄源 Observable 中的值。
take_while 此運算子將在條件失敗時丟棄源 Observable 中的值。

可連線運算子

我們將要討論的可連線運算子類別中的運算子如下:

顯示示例

運算子 描述
publish 此方法將 Observable 轉換為可連線的 Observable。
ref_count 此運算子將 Observable 轉換為普通的 Observable。
replay 此方法類似於 replaySubject。即使 Observable 已經發出值,並且一些訂閱者訂閱較晚,此方法也會返回相同的值。

組合運算子

以下是我們將在組合運算子類別中討論的運算子。

顯示示例

運算子 描述
combine_latest 此運算子將為給定的輸入 Observable 建立一個元組。
merge 此運算子將合併給定的 Observable。
start_with 此運算子將接收給定的值,並將其新增到源 Observable 的開頭,然後返回完整的序列。
zip 此運算子返回一個包含元組形式值的 Observable,該元組是透過獲取給定 Observable 的第一個值等等形成的。

RxPY - 使用 Subject

Subject 是一個 Observable 序列,也是一個可以多播(即與許多已訂閱的觀察者對話)的觀察者。

我們將討論以下關於 Subject 的主題:

  • 建立 Subject
  • 訂閱 Subject
  • 向 Subject 傳遞資料
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

建立 Subject

要使用 Subject,我們需要匯入 Subject,如下所示:

from rx.subject import Subject

您可以如下建立 Subject 物件:

subject_test = Subject()

該物件是一個觀察者,具有三種方法:

  • on_next(value)
  • on_error(error) 和
  • on_completed()

訂閱 Subject

您可以在 Subject 上建立多個訂閱,如下所示:

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

向 Subject 傳遞資料

您可以使用 on_next(value) 方法向建立的 Subject 傳遞資料,如下所示:

subject_test.on_next("A")
subject_test.on_next("B")

資料將傳遞給新增到 Subject 的所有訂閱。

這是一個 Subject 的可執行示例。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

subject_test 物件是透過呼叫 Subject() 建立的。subject_test 物件引用 on_next(value)、on_error(error) 和 on_completed() 方法。上例的輸出如下所示:

輸出

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

我們可以使用 on_completed() 方法停止 Subject 的執行,如下所示。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

一旦我們呼叫 complete,稍後呼叫的 next 方法將不會被呼叫。

輸出

E:\pyrx>python testrx.py
The value is A
The value is A

現在讓我們看看如何呼叫 on_error(error) 方法。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

輸出

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject 在呼叫時會提供最新的值。您可以如下建立 BehaviorSubject:

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

這是一個使用 BehaviorSubject 的可執行示例

示例

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

輸出

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Replay Subject

ReplaySubject 類似於 BehaviorSubject,它可以緩衝值並將它們重播給新的訂閱者。這是一個 ReplaySubject 的可執行示例。

示例

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

ReplaySubject 上使用的緩衝區值為 2。因此,最後兩個值將被緩衝並用於呼叫的新訂閱者。

輸出

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

在 AsyncSubject 的情況下,最後一個呼叫的值將傳遞給訂閱者,並且只有在呼叫 complete() 方法後才會完成。

示例

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

輸出

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

RxPY - 使用 Scheduler 實現併發

RxPy的一個重要特性是併發性,即允許任務並行執行。為此,我們有兩個運算子`subscribe_on()`和`observe_on()`,它們將與排程器一起工作,排程器將決定訂閱任務的執行。

這是一個有效的例子,它展示了`subscribe_on()`、`observe_on()`和排程器的必要性。

示例

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

在上面的例子中,我有兩個任務:任務1和任務2。任務的執行是順序的。只有當第一個任務完成後,第二個任務才會開始。

輸出

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy支援許多排程器,這裡我們將使用`ThreadPoolScheduler`。`ThreadPoolScheduler`主要嘗試管理可用的CPU執行緒。

在前面看到的例子中,我們將使用一個多處理模組,它將給我們提供`cpu_count`。這個計數將被提供給`ThreadPoolScheduler`,它將設法根據可用的執行緒來管理並行執行的任務。

這是一個可執行的例子:

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

在上面的例子中,我有兩個任務,`cpu_count`是4。由於任務是2個,而我們可用的執行緒是4個,所以這兩個任務可以並行啟動。

輸出

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

如果你看到輸出,這兩個任務已經並行啟動了。

現在,考慮一種情況,任務數量超過CPU數量,例如CPU數量是4,任務是5個。在這種情況下,我們需要檢查是否有任何執行緒在任務完成後空閒,以便可以將其分配給佇列中可用的新任務。

為此,我們可以使用`observe_on()`運算子,它將觀察排程器是否有任何執行緒空閒。這是一個使用`observe_on()`的有效例子。

示例

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

輸出

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

如果你看到輸出,任務4完成後,執行緒將被分配給下一個任務,即任務5,並且它開始執行。

RxPY - 例子

本章將詳細討論以下主題:

  • 展示observable、運算子和訂閱觀察者的基本示例。
  • observable和subject的區別。
  • 理解冷observable和熱observable。

下面是一個基本示例,展示了observable、運算子和訂閱觀察者的工作方式。

示例

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

這是一個非常簡單的例子,我從這個URL獲取使用者資料:

https://jsonplaceholder.typicode.com/users.

過濾資料,只顯示名稱以“C”開頭的使用者,然後使用map函式只返回名稱。以下是輸出:

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

observable和subject的區別

在這個例子中,我們將看到observable和subject的區別。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

輸出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的例子中,每次訂閱observable時,它都會給你新的值。

Subject示例

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

輸出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果你看到,值在使用subject的兩個訂閱者之間共享。

理解冷observable和熱observable

observable被分類為

  • 冷observable
  • 熱observable

當多個訂閱者訂閱時,observable的區別就會顯現出來。

冷observable

冷observable是在每次訂閱時執行並渲染資料的observable。當它被訂閱時,observable被執行,並給出新的值。

下面的例子說明了冷observable。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

輸出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的例子中,每次訂閱observable時,它都會執行observable併發出值。正如上面例子所示,這些值在不同的訂閱者之間也可能不同。

熱observable

對於熱observable,它們會在準備好時發出值,並且不會總是等待訂閱。當值被髮出時,所有訂閱者都會收到相同的值。

當你想在observable準備好時發出值,或者你想將相同的值共享給所有訂閱者時,可以使用熱observable。

熱observable的例子是Subject和可連線運算子。

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

輸出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果你看到,相同的值在訂閱者之間共享。你可以使用`publish()`可連線observable運算子來實現相同的功能。

廣告
© . All rights reserved.