RxPY - 連線運算子
publish
此方法會將可觀察物件轉換為可連線可觀察物件。
語法
publish(mapper=None)
引數
mapper:可選。用於多次多播源值但在不需要多次訂閱時的函式。
示例
from rx import create, range, operators as op
import random
def test_observable(observer, scheduler):
observer.on_next(random.random())
observer.on_completed()
source = create(test_observable).pipe(op.publish())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 –
{0}".format(i)))
source.connect()
輸出
E:\pyrx>python testrx.py From subscriber 1 - 0.14751607273318490 From subscriber 2 - 0.1475160727331849
ref_count
此運算子會使可觀察物件變成一個常規可觀察物件。
語法
ref_count()
示例
from rx import create, operators as op
import random
def test_observable(observer, scheduler):
observer.on_next(random.random())
source = create(test_observable).pipe(op.publish(),op.ref_count())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
輸出
E:\pyrx>python testrx.py From subscriber 1 - 0.8230640432381131
replay
此方法的工作方式類似於 replaySubject。該方法會返回相同的值,即使可觀察物件已經發出,且某些訂閱者延遲訂閱。
語法
replay()
示例
from rx import create, range, operators as op
import random
from threading import Timer
def test_observable(observer, scheduler):
observer.on_next(random.random())
observer.on_completed()
source = create(test_observable).pipe(op.replay())
test1 = source.subscribe(on_next = lambda i: print("From subscriber 1 - {0}".format(i)))
test2 = source.subscribe(on_next = lambda i: print("From subscriber 2 - {0}".format(i)))
source.connect()
print("subscriber called after delay ")
def last_subscriber():
test3 = source.subscribe(on_next = lambda i: print("From subscriber 3 - {0}".format(i)))
t = Timer(5.0, last_subscriber)
t.start()
輸出
E:\pyrx>python testrx.py From subscriber 1 - 0.8340998157725388 From subscriber 2 - 0.8340998157725388 subscriber called after delay From subscriber 3 - 0.8340998157725388
廣告