1. 程式人生 > >雷林鵬分享:Ruby 多執行緒

雷林鵬分享:Ruby 多執行緒

  Ruby 多執行緒

  每個正在系統上執行的程式都是一個程序。每個程序包含一到多個執行緒。

  執行緒是程式中一個單一的順序控制流程,在單個程式中同時執行多個執行緒完成不同的工作,稱為多執行緒。

  Ruby 中我們可以通過 Thread 類來建立多執行緒,Ruby的執行緒是一個輕量級的,可以以高效的方式來實現並行的程式碼。

  建立 Ruby 執行緒

  要啟動一個新的執行緒,只需要呼叫 Thread.new 即可:

  # 執行緒 #1 程式碼部分

  Thread.new {

  # 執行緒 #2 執行程式碼

  }

  # 執行緒 #1 執行程式碼

  例項

  以下例項展示瞭如何在Ruby程式中使用多執行緒:

  #!/usr/bin/ruby

  def func1

  i=0

  while i<=2

  puts "func1 at: #{Time.now}"

  sleep(2)

  i=i+1

  end

  end

  def func2

  j=0

  while j<=2

  puts "func2 at: #{Time.now}"

  sleep(1)

  j=j+1

  end

  end

  puts "Started At #{Time.now}"

  t1=Thread.new{func1()}

  t2=Thread.new{func2()}

  t1.join

  t2.join

  puts "End at #{Time.now}"

  以上程式碼執行結果為:

  Started At Wed May 14 08:21:54 -0700 2014

  func1 at: Wed May 14 08:21:54 -0700 2014

  func2 at: Wed May 14 08:21:54 -0700 2014

  func2 at: Wed May 14 08:21:55 -0700 2014

  func1 at: Wed May 14 08:21:56 -0700 2014

  func2 at: Wed May 14 08:21:56 -0700 2014

  func1 at: Wed May 14 08:21:58 -0700 2014

  End at Wed May 14 08:22:00 -0700 2014

  執行緒生命週期

  1、執行緒的建立可以使用Thread.new,同樣可以以同樣的語法使用Thread.start 或者Thread.fork這三個方法來建立執行緒。

  2、建立執行緒後無需啟動,執行緒會自動執行。

  3、Thread 類定義了一些方法來操控執行緒。執行緒執行Thread.new中的程式碼塊。

  4、執行緒程式碼塊中最後一個語句是執行緒的值,可以通過執行緒的方法來呼叫,如果執行緒執行完畢,則返回執行緒值,否則不返回值直到執行緒執行完畢。

  5、Thread.current 方法返回表示當前執行緒的物件。 Thread.main 方法返回主執行緒。

  6、通過 Thread.Join 方法來執行執行緒,這個方法會掛起主執行緒,直到當前執行緒執行完畢。

  執行緒狀態

  執行緒有5種狀態:

  執行緒狀態返回值

  Runnablerun

  SleepingSleeping

  Abortingaborting

  Terminated normallyfalse

  Terminated with exceptionnil

  執行緒和異常

  當某執行緒發生異常,且沒有被rescue捕捉到時,該執行緒通常會被無警告地終止。但是,若有其它執行緒因為Thread#join的關係一直等待該執行緒的話,則等待的執行緒同樣會被引發相同的異常。

  begin

  t = Thread.new do

  Thread.pass # 主執行緒確實在等join

  raise "unhandled exception"

  end

  t.join

  rescue

  p $! # => "unhandled exception"

  end

  使用下列3個方法,就可以讓直譯器在某個執行緒因異常而終止時中斷執行。

  啟動指令碼時指定-d選項,並以除錯模時執行。

  用Thread.abort_on_exception設定標誌。

  使用Thread#abort_on_exception對指定的執行緒設定標誌。

  當使用上述3種方法之一後,整個直譯器就會被中斷。

  t = Thread.new { ... }

  t.abort_on_exception = true

  執行緒同步控制

  在Ruby中,提供三種實現同步的方式,分別是:

  1. 通過Mutex類實現執行緒同步

  2. 監管資料交接的Queue類實現執行緒同步

  3. 使用ConditionVariable實現同步控制

  通過Mutex類實現執行緒同步

  通過Mutex類實現執行緒同步控制,如果在多個執行緒鍾同時需要一個程式變數,可以將這個變數部分使用lock鎖定。 程式碼如下:

  #encoding:gbk

  require "thread"

  puts "Synchronize Thread"

  @num=200

  @mutex=Mutex.new

  def buyTicket(num)

  @mutex.lock

  if @num>=num

  @[email protected]

  puts "you have successfully bought #{num} tickets"

  else

  puts "sorry,no enough tickets"

  end

  @mutex.unlock

  end

  ticket1=Thread.new 10 do

  10.times do |value|

  ticketNum=15

  buyTicket(ticketNum)

  sleep 0.01

  end

  end

  ticket2=Thread.new 10 do

  10.times do |value|

  ticketNum=20

  buyTicket(ticketNum)

  sleep 0.01

  end

  end

  sleep 1

  ticket1.join

  ticket2.join

  輸出結果如下:

  Synchronize Thread

  you have successfully bought 15 tickets

  you have successfully bought 20 tickets

  you have successfully bought 15 tickets

  you have successfully bought 20 tickets

  you have successfully bought 15 tickets

  you have successfully bought 20 tickets

  you have successfully bought 15 tickets

  you have successfully bought 20 tickets

  you have successfully bought 15 tickets

  you have successfully bought 20 tickets

  you have successfully bought 15 tickets

  sorry,no enough tickets

  sorry,no enough tickets

  sorry,no enough tickets

  sorry,no enough tickets

  sorry,no enough tickets

  sorry,no enough tickets

  sorry,no enough tickets

  sorry,no enough tickets

  sorry,no enough tickets

  除了使用lock鎖定變數,還可以使用try_lock鎖定變數,還可以使用Mutex.synchronize同步對某一個變數的訪問。

  監管資料交接的Queue類實現執行緒同步

  Queue類就是表示一個支援執行緒的佇列,能夠同步對佇列末尾進行訪問。不同的執行緒可以使用統一個對類,但是不用擔心這個佇列中的資料是否能夠同步,另外使用SizedQueue類能夠限制佇列的長度

  SizedQueue類能夠非常便捷的幫助我們開發執行緒同步的應用程式,應為只要加入到這個佇列中,就不用關心執行緒的同步問題。

  經典的生產者消費者問題:

  #encoding:gbk

  require "thread"

  puts "SizedQuee Test"

  queue = Queue.new

  producer = Thread.new do

  10.times do |i|

  sleep rand(i) # 讓執行緒睡眠一段時間

  queue << i

  puts "#{i} produced"

  end

  end

  consumer = Thread.new do

  10.times do |i|

  value = queue.pop

  sleep rand(i/2)

  puts "consumed #{value}"

  end

  end

  consumer.join

  程式的輸出:

  SizedQuee Test

  0 produced

  1 produced

  consumed 0

  2 produced

  consumed 1

  consumed 2

  3 produced

  consumed 34 produced

  consumed 4

  5 produced

  consumed 5

  6 produced

  consumed 6

  7 produced

  consumed 7

  8 produced

  9 produced

  consumed 8

  consumed 9

  使用ConditionVariable實現同步控制

  使用 ConditonVariable進行同步控制,能夠在一些致命的資源競爭部分掛起執行緒直到有可用的資源為止。

  #encoding:gbk

  require "thread"

  puts "thread synchronize by ConditionVariable"

  mutex = Mutex.new

  resource = ConditionVariable.new

  a = Thread.new {

  mutex.synchronize {

  # 這個執行緒目前需要resource這個資源

  resource.wait(mutex)

  puts "get resource"

  }

  }

  b = Thread.new {

  mutex.synchronize {

  #執行緒b完成對resourece資源的使用並釋放resource

  resource.signal

  }

  }

  a.join

  puts "complete"

  mutex 是宣告的一個資源,然後通過ConditionVariable來控制申請和釋放這個資源。

  b 執行緒完成了某些工作之後釋放資源resource.signal,這樣a執行緒就可以獲得一個mutex資源然後進行執行。 執行結果:

  thread synchronize by ConditionVariable

  get resource

  complete

  執行緒類方法

  完整的 Thread(執行緒) 類方法如下:

  序號方法描述

  1Thread.abort_on_exception

  若其值為真的話,一旦某執行緒因異常而終止時,整個直譯器就會被中斷。它的預設值是假,也就是說,在通常情況下,若某執行緒發生異常且該異常未被Thread#join等檢測到時,該執行緒會被無警告地終止。

  2Thread.abort_on_exception=

  如果設定為 true, 一旦某執行緒因異常而終止時,整個直譯器就會被中斷。返回新的狀態

  3Thread.critical

  返回布林值。

  4Thread.critical=

  當其值為true時,將不會進行執行緒切換。若當前執行緒掛起(stop)或有訊號(signal)干預時,其值將自動變為false。

  5Thread.current

  返回當前執行中的執行緒(當前執行緒)。

  6Thread.exit

  終止當前執行緒的執行。返回當前執行緒。若當前執行緒是唯一的一個執行緒時,將使用exit(0)來終止它的執行。

  7Thread.fork { block }

  與 Thread.new 一樣生成執行緒。

  8Thread.kill( aThread )

  終止執行緒的執行.

  9Thread.list

  返回處於執行狀態或掛起狀態的活執行緒的陣列。

  10Thread.main

  返回主執行緒。

  11Thread.new( [ arg ]* ) {| args | block }

  生成執行緒,並開始執行。數會被原封不動地傳遞給塊. 這就可以在啟動執行緒的同時,將值傳遞給該執行緒所固有的區域性變數。

  12Thread.pass

  將執行權交給其他執行緒. 它不會改變執行中的執行緒的狀態,而是將控制權交給其他可執行的執行緒(顯式的執行緒排程)。

  13Thread.start( [ args ]* ) {| args | block }

  生成執行緒,並開始執行。數會被原封不動地傳遞給塊. 這就可以在啟動執行緒的同時,將值傳遞給該執行緒所固有的區域性變數。

  14Thread.stop

  將當前執行緒掛起,直到其他執行緒使用run方法再次喚醒該執行緒。

  執行緒例項化方法

  以下例項呼叫了執行緒例項化方法 join:

  #!/usr/bin/ruby

  thr = Thread.new do # 例項化

  puts "In second thread"

  raise "Raise exception"

  end

  thr.join # 呼叫例項化方法 join

  以下是完整例項化方法列表:

  序號方法描述

  1thr[ name ]

  取出執行緒內與name相對應的固有資料。 name可以是字串或符號。 若沒有與name相對應的資料時, 返回nil。

  2thr[ name ] =

  設定執行緒內name相對應的固有資料的值, name可以是字串或符號。 若設為nil時, 將刪除該執行緒內對應資料。

  3thr.abort_on_exception

  返回布林值。

  4thr.abort_on_exception=

  若其值為true的話,一旦某執行緒因異常而終止時,整個直譯器就會被中斷。

  5thr.alive?

  若執行緒是"活"的,就返回true。

  6thr.exit

  終止執行緒的執行。返回self。

  7thr.join

  掛起當前執行緒,直到self執行緒終止執行為止. 若self因異常而終止時, 將會當前執行緒引發同樣的異常。

  8thr.key?

  若與name相對應的執行緒固有資料已經被定義的話,就返回true

  9thr.kill

  類似於 Thread.exit 。

  10thr.priority

  返回執行緒的優先度. 優先度的預設值為0. 該值越大則優先度越高.

  11thr.priority=

  設定執行緒的優先度. 也可以將其設定為負數.

  12thr.raise( anException )

  在該執行緒內強行引發異常.

  13thr.run

  重新啟動被掛起(stop)的執行緒. 與wakeup不同的是,它將立即進行執行緒的切換. 若對死程序使用該方法時, 將引發ThreadError異常.

  14thr.safe_level

  返回self 的安全等級. 當前執行緒的safe_level與$SAFE相同.

  15thr.status

  使用字串"run"、"sleep"或"aborting" 來表示活執行緒的狀態. 若某執行緒是正常終止的話,就返回false. 若因異常而終止的話,就返回nil。

  16thr.stop?

  若執行緒處於終止狀態(dead)或被掛起(stop)時,返回true.

  17thr.value

  一直等到self執行緒終止執行(等同於join)後,返回該執行緒的塊的返回值. 若線上程的執行過程中發生了異常, 就會再次引發該異常.

  18thr.wakeup

  把被掛起(stop)的執行緒的狀態改為可執行狀態(run), 若對死執行緒執行該方法時,將會引發ThreadError異常。

  (編輯:雷林鵬 來源:網路)