
RxPY - 使用可觀察陣列
可觀察陣列是一個建立觀察者並將其附加到預期會有值來源的函式,例如,點選、DOM 元素的滑鼠事件等等。
本節具體講述以下內容。
建立可觀察陣列
訂閱並執行一個可觀察陣列
建立可觀察陣列
要建立一個可觀察陣列,我們將使用 create() 方法,並向其傳遞具有以下項的函式。
on_next() − 當可觀察陣列發出一個項時,將呼叫此函式。
on_completed() − 當可觀察陣列完成時,將呼叫此函式。
on_error() − 當可觀察陣列上發生錯誤時,將呼叫此函式。
若要使用 create() 方法,首先要按如下所示匯入該方法 −
from rx import create
以下是一個建立可觀察陣列的示例 −
testrx.py
from rx import create deftest_observable(observer, scheduler): observer.on_next("Hello") observer.on_error("Error") observer.on_completed() source = create(test_observable).
訂閱並執行一個可觀察陣列
若要訂閱一個可觀察陣列,我們需要使用 subscribe() 函式,並傳遞迴調函式 on_next、on_error 和 on_completed。
以下是一個示例 −
testrx.py
from rx import create deftest_observable(observer, scheduler): observer.on_next("Hello") observer.on_completed() source = create(test_observable) source.subscribe( on_next = lambda i: print("Got - {0}".format(i)), on_error = lambda e: print("Error : {0}".format(e)), on_completed = lambda: print("Job Done!"), )
subscribe() 方法負責執行可觀察陣列。回撥函式 on_next、on_error 和 on_completed 必須傳遞給 subscribe 方法。依次呼叫 subscribe 方法執行 test_observable() 函式。
不必向 subscribe() 方法傳遞所有三個回撥函式。你可以根據需要傳遞 on_next()、on_error() 和 on_completed()。
lambda 函式用於 on_next、on_error 和 on_completed。它將採用引數並執行給定表示式。
以下是建立的可觀察陣列的輸出 −
E:\pyrx>python testrx.py Got - Hello Job Done!
廣告