RxPY - 使用 Subject



Subject 既是一個可觀察序列,也是一個可以多播(即與已訂閱的多個觀察者通訊)的觀察者。

我們將討論以下關於 Subject 的主題:

  • 建立 Subject
  • 訂閱 Subject
  • 向 Subject 傳遞資料
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

建立 Subject

要使用 Subject,我們需要匯入 Subject,如下所示:

from rx.subject import Subject

您可以如下建立 Subject 物件:

subject_test = Subject()

該物件是一個觀察者,具有三種方法:

  • on_next(value)
  • on_error(error) 和
  • on_completed()

訂閱 Subject

您可以在 Subject 上建立多個訂閱,如下所示:

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

向 Subject 傳遞資料

您可以使用 on_next(value) 方法向建立的 Subject 傳遞資料,如下所示:

subject_test.on_next("A")
subject_test.on_next("B")

資料將傳遞給新增到 Subject 上的所有訂閱。

這是一個 Subject 的工作示例。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

subject_test 物件透過呼叫 Subject() 建立。subject_test 物件引用了 on_next(value)、on_error(error) 和 on_completed() 方法。以上示例的輸出如下所示:

輸出

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

我們可以使用 on_completed() 方法停止 Subject 執行,如下所示。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

一旦我們呼叫 complete,稍後呼叫的 next 方法將不會被呼叫。

輸出

E:\pyrx>python testrx.py
The value is A
The value is A

現在讓我們看看如何呼叫 on_error(error) 方法。

示例

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

輸出

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject 在被呼叫時會提供最新的值。您可以如下建立 BehaviorSubject:

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

這是一個使用 BehaviorSubject 的工作示例

示例

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

輸出

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Replay Subject

ReplaySubject 類似於 BehaviorSubject,它可以緩衝值並將這些值重播給新的訂閱者。這是一個 ReplaySubject 的工作示例。

示例

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

ReplaySubject 上使用的緩衝區值為 2。因此,最後兩個值將被緩衝並用於呼叫的新訂閱者。

輸出

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

在 AsyncSubject 的情況下,最後一個呼叫的值將傳遞給訂閱者,並且只有在呼叫 complete() 方法後才會執行此操作。

示例

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

輸出

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2
廣告