
響應式程式設計
響應式程式設計是一種處理資料流和變化傳播的程式設計正規化。這意味著當一個元件發出資料流時,響應式程式設計庫會將變化傳播到其他元件。變化的傳播將持續進行,直到到達最終接收者。事件驅動程式設計和響應式程式設計的區別在於,事件驅動程式設計圍繞事件展開,而響應式程式設計圍繞資料展開。
用於響應式程式設計的 ReactiveX 或 RX
ReactiveX 或 Reactive Extension 是響應式程式設計最著名的實現。ReactiveX 的工作依賴於以下兩個類:
Observable 類
此類是資料流或事件的來源,它打包傳入資料,以便資料可以從一個執行緒傳遞到另一個執行緒。在某些觀察者訂閱它之前,它不會提供資料。
Observer 類
此類使用Observable發出的資料流。可以有多個觀察者與 Observable 相關聯,每個觀察者都會收到發出的每個資料項。觀察者可以透過訂閱 Observable 來接收三種類型的事件:
on_next() 事件 - 表示資料流中存在一個元素。
on_completed() 事件 - 表示發射結束,不再有專案到來。
on_error() 事件 - 也表示發射結束,但在Observable丟擲錯誤的情況下。
RxPY – Python 響應式程式設計模組
RxPY 是一個可用於響應式程式設計的 Python 模組。我們需要確保該模組已安裝。可以使用以下命令安裝 RxPY 模組:
pip install RxPY
示例
以下是一個 Python 指令碼,它使用RxPY模組及其Observable和Observe for類進行響應式程式設計。基本上有兩個類:
get_strings() - 用於從觀察者獲取字串。
PrintObserver() - 用於列印觀察者中的字串。它使用觀察者類的所有三個事件。它還使用 subscribe() 類。
from rx import Observable, Observer def get_strings(observer): observer.on_next("Ram") observer.on_next("Mohan") observer.on_next("Shyam") observer.on_completed() class PrintObserver(Observer): def on_next(self, value): print("Received {0}".format(value)) def on_completed(self): print("Finished") def on_error(self, error): print("Error: {0}".format(error)) source = Observable.create(get_strings) source.subscribe(PrintObserver())
輸出
Received Ram Received Mohan Received Shyam Finished
用於響應式程式設計的 PyFunctional 庫
PyFunctional是另一個可用於響應式程式設計的 Python 庫。它使我們能夠使用 Python 程式語言建立函式式程式。它很有用,因為它允許我們透過使用連結的函式運算子來建立資料管道。
RxPY 和 PyFunctional 之間的區別
這兩個庫都用於響應式程式設計,並以類似的方式處理流,但兩者之間主要的區別取決於資料的處理方式。RxPY處理系統中的資料和事件,而PyFunctional專注於使用函數語言程式設計正規化轉換資料。
安裝 PyFunctional 模組
在使用此模組之前,我們需要安裝它。可以使用 pip 命令安裝它,如下所示:
pip install pyfunctional
示例
以下示例使用PyFunctional模組及其seq類,該類充當流物件,我們可以使用它進行迭代和操作。在此程式中,它使用 lambda 函式將序列對映為每個值的雙倍,然後過濾 x 大於 4 的值,最後將序列減少為所有剩餘值的總和。
from functional import seq result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y) print ("Result: {}".format(result))
輸出
Result: 6