Python - 執行緒間通訊



執行緒間通訊是指在 Python 多執行緒程式中啟用執行緒之間通訊和同步的過程。

通常,Python 中的執行緒共享程序中的同一記憶體空間,這允許它們透過共享變數、物件和threading模組提供的專用同步機制來交換資料並協調其活動。

為了促進執行緒間通訊,threading 模組提供了各種同步原語,例如鎖、事件、條件和訊號量物件。在本教程中,您將學習如何使用 Event 和 Condition 物件在多執行緒程式中實現執行緒間的通訊。

Event 物件

Event 物件管理內部標誌的狀態,以便執行緒可以等待或設定。Event物件提供控制此標誌狀態的方法,允許執行緒基於共享條件同步其活動。

該標誌最初為假,使用 set() 方法設定為真,使用 clear() 方法重置為假。wait() 方法會阻塞,直到標誌為真。

以下是Event物件的關鍵方法:

  • is_set():當且僅當內部標誌為真時返回 True。
  • set():將內部標誌設定為真。所有等待它變為真的執行緒都會被喚醒。一旦標誌為真,呼叫 wait() 的執行緒根本不會阻塞。
  • clear():將內部標誌重置為假。隨後,呼叫 wait() 的執行緒將阻塞,直到呼叫 set() 將內部標誌再次設定為真。
  • wait(timeout=None):阻塞直到內部標誌為真。如果在進入時內部標誌為真,則立即返回。否則,阻塞直到另一個執行緒呼叫 set() 將標誌設定為真,或直到可選超時發生。當存在 timeout 引數且不為 None 時,它應該是一個浮點數,以秒為單位指定操作的超時時間。

示例

以下程式碼嘗試模擬由交通訊號燈狀態(綠色或紅色)控制的交通流量。

程式中有兩個執行緒,它們分別針對兩個不同的函式。signal_state() 函式定期設定和重置事件,表示訊號從綠色變為紅色。

traffic_flow() 函式等待事件被設定,並在事件保持設定狀態時執行迴圈。

from threading import Event, Thread
import time

terminate = False

def signal_state():
    global terminate
    while not terminate:
        time.sleep(0.5)
        print("Traffic Police Giving GREEN Signal")
        event.set()
        time.sleep(1)
        print("Traffic Police Giving RED Signal")
        event.clear()

def traffic_flow():
    global terminate
    num = 0
    while num < 10 and not terminate:
        print("Waiting for GREEN Signal")
        event.wait()
        print("GREEN Signal ... Traffic can move")
        while event.is_set() and not terminate:
            num += 1
            print("Vehicle No:", num," Crossing the Signal")
            time.sleep(1)
        print("RED Signal ... Traffic has to wait")

event = Event()
t1 = Thread(target=signal_state)
t2 = Thread(target=traffic_flow)
t1.start()
t2.start()

# Terminate the threads after some time
time.sleep(5)
terminate = True

# join all threads to complete
t1.join()
t2.join()

print("Exiting Main Thread")

輸出

執行上述程式碼後,您將獲得以下輸出:

Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 1  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 2  Crossing the Signal
Vehicle No: 3  Crossing the Signal
Traffic Police Giving RED Signal
Traffic Police Giving GREEN Signal
Vehicle No: 4  Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Traffic Police Giving GREEN Signal
Traffic Police Giving RED Signal
Exiting Main Thread

Condition 物件

Python 的threading模組中的 Condition 物件提供了一種更高階的同步機制。它允許執行緒在繼續執行之前等待來自另一個執行緒的通知。Condition 物件始終與鎖相關聯,並提供執行緒間訊號機制。

以下是 threading.Condition() 類的語法:

threading.Condition(lock=None)

以下是 Condition 物件的關鍵方法:

  • acquire(*args):獲取底層鎖。此方法呼叫底層鎖上的相應方法;返回值是該方法返回的任何內容。
  • release():釋放底層鎖。此方法呼叫底層鎖上的相應方法;沒有返回值。
  • wait(timeout=None): 此方法釋放底層鎖,然後阻塞,直到另一個執行緒對同一個條件變數呼叫 notify() 或 notify_all() 方法喚醒它,或者直到可選的超時發生。一旦被喚醒或超時,它會重新獲取鎖並返回。
  • wait_for(predicate, timeout=None): 此實用程式方法可能會重複呼叫 wait(),直到滿足謂詞或發生超時。返回值是謂詞的最後返回值,如果方法超時則將評估為 False。
  • notify(n=1): 此方法最多喚醒 n 個等待條件變數的執行緒;如果沒有任何執行緒正在等待,則此方法什麼也不做。
  • notify_all(): 喚醒所有等待此條件的執行緒。此方法類似於 notify(),但它會喚醒所有等待執行緒,而不是一個執行緒。如果呼叫執行緒在呼叫此方法時未獲取鎖,則會引發 RuntimeError。

示例

此示例演示了使用 Python threading 模組的 Condition 物件進行執行緒間通訊的一種簡單形式。這裡 thread_athread_b 使用 Condition 物件進行通訊,thread_a 等待直到收到來自 thread_b 的通知。thread_b 休眠 2 秒後通知 thread_a,然後結束。

from threading import Condition, Thread
import time

c = Condition()

def thread_a():
    print("Thread A started")
    with c:
        print("Thread A waiting for permission...")
        c.wait()
        print("Thread A got permission!")
    print("Thread A finished")

def thread_b():
    print("Thread B started")
    with c:
        time.sleep(2)
        print("Notifying Thread A...")
        c.notify()
    print("Thread B finished")

Thread(target=thread_a).start()
Thread(target=thread_b).start()

輸出

執行上述程式碼後,您將獲得以下輸出:

Thread A started
Thread A waiting for permission...
Thread B started
Notifying Thread A...
Thread B finished
Thread A got permission!
Thread A finished

示例

這是另一個程式碼示例,演示瞭如何使用 Condition 物件線上程之間進行通訊。在這個例子中,執行緒 t2 執行 taskB() 函式,執行緒 t1 執行 taskA() 函式。t1 執行緒獲取條件併發出通知。

此時,t2 執行緒處於等待狀態。條件釋放後,等待執行緒繼續使用通知函式生成的隨機數。

from threading import Condition, Thread
import time
import random

numbers = []

def taskA(c):
    for _ in range(5):
        with c:
            num = random.randint(1, 10)
            print("Generated random number:", num)
            numbers.append(num)
            print("Notification issued")
            c.notify()
        time.sleep(0.3)

def taskB(c):
    for i in range(5):
        with c:
            print("waiting for update")
            while not numbers: 
                c.wait()
            print("Obtained random number", numbers.pop())
        time.sleep(0.3)

c = Condition()
t1 = Thread(target=taskB, args=(c,))
t2 = Thread(target=taskA, args=(c,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Done")

執行此程式碼時,將產生以下輸出

waiting for update
Generated random number: 2
Notification issued
Obtained random number 2
Generated random number: 5
Notification issued
waiting for update
Obtained random number 5
Generated random number: 1
Notification issued
waiting for update
Obtained random number 1
Generated random number: 9
Notification issued
waiting for update
Obtained random number 9
Generated random number: 2
Notification issued
waiting for update
Obtained random number 2
Done
廣告