Ruby - 多執行緒



傳統的程式只有一個執行執行緒,程式包含的語句或指令按順序執行,直到程式終止。

多執行緒程式具有多個執行執行緒。在每個執行緒中,語句按順序執行,但執行緒本身可以在多核 CPU 上並行執行,例如。通常在單 CPU 機器上,多個執行緒並非真正並行執行,而是透過交錯執行執行緒來模擬並行性。

Ruby 使用Thread類簡化了多執行緒程式的編寫。Ruby 執行緒是一種輕量級且高效的方式,可在程式碼中實現併發。

建立 Ruby 執行緒

要啟動一個新執行緒,只需將一個程式碼塊與Thread.new呼叫關聯。將建立一個新執行緒來執行程式碼塊中的程式碼,而原始執行緒將立即從Thread.new返回並繼續執行下一條語句。

# Thread #1 is running here
Thread.new {
   # Thread #2 runs this code
}
# Thread #1 runs this code

示例

這是一個示例,展示瞭如何使用多執行緒 Ruby 程式。

#!/usr/bin/ruby

def func1
   i = 0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i = i+1
   end
end

def func2
   j = 0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j = j+1
   end
end

puts "Started At #{Time.now}"
t1 = Thread.new{func1()}
t2 = Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

這將產生以下結果:

Started At Wed May 14 08:21:54 -0700 2008
func1 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:55 -0700 2008
func1 at: Wed May 14 08:21:56 -0700 2008
func2 at: Wed May 14 08:21:56 -0700 2008
func1 at: Wed May 14 08:21:58 -0700 2008
End at Wed May 14 08:22:00 -0700 2008

執行緒生命週期

使用Thread.new建立新執行緒。您還可以使用同義詞Thread.startThread.fork

建立執行緒後無需啟動它,當 CPU 資源可用時,它會自動開始執行。

Thread 類定義了許多方法來查詢和操作正在執行的執行緒。執行緒執行與Thread.new呼叫關聯的程式碼塊中的程式碼,然後停止執行。

該程式碼塊中最後一個表示式的值就是執行緒的值,可以透過呼叫 Thread 物件的value方法獲得。如果執行緒已執行完成,則value會立即返回執行緒的值。否則,value方法會阻塞,直到執行緒完成。

類方法Thread.current返回表示當前執行緒的 Thread 物件。這允許執行緒操作自身。類方法Thread.main返回表示主執行緒的 Thread 物件。這是在啟動 Ruby 程式時開始的初始執行執行緒。

您可以透過呼叫該執行緒的Thread.join方法來等待特定執行緒完成。呼叫執行緒將阻塞,直到給定執行緒完成。

執行緒和異常

如果在主執行緒中引發異常,並且未在任何地方處理,則 Ruby 直譯器會列印一條訊息並退出。在除主執行緒之外的執行緒中,未處理的異常會導致執行緒停止執行。

如果執行緒t由於未處理的異常而退出,而另一個執行緒s呼叫t.join或t.value,則在t中發生的異常將線上程s中引發。

如果Thread.abort_on_exceptionfalse(預設條件),未處理的異常只會終止當前執行緒,其餘執行緒將繼續執行。

如果您希望任何執行緒中的任何未處理異常都導致直譯器退出,請將類方法Thread.abort_on_exception設定為true

t = Thread.new { ... }
t.abort_on_exception = true

執行緒變數

執行緒通常可以訪問建立執行緒時作用域內的任何變數。執行緒程式碼塊中的區域性變數是執行緒區域性變數,不會共享。

Thread 類提供了一種特殊機制,允許透過名稱建立和訪問執行緒區域性變數。您只需將執行緒物件視為雜湊表,使用[]=寫入元素,使用[]讀取元素。

在此示例中,每個執行緒都將變數count的當前值記錄在鍵為mycount的執行緒區域性變數中。

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

這將產生以下結果:

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

主執行緒等待子執行緒完成,然後打印出每個子執行緒捕獲的count值。

執行緒優先順序

影響執行緒排程的第一個因素是執行緒優先順序:高優先順序執行緒優先於低優先順序執行緒排程。更準確地說,只有在沒有更高優先順序的執行緒等待執行時,執行緒才能獲得 CPU 時間。

您可以使用priority =priority設定和查詢 Ruby Thread 物件的優先順序。新建立的執行緒以建立它的執行緒相同的優先順序啟動。主執行緒的優先順序從 0 開始。

無法線上程開始執行之前設定其優先順序。但是,執行緒可以將其自身優先順序作為其執行的第一個操作進行提高或降低。

執行緒互斥

如果兩個執行緒共享對相同資料的訪問,並且至少一個執行緒修改該資料,則必須特別注意確保任何執行緒都無法看到不一致狀態的資料。這稱為執行緒互斥

Mutex是一個實現簡單訊號量鎖的類,用於對某些共享資源進行互斥訪問。也就是說,在給定時間內,只有一個執行緒可以持有鎖。其他執行緒可以選擇等待鎖可用,或者可以選擇立即獲得錯誤,指示鎖不可用。

透過將對共享資料的全部訪問置於mutex的控制之下,我們確保了一致性和原子操作。讓我們嘗試兩個示例,第一個沒有 mutax,第二個有 mutax:

無 Mutex 示例

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

這將產生以下結果:

count1 :  1583766
count2 :  1583766
difference : 0
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
   end
end
spy = Thread.new do
   loop do
      mutex.synchronize do
         difference += (count1 - count2).abs
      end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

這將產生以下結果:

count1 :  696591
count2 :  696591
difference : 0

處理死鎖

當我們開始使用Mutex物件進行執行緒互斥時,必須小心避免死鎖。死鎖是指所有執行緒都在等待獲取另一個執行緒持有的資源的情況。因為所有執行緒都被阻塞,所以它們無法釋放它們持有的鎖。並且因為它們無法釋放鎖,所以沒有其他執行緒可以獲取這些鎖。

這就是條件變數發揮作用的地方。條件變數只是一個與資源關聯的訊號量,用於在特定mutex的保護下使用。當您需要一個不可用的資源時,您會在條件變數上等待。該操作會釋放相應mutex上的鎖。當其他執行緒發出資源可用的訊號時,原始執行緒將停止等待並同時重新獲得關鍵區域的鎖。

示例

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

這將產生以下結果:

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

執行緒狀態

有五個可能的返回值,對應於下表所示的五個可能狀態。status方法返回執行緒的狀態。

執行緒狀態 返回值
可執行 run
睡眠 睡眠
中止 aborting
正常終止 false
異常終止 nil

Thread 類方法

Thread類提供了以下方法,它們適用於程式中所有可用的執行緒。這些方法將使用Thread類名如下呼叫:

Thread.abort_on_exception = true

執行緒例項方法

這些方法適用於執行緒例項。這些方法將使用方法如下所示的Thread例項進行呼叫:

#!/usr/bin/ruby

thr = Thread.new do   # Calling a class method new
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # Calling an instance method join
廣告
© . All rights reserved.