RxPy - 示例



在本章中,我們將詳細討論以下主題:

  • 展示 Observable、運算子和訂閱 Observer 工作的基本示例。
  • Observable 和 Subject 之間的區別。
  • 瞭解冷 Observable 和熱 Observable。

下面是一個基本示例,展示了 Observable、運算子和訂閱 Observer 的工作原理。

示例

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

這是一個非常簡單的示例,我從以下 URL 獲取使用者資料:

https://jsonplaceholder.typicode.com/users。

過濾資料,以顯示名稱以“C”開頭的使用者,然後使用 map 函式僅返回名稱。以下是輸出:

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Observable 和 Subject 之間的區別

在本示例中,我們將瞭解 Observable 和 Subject 之間的區別。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

輸出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的示例中,每次訂閱 Observable 時,它都會為您提供新的值。

Subject 示例

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

輸出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果您看到這些值是透過 Subject 共享給兩個訂閱者的。

瞭解冷 Observable 和熱 Observable

Observable 可分為

  • 冷 Observable
  • 熱 Observable

當多個訂閱者訂閱時,會注意到 Observable 的差異。

冷 Observable

冷 Observable 是一種每次訂閱時都會執行並生成資料的 Observable。當它被訂閱時,Observable 會執行並提供新的值。

以下示例說明了冷 Observable 的理解。

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

輸出

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

在上面的示例中,每次訂閱 Observable 時,它都會執行 Observable 併發出值。如上例所示,這些值在不同的訂閱者之間也可能有所不同。

熱 Observable

對於熱 Observable,它們將在準備好時發出值,並且不會總是等待訂閱。當值發出時,所有訂閱者都將獲得相同的值。

當您希望在 Observable 準備好時發出值,或希望將相同的值共享給所有訂閱者時,可以使用熱 Observable。

熱 Observable 的示例包括 Subject 和可連線運算子。

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

輸出

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

如果您看到,相同的值在訂閱者之間共享。您可以使用 publish() 可連線 Observable 運算子來實現相同的效果。

廣告