響應式程式設計



響應式程式設計是一種處理資料流和變化傳播的程式設計正規化。這意味著當一個元件發出資料流時,響應式程式設計庫會將變化傳播到其他元件。變化的傳播將持續進行,直到到達最終接收者。事件驅動程式設計和響應式程式設計的區別在於,事件驅動程式設計圍繞事件展開,而響應式程式設計圍繞資料展開。

用於響應式程式設計的 ReactiveX 或 RX

ReactiveX 或 Reactive Extension 是響應式程式設計最著名的實現。ReactiveX 的工作依賴於以下兩個類:

Observable 類

此類是資料流或事件的來源,它打包傳入資料,以便資料可以從一個執行緒傳遞到另一個執行緒。在某些觀察者訂閱它之前,它不會提供資料。

Observer 類

此類使用Observable發出的資料流。可以有多個觀察者與 Observable 相關聯,每個觀察者都會收到發出的每個資料項。觀察者可以透過訂閱 Observable 來接收三種類型的事件:

  • on_next() 事件 - 表示資料流中存在一個元素。

  • on_completed() 事件 - 表示發射結束,不再有專案到來。

  • on_error() 事件 - 也表示發射結束,但在Observable丟擲錯誤的情況下。

RxPY – Python 響應式程式設計模組

RxPY 是一個可用於響應式程式設計的 Python 模組。我們需要確保該模組已安裝。可以使用以下命令安裝 RxPY 模組:

pip install RxPY

示例

以下是一個 Python 指令碼,它使用RxPY模組及其ObservableObserve for類進行響應式程式設計。基本上有兩個類:

  • get_strings() - 用於從觀察者獲取字串。

  • PrintObserver() - 用於列印觀察者中的字串。它使用觀察者類的所有三個事件。它還使用 subscribe() 類。

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

輸出

Received Ram
Received Mohan
Received Shyam
Finished

用於響應式程式設計的 PyFunctional 庫

PyFunctional是另一個可用於響應式程式設計的 Python 庫。它使我們能夠使用 Python 程式語言建立函式式程式。它很有用,因為它允許我們透過使用連結的函式運算子來建立資料管道。

RxPY 和 PyFunctional 之間的區別

這兩個庫都用於響應式程式設計,並以類似的方式處理流,但兩者之間主要的區別取決於資料的處理方式。RxPY處理系統中的資料和事件,而PyFunctional專注於使用函數語言程式設計正規化轉換資料。

安裝 PyFunctional 模組

在使用此模組之前,我們需要安裝它。可以使用 pip 命令安裝它,如下所示:

pip install pyfunctional

示例

以下示例使用PyFunctional模組及其seq類,該類充當流物件,我們可以使用它進行迭代和操作。在此程式中,它使用 lambda 函式將序列對映為每個值的雙倍,然後過濾 x 大於 4 的值,最後將序列減少為所有剩餘值的總和。

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

輸出

Result: 6
廣告