Python - 執行緒池



執行緒池是一種自動高效管理多個執行緒的機制,允許併發執行任務。Python 沒有透過threading模組直接提供執行緒池。

相反,它透過multiprocessing.dummy模組和concurrent.futures模組提供基於執行緒的池化。這些模組提供了方便的介面來建立和管理執行緒池,使併發任務執行更容易。

什麼是執行緒池?

執行緒池是執行緒的集合,由一個池管理。池中的每個執行緒稱為工作執行緒或工作者執行緒。這些執行緒可以重複使用來執行多個任務,從而減少了重複建立和銷燬執行緒的負擔。

執行緒池控制執行緒的建立及其生命週期,使其能夠更有效地處理大量任務。

我們可以使用以下類在 Python 中實現執行緒池:

  • Python ThreadPool 類
  • Python ThreadPoolExecutor 類

使用 Python ThreadPool 類

multiprocessing.pool.ThreadPool類在multiprocessing模組中提供了一個執行緒池介面。它管理一個工作執行緒池,可以將作業提交到該池以進行併發執行。

ThreadPool物件透過處理工作執行緒之間的任務建立和分配來簡化多個執行緒的管理。它與最初為程序設計的Pool類共享一個介面,但已調整為也能與執行緒一起使用。

ThreadPool例項與Pool例項完全介面相容,應作為上下文管理器進行管理,或者手動呼叫close()和terminate()。

示例

此示例演示了使用Python執行緒池對數字列表並行執行square和cube函式,其中每個函式併發應用於數字,最多使用3個執行緒,每個執行緒在執行之間延遲1秒。

from multiprocessing.dummy import Pool as ThreadPool
import time

def square(number):
   sqr = number * number
   time.sleep(1)
   print("Number:{} Square:{}".format(number, sqr))

def cube(number):
   cub = number*number*number
   time.sleep(1)
   print("Number:{} Cube:{}".format(number, cub))

numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(3)
pool.map(square, numbers)
pool.map(cube, numbers)

pool.close()

輸出

執行上述程式碼後,您將獲得以下輸出:

Number:2 Square:4
Number:1 Square:1
Number:3 Square:9
Number:4 Square:16
Number:5 Square:25
Number:1 Cube:1
Number:2 Cube:8
Number:3 Cube:27
Number:4 Cube:64
Number:5 Cube:125

使用 Python ThreadPoolExecutor 類

Pythonconcurrent.futures模組的ThreadPoolExecutor類提供了一個高階介面,用於使用執行緒非同步執行函式。concurrent.futures模組包括Future類和兩個Executor類:ThreadPoolExecutorProcessPoolExecutor

Future 類

concurrent.futures.Future類負責處理任何可呼叫物件(例如函式)的非同步執行。要獲取Future物件,您應該在任何Executor物件上呼叫submit()方法。不應透過其建構函式直接建立它。

Future類中的重要方法有:

  • result(timeout=None):此方法返回呼叫返回的值。如果呼叫尚未完成,則此方法將等待最多timeout秒。如果呼叫在timeout秒內未完成,則將引發TimeoutError。如果未指定timeout,則等待時間沒有限制。
  • cancel():此方法嘗試取消呼叫。如果呼叫當前正在執行或已完成執行且無法取消,則該方法將返回布林值False。否則,呼叫將被取消,該方法返回True。
  • cancelled():如果呼叫已成功取消,則返回True。
  • running():如果呼叫當前正在執行且無法取消,則返回True。
  • done():如果呼叫已成功取消或已完成執行,則返回True。

ThreadPoolExecutor 類

此類表示一個指定最大工作執行緒數的執行緒池,用於非同步執行呼叫。

concurrent.futures.ThreadPoolExecutor(max_threads)

示例

這是一個使用 concurrent.futures.ThreadPoolExecutor 類在 Python 中非同步管理和執行任務的示例。具體來說,它展示瞭如何將多個任務提交到執行緒池以及如何檢查它們的執行狀態。

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def square(numbers):
   for val in numbers:
      ret = val*val
      sleep(1)
      print("Number:{} Square:{}".format(val, ret))
def cube(numbers):
   for val in numbers:
      ret = val*val*val
      sleep(1)
      print("Number:{} Cube:{}".format(val, ret))
if __name__ == '__main__':
   numbers = [1,2,3,4,5]
   executor = ThreadPoolExecutor(4)
   thread1 = executor.submit(square, (numbers))
   thread2 = executor.submit(cube, (numbers))
   print("Thread 1 executed ? :",thread1.done())
   print("Thread 2 executed ? :",thread2.done())
   sleep(2)
   print("Thread 1 executed ? :",thread1.done())
   print("Thread 2 executed ? :",thread2.done())

它將產生以下輸出 -

Thread 1 executed ? : False
Thread 2 executed ? : False
Number:1 Square:1
Number:1 Cube:1
Thread 1 executed ? : False
Thread 2 executed ? : False
Number:2 Square:4
Number:2 Cube:8
Number:3 Square:9
Number:3 Cube:27
Number:4 Square:16
Number:4 Cube:64
Number:5 Square:25
Number:5 Cube:125
廣告