RxPY - 建立 Observable
建立
此方法用於建立 Observable。它將包含 observer 方法,即:
on_next() - 當 Observable 發射一個專案時,此函式會被呼叫。
on_completed() - 當 Observable 完成時,此函式會被呼叫。
on_error() - 當 Observable 發生錯誤時,此函式會被呼叫。
下面是一個工作示例:
testrx.py
from rx import create
def test_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error occured")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
下面是建立的 Observable 的輸出:
E:\pyrx>python testrx.py Got - Hello Job Done!
空
此 Observable 不會輸出任何內容,並直接發射完成狀態。
語法
empty()
返回值
它將返回一個不包含任何元素的 Observable。
示例
from rx import empty
test = empty()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
輸出
E:\pyrx>python testrx.py Job Done!
永不完成
此方法建立一個永遠不會到達完成狀態的 Observable。
語法
never()
返回值
它將返回一個永遠不會完成的 Observable。
示例
from rx import never
test = never()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
輸出
It does not show any output.
丟擲異常
此方法將建立一個會丟擲錯誤的 Observable。
語法
throw(exception)
引數
exception:包含錯誤詳細資訊的物件。
返回值
返回一個包含錯誤詳細資訊的 Observable。
示例
from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
輸出
E:\pyrx>python testrx.py Error: There is an Error!
從迭代器建立
此方法將給定的陣列或物件轉換為 Observable。
語法
from_(iterator)
引數
iterator:物件或陣列。
返回值
這將為給定的迭代器返回一個 Observable。
示例
from rx import from_
test = from_([1,2,3,4,5,6,7,8,9,10])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
輸出
E:\pyrx>python testrx.py The value is 1 The value is 2 The value is 3 The value is 4 The value is 5 The value is 6 The value is 7 The value is 8 The value is 9 The value is 10 Job Done!
間隔
此方法將在超時後產生一系列值。
語法
interval(period)
引數
period:啟動整數序列。
返回值
它返回一個包含所有按順序排列的值的 Observable。
示例
import rx
from rx import operators as ops
rx.interval(1).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
輸出
E:\pyrx>python testrx.py Press any key to exit The value is 0 The value is 1 The value is 4 The value is 9 The value is 16 The value is 25 The value is 36 The value is 49 The value is 64 The value is 81 The value is 100 The value is 121 The value is 144 The value is 169 The value is 196 The value is 225 The value is 256 The value is 289 The value is 324 The value is 361 The value is 400
僅值
此方法將給定值轉換為 Observable。
語法
just(value)
引數
value:要轉換為 Observable 的值。
返回值
它將返回一個包含給定值的 Observable。
示例
from rx import just
test = just([15, 25,50, 55])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
輸出
E:\pyrx>python testrx.py The value is [15, 25, 50, 55] Job Done!
範圍
此方法根據給定的輸入提供一系列整數。
語法
range(start, stop=None)
引數
start:範圍開始的第一個值。
stop:可選,範圍停止的最後一個值。
返回值
這將返回一個根據給定輸入包含整數值的 Observable。
示例
from rx import range
test = range(0,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
輸出
E:\pyrx>python testrx.py The value is 0 The value is 1 The value is 2 The value is 3 The value is 4 The value is 5 The value is 6 The value is 7 The value is 8 The value is 9 Job Done!
重複值
此方法將建立一個 Observable,該 Observable 將根據給定的次數重複給定值。
語法
repeat_value(value=None, repeat_count=None)
引數
value:可選。要重複的值。
repeat_count:可選。要重複給定值的次數。
返回值
它將返回一個 Observable,該 Observable 將根據給定的次數重複給定值。
示例
from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
輸出
E:\pyrx>python testrx.py The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 The value is 44 Job Done!
啟動
此方法以函式作為輸入,並返回一個 Observable,該 Observable 將返回輸入函式中的值。
語法
start(func)
引數
func:將被呼叫的函式。
返回值
它返回一個 Observable,該 Observable 將包含輸入函式的返回值。
示例
from rx import start
test = start(lambda : "Hello World")
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
輸出
E:\pyrx>python testrx.py The value is Hello World Job Done!
計時器
此方法將在超時完成後按順序發射值。
語法
timer(duetime)
引數
duetime:發射第一個值之後的時間。
返回值
它將返回一個在 duetime 之後發射值的 Observable。
示例
import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
輸出
E:\pyrx>python testrx.py Press any key to exit The value is 0 The value is 1 The value is 4 The value is 9 The value is 16 The value is 25 The value is 36 The value is 49 The value is 64