
RxPY - 使用排程器實現併發
RxPY 的一個重要特性是併發性,即允許任務並行執行。為此,我們有兩個運算子 `subscribe_on()` 和 `observe_on()`,它們與排程器一起工作,排程器將決定訂閱任務的執行。
這是一個展示 `subscribe_on()`、`observe_on()` 和排程器需求的工作示例。
示例
import random import time import rx from rx import operators as ops def adding_delay(value): time.sleep(random.randint(5, 20) * 0.1) return value # Task 1 rx.of(1,2,3,4,5).pipe( ops.map(lambda a: adding_delay(a)) ).subscribe( lambda s: print("From Task 1: {0}".format(s)), lambda e: print(e), lambda: print("Task 1 complete") ) # Task 2 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)) ).subscribe( lambda s: print("From Task 2: {0}".format(s)), lambda e: print(e), lambda: print("Task 2 complete") ) input("Press any key to exit\n")
在上面的示例中,我有兩個任務:任務 1 和任務 2。任務的執行是順序的。只有在第一個任務完成後,第二個任務才會開始。
輸出
E:\pyrx>python testrx.py From Task 1: 1 From Task 1: 2 From Task 1: 3 From Task 1: 4 From Task 1: 5 Task 1 complete From Task 2: 1 From Task 2: 2 From Task 2: 3 From Task 2: 4 Task 2 complete
RxPY 支援許多排程器,這裡我們將使用 `ThreadPoolScheduler`。`ThreadPoolScheduler` 主要嘗試管理可用的 CPU 執行緒。
在前面看到的示例中,我們將使用一個多處理模組,該模組將提供 `cpu_count`。該計數將提供給 `ThreadPoolScheduler`,它將根據可用的執行緒來管理並行執行任務。
這是一個工作示例:
import multiprocessing import random import time from threading import current_thread import rx from rx.scheduler import ThreadPoolScheduler from rx import operators as ops # calculate cpu count, using which will create a ThreadPoolScheduler thread_count = multiprocessing.cpu_count() thread_pool_scheduler = ThreadPoolScheduler(thread_count) print("Cpu count is : {0}".format(thread_count)) def adding_delay(value): time.sleep(random.randint(5, 20) * 0.1) return value # Task 1 rx.of(1,2,3,4,5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 1: {0}".format(s)), lambda e: print(e), lambda: print("Task 1 complete") ) # Task 2 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 2: {0}".format(s)), lambda e: print(e), lambda: print("Task 2 complete") ) input("Press any key to exit\n")
在上面的示例中,我有 2 個任務,`cpu_count` 為 4。由於任務數為 2,而我們可用的執行緒數為 4,因此兩個任務可以並行啟動。
輸出
E:\pyrx>python testrx.py Cpu count is : 4 Press any key to exit From Task 1: 1 From Task 2: 1 From Task 1: 2 From Task 2: 2 From Task 2: 3 From Task 1: 3 From Task 2: 4 Task 2 complete From Task 1: 4 From Task 1: 5 Task 1 complete
如果檢視輸出,則兩個任務已並行啟動。
現在,考慮一個任務數超過 CPU 核心數的情況,例如 CPU 核心數為 4,任務數為 5。在這種情況下,我們需要檢查是否有任何執行緒在任務完成後空閒,以便可以將其分配給佇列中可用的新任務。
為此,我們可以使用 `observe_on()` 運算子,它將觀察排程器是否有任何空閒執行緒。這是一個使用 `observe_on()` 的工作示例。
示例
import multiprocessing import random import time from threading import current_thread import rx from rx.scheduler import ThreadPoolScheduler from rx import operators as ops # calculate cpu count, using which will create a ThreadPoolScheduler thread_count = multiprocessing.cpu_count() thread_pool_scheduler = ThreadPoolScheduler(thread_count) print("Cpu count is : {0}".format(thread_count)) def adding_delay(value): time.sleep(random.randint(5, 20) * 0.1) return value # Task 1 rx.of(1,2,3,4,5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 1: {0}".format(s)), lambda e: print(e), lambda: print("Task 1 complete") ) # Task 2 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 2: {0}".format(s)), lambda e: print(e), lambda: print("Task 2 complete") ) #Task 3 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 3: {0}".format(s)), lambda e: print(e), lambda: print("Task 3 complete") ) #Task 4 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 4: {0}".format(s)), lambda e: print(e), lambda: print("Task 4 complete") ) #Task 5 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.observe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 5: {0}".format(s)), lambda e: print(e), lambda: print("Task 5 complete") ) input("Press any key to exit\n")
輸出
E:\pyrx>python testrx.py Cpu count is : 4 From Task 4: 1 From Task 4: 2 From Task 1: 1 From Task 2: 1 From Task 3: 1 From Task 1: 2 From Task 3: 2 From Task 4: 3 From Task 3: 3 From Task 2: 2 From Task 1: 3 From Task 4: 4 Task 4 complete From Task 5: 1 From Task 5: 2 From Task 5: 3 From Task 3: 4 Task 3 complete From Task 2: 3 Press any key to exit From Task 5: 4 Task 5 complete From Task 1: 4 From Task 2: 4 Task 2 complete From Task 1: 5 Task 1 complete
如果檢視輸出,任務 4 完成的那一刻,執行緒將被分配給下一個任務,即任務 5,並且該任務開始執行。
廣告