執行緒間通訊



在現實生活中,如果一個團隊的人在做一項共同的任務,那麼他們之間就應該進行溝通才能正確完成任務。同樣的比喻也適用於執行緒。在程式設計中,為了減少處理器的空閒時間,我們建立多個執行緒並將不同的子任務分配給每個執行緒。因此,必須有一個通訊機制,並且它們應該相互互動以同步地完成工作。

請考慮以下與執行緒間通訊相關的要點:

  • 沒有效能提升 - 如果我們無法實現執行緒和程序之間的正確通訊,那麼併發和並行帶來的效能提升就毫無用處。

  • 正確完成任務 - 如果沒有執行緒之間的適當的互通機制,則無法正確完成分配的任務。

  • 比程序間通訊更高效 - 執行緒間通訊比程序間通訊更高效、更容易使用,因為同一個程序中的所有執行緒共享相同的地址空間,不需要使用共享記憶體。

Python 用於執行緒安全通訊的資料結構

多執行緒程式碼帶來了一個問題:如何在不同的執行緒之間傳遞資訊。標準的通訊原語無法解決這個問題。因此,我們需要實現我們自己的複合物件,以便線上程之間共享物件以使通訊執行緒安全。以下是一些經過修改後提供執行緒安全通訊的資料結構:

集合 (Sets)

為了以執行緒安全的方式使用集合資料結構,我們需要擴充套件集合類來實現我們自己的鎖機制。

示例

這是一個擴充套件 Python 集合類的示例:

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
	  try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()
  
   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

在上面的示例中,定義了一個名為 **extend_class** 的類物件,它進一步繼承自 Python 的 **set 類**。一個鎖物件在該類的建構函式中建立。現在,有兩個函式 - **add()** 和 **delete()**。這些函式被定義並且是執行緒安全的。它們都依賴於 **super** 類的功能,但有一個關鍵例外。

裝飾器 (Decorator)

這是另一個用於執行緒安全通訊的關鍵方法,即使用裝飾器。

示例

考慮一個 Python 示例,它展示瞭如何使用裝飾器:

def lock_decorator(method):

   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

在上面的示例中,定義了一個名為 lock_decorator 的裝飾器方法,它進一步繼承自 Python 的方法類。然後,一個鎖物件在該類的建構函式中建立。現在,有兩個函式 - add() 和 delete()。這些函式被定義並且是執行緒安全的。它們都依賴於父類的功能,但有一個關鍵例外。

列表 (Lists)

列表資料結構是執行緒安全的,對於臨時的記憶體儲存來說既快速又容易。在 CPython 中,GIL(全域性直譯器鎖)保護了對它們的併發訪問。我們知道列表是執行緒安全的,但它們包含的資料呢?實際上,列表的資料並沒有受到保護。例如,**L.append(x)** 並不能保證在另一個執行緒嘗試做同樣的事情時返回預期結果。這是因為,雖然 **append()** 是一個原子操作並且是執行緒安全的,但是另一個執行緒正在嘗試併發地修改列表的資料,因此我們可以在輸出中看到競爭條件的副作用。

為了解決這類問題並安全地修改資料,我們必須實現一個適當的鎖機制,這可以進一步確保多個執行緒不會出現競爭條件。為了實現適當的鎖機制,我們可以像在前面的示例中那樣擴充套件類。

列表上的一些其他原子操作如下:

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

此處:

  • L、L1、L2 都是列表
  • D、D1、D2 是字典
  • x、y 是物件
  • i、j 是整數

佇列 (Queues)

如果列表的資料沒有受到保護,我們可能會面臨後果。我們可能會獲取或刪除錯誤的資料項,或者出現競爭條件。這就是為什麼建議使用佇列資料結構的原因。佇列的一個現實世界例子可以是一條單車道單行道,先進入的車輛先離開。在售票視窗和公交車站也可以看到更多現實世界的例子。

Queues

佇列預設是執行緒安全的資料結構,我們不需要擔心實現複雜的鎖機制。Python 提供了模組來在我們的應用程式中使用不同型別的佇列。

佇列型別

在本節中,我們將學習不同型別的佇列。Python 提供了三個從 **** 模組中使用的佇列選項:

  • 普通佇列 (FIFO,先進先出)
  • LIFO,後進先出
  • 優先順序佇列

我們將在後續章節中學習不同的佇列。

普通佇列 (FIFO,先進先出)

這是 Python 提供的最常用的佇列實現。在這種排隊機制中,先到者先服務。FIFO 也稱為普通佇列。FIFO 佇列可以表示如下:

FIFO

Python FIFO 佇列實現

在 Python 中,FIFO 佇列可以用單執行緒和多執行緒實現。

單執行緒 FIFO 佇列

為了用單執行緒實現 FIFO 佇列,**Queue** 類將實現一個基本的先進先出的容器。使用 **put()** 將元素新增到序列的“一端”,使用 **get()** 從另一端刪除元素。

示例

以下是使用單執行緒實現 FIFO 佇列的 Python 程式:

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

輸出

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

輸出顯示上述程式使用單執行緒來說明元素從佇列中移除的順序與插入順序相同。

多執行緒 FIFO 佇列

為了用多執行緒實現 FIFO,我們需要定義 myqueue() 函式,該函式從 queue 模組擴充套件而來。get() 和 put() 方法的工作原理與上面討論的單執行緒 FIFO 佇列實現中的相同。然後,為了使其成為多執行緒的,我們需要宣告和例項化執行緒。這些執行緒將以 FIFO 方式使用佇列。

示例

以下是使用多執行緒實現 FIFO 佇列的 Python 程式:

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

輸出

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

LIFO,後進先出佇列

該佇列與 FIFO(先進先出)佇列使用了完全相反的類比。在這種排隊機制中,最後來的人將先得到服務。這類似於實現堆疊資料結構。LIFO 佇列在實現人工智慧的深度優先搜尋等演算法時非常有用。

Python LIFO 佇列實現

在 Python 中,LIFO 佇列可以用單執行緒和多執行緒實現。

單執行緒 LIFO 佇列

為了用單執行緒實現 LIFO 佇列,**Queue** 類將使用結構 **Queue.LifoQueue** 實現一個基本的最後進入,先離開的容器。現在,呼叫 **put()** 時,元素新增到容器的頭部,使用 **get()** 時也從頭部移除。

示例

以下是使用單執行緒實現 LIFO 佇列的 Python 程式:

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")
Output:
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

輸出顯示上述程式使用單執行緒來說明元素從佇列中移除的順序與插入順序相反。

多執行緒 LIFO 佇列

實現方式與我們使用多執行緒實現 FIFO 佇列的方式類似。唯一的區別是我們需要使用 **Queue** 類,它將使用結構 **Queue.LifoQueue** 實現一個基本的最後進入,先離開的容器。

示例

以下是使用多執行緒實現 LIFO 佇列的 Python 程式:

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
	  print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join() 

輸出

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

優先順序佇列

在 FIFO 和 LIFO 佇列中,專案的順序與插入順序有關。但是,在許多情況下,優先順序比插入順序更重要。讓我們考慮一個現實世界的例子。假設機場安檢正在檢查不同類別的乘客。VVIP、航空公司工作人員、海關官員等類別的乘客可能會優先檢查,而不是像普通乘客那樣根據到達順序進行檢查。

需要考慮的另一個重要方面是:如何開發任務排程器。一種常見的方案是根據優先順序順序服務佇列中最緊急的任務。可以使用此資料結構根據專案的優先順序值從佇列中選取專案。

Python 優先順序佇列實現

在 Python 中,優先順序佇列可以用單執行緒和多執行緒實現。

單執行緒優先順序佇列

為了使用單執行緒實現優先順序佇列,**Queue** 類將使用結構 **Queue.PriorityQueue** 實現一個基於優先順序的任務容器。現在,呼叫 **put()** 時,元素將新增一個值,其中值越小,優先順序越高,因此使用 **get()** 時將首先檢索。

示例

考慮以下使用單執行緒實現優先順序佇列的 Python 程式:

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

輸出

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

在上面的輸出中,我們可以看到佇列已根據優先順序儲存專案——值越小,優先順序越高。

多執行緒優先順序佇列

實現方式與使用多執行緒實現 FIFO 和 LIFO 佇列的方式類似。唯一的區別是我們需要使用 **Queue** 類來使用結構 **Queue.PriorityQueue** 初始化優先順序。另一個區別在於佇列的生成方式。在下面給出的示例中,它將使用兩個相同的資料集生成。

示例

以下 Python 程式有助於使用多執行緒實現優先順序佇列:

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)

for i in range(5):
   q.put(i,1)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

輸出

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue
廣告