RxPY - 使用可觀察陣列



可觀察陣列是一個建立觀察者並將其附加到預期會有值來源的函式,例如,點選、DOM 元素的滑鼠事件等等。

本節具體講述以下內容。

  • 建立可觀察陣列

  • 訂閱並執行一個可觀察陣列

建立可觀察陣列

要建立一個可觀察陣列,我們將使用 create() 方法,並向其傳遞具有以下項的函式。

  • on_next() − 當可觀察陣列發出一個項時,將呼叫此函式。

  • on_completed() − 當可觀察陣列完成時,將呼叫此函式。

  • on_error() − 當可觀察陣列上發生錯誤時,將呼叫此函式。

若要使用 create() 方法,首先要按如下所示匯入該方法 −

from rx import create

以下是一個建立可觀察陣列的示例 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

訂閱並執行一個可觀察陣列

若要訂閱一個可觀察陣列,我們需要使用 subscribe() 函式,並傳遞迴調函式 on_next、on_error 和 on_completed。

以下是一個示例 −

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe() 方法負責執行可觀察陣列。回撥函式 on_nexton_erroron_completed 必須傳遞給 subscribe 方法。依次呼叫 subscribe 方法執行 test_observable() 函式。

不必向 subscribe() 方法傳遞所有三個回撥函式。你可以根據需要傳遞 on_next()、on_error() 和 on_completed()。

lambda 函式用於 on_next、on_error 和 on_completed。它將採用引數並執行給定表示式。

以下是建立的可觀察陣列的輸出 −

E:\pyrx>python testrx.py
Got - Hello
Job Done!
廣告