1. 程式人生 > >Scala Actor併發程式設計

Scala Actor併發程式設計

 Java中的併發程式設計主要通過執行緒實現的,通過共享資源的機制實現併發,但會面臨著死鎖的問題。在Scala中,是通過訊息傳遞來實現併發的,而Actor正是實現訊息傳遞的。

Scala的actor提供了一種基於事件的輕量級執行緒。只要使用scala.actors.Actor伴生物件的actor方法,就可以建立一個actor。它接受一個函式值/閉包做引數,一建立好就開始執行。用!()方法給actor發訊息,用receive()方法從actor接收訊息。receive()也可以閉包為引數,通常用模式匹配處理接收到的訊息。

我們看個例子,假定我們需要判定一個給定的數是否是完全數(完全數是一個正整數,其因子之和是該數的兩倍):

非併發程式設計的實現:

複製程式碼
def sumOfFactors(number:Int) = {
      (0/:(1 to number)){(sum, i) => if(number%i == 0) sum+i else sum
        }
    }
    
    def isPerfect(candidate:Int) = 2*candidate == sumOfFactors(candidate)

    println("6 is perfect? " + isPerfect(6))
    println("33550336 is perfect? " + isPerfect(33550336))
    println(
"33550337 is perfect? " + isPerfect(33550337))
複製程式碼

併發程式設計的實現,將從1到candidate數這個範圍內的數劃分成多個區間,把每個區間內求和的任務分配給單獨的程序。

複製程式碼
import scala.actors.Actor._
class FasterPerfectNumberFinder {
    def sumOfFactorsInRange(lower:Int, upper:Int, number:Int) = {
      (0/:(lower to upper)){(sum, i) => if(number%i == 0) sum+i else
sum } } def isPerfectConcurrent(candidate:Int) = { val RANGE = 1000000 val numberOfPartitions = (candidate.toDouble/RANGE).ceil.toInt val caller = self for(i<-0 until numberOfPartitions){ val lower = i*RANGE + 1 val upper = candidate min(i+1)*RANGE actor { caller ! sumOfFactorsInRange(lower,upper,candidate) } } val sum = (0 /: (0 until numberOfPartitions)){ (partialSum, i) => receive { case sumInRange:Int => partialSum + sumInRange } } 2 * candidate == sum } println("6 is perfect? " + isPerfectConcurrent(6)) println("33550336 is perfect? " + isPerfectConcurrent(33550336)) println("33550337 is perfect? " + isPerfectConcurrent(33550337)) } object FasterPerfectNumberFinder extends App{ new FasterPerfectNumberFinder() }
複製程式碼

程式執行結果如下:

6 is perfect? true
33550336 is perfect? true
33550337 is perfect? false

比較兩種方法用時的程式如下:

複製程式碼
import scala.actors.Actor._
class FindPerfectNumberOverRange {
  //普通實現
  def sumOfFactors(number:Int) = {
      (0/:(1 to number)){(sum, i) => if(number%i == 0) sum+i else sum
        }
    }
    
    def isPerfect(candidate:Int) = 2*candidate == sumOfFactors(candidate)

  //併發實現
  def sumOfFactorsInRange(lower:Int, upper:Int, number:Int) = {
      (0/:(lower to upper)){(sum, i) => if(number%i == 0) sum+i else sum
        }
    }
    
    def isPerfectConcurrent(candidate:Int) = {
      val RANGE = 1000000
      val numberOfPartitions = (candidate.toDouble/RANGE).ceil.toInt
      val caller = self
      
      for(i<-0 until numberOfPartitions){
        val lower = i*RANGE + 1
        val upper = candidate min(i+1)*RANGE
        
        actor {
          caller ! sumOfFactorsInRange(lower,upper,candidate)
        }
      }
      
      val sum = (0 /: (0 until numberOfPartitions)){ (partialSum, i) =>
        receive {
          case sumInRange:Int => partialSum + sumInRange
          }
        }
      2 * candidate == sum
    }
  
    //比較時間花費
    def countPerfectNumbersInRange(start:Int, end:Int, isPerfectFinder:Int => Boolean)={
      val startTime = System.nanoTime()
      val numberOfPerfectNumbers = (0 /: (start to end)){(count, candidate) =>
        if(isPerfectFinder(candidate)) count + 1 else count
        }
      val endTime = System.nanoTime()
      println("Found " + numberOfPerfectNumbers + " perfect numbers in given range, took " +
          (endTime-startTime)/1000000000.0 + " secs")
    }
    
}

object FindPerfectNumberOverRange extends App{
    val fpn = new FindPerfectNumberOverRange()
      val startNumber = 33550300
    val endNumber = 33550400
    fpn.countPerfectNumbersInRange(startNumber, endNumber, fpn.isPerfect)
    fpn.countPerfectNumbersInRange(startNumber, endNumber, fpn.isPerfectConcurrent)
}
複製程式碼

程式執行結果如下:

Found 1 perfect numbers in given range, took 53.505288657 secs
Found 1 perfect numbers in given range, took 35.739131734 secs

 2. 訊息傳遞

下面看一下訊息是如何從一個actor傳到另一個actor。

複製程式碼
import scala.actors.Actor._
class MessagePassing {
    var startTime : Long = 0
    val caller = self
    
    val engrossedActor = actor {
      println("Number of messages received so far? " + mailboxSize)
      caller ! "send"
      Thread.sleep(3000)
      println("Number of messages received while I was busy? " + mailboxSize)
      receive {
        case msg => 
          val receivedTime = System.currentTimeMillis() - startTime
          println("Received message " + msg + "after " + receivedTime + " ms")
      }
      caller ! "received"
    }
    
    receive { case _ =>}
    
    println("Sending Message ")
    startTime = System.currentTimeMillis()
    engrossedActor ! "hello buddy"
    val endTime = System.currentTimeMillis() - startTime
    
    printf("Took less than %dms to send message\n", endTime)
    
    receive {
      case _ =>
    }
}

object MessagePassing extends App {
  new MessagePassing()
}
複製程式碼

程式執行結果如下:

Number of messages received so far? 0
Sending Message 
Took less than 0ms to send message
Number of messages received while I was busy? 0
Received message hello buddyafter 2997 ms

從輸出可以看出,傳送不阻塞,接收不中斷。在actor呼叫receive()方法接收之前,訊息會一直等在那裡。

 非同步地傳送和接收訊息是一項好的實踐——可以最大限度的利用併發。不過,如果對同步的傳送訊息和接收響應有興趣,可以用!?()方法。在接收發訊息的目標actor給出響應之前,她會一直阻塞在那裡。這會引起潛在的死鎖。一個已經失敗的actor會導致其他actor的失敗,然後就輪到應用失敗了。所以,即便要用這個方法,至少要用有超時引數的變體,像這樣:

複製程式碼
package com.cn.gao
import scala.actors._
import Actor._
class AskFortune {
    val fortuneTeller = actor {
      for(i <- 1 to 4) {
        Thread.sleep(1000);
        receive {
          case _ => sender ! "your day will rock! "+ i
          //case _ => reply("your day will rock! " + i) // same as above
        }
      }
    }
    
    println(fortuneTeller !? (2000, "what's ahead"))
    println(fortuneTeller !? (500, "what's ahead"))
    
    val aPrinter = actor {
      receive { case msg => println("Ah, fortune message for you-"+ msg)}
    }
    
    fortuneTeller.send("What's up", aPrinter)
    
    fortuneTeller ! "How's my future?"
    
    Thread.sleep(3000)
    receive{ case msg : String => println("Received "+ msg)}
    
    println("Let's get that lost message")
    receive { case !(channel,msg) => println("Received belated message "+ msg)}
}

object AskFortune extends App{
  new AskFortune()
}
複製程式碼

在超時之前,如果actor傳送回訊息,!?()方法就會返回結果。否則,它會返回None,所以,這個方法的返回型別是Option[Any]。在上面的程式碼中,sender所引用的是最近一個傳送訊息的actor。程式執行結果如下:

複製程式碼
Some(your day will rock! 1)
None
Ah, fortune message for you-your day will rock! 3
Received your day will rock! 4
Let's get that lost message
Received belated message your day will rock! 2
複製程式碼

 3. Actor類

如果想在actor啟動時進行顯式控制,希望在actor裡存入更多資訊,可以建立一個物件,混入Actor trait。這是對的——Scala的Actor只是個trait,可以在任何喜歡的地方混入它。下面是個例子:

AnsweringService.scala

複製程式碼
package com.cn.gao
import scala.actors._
import Actor._

class AnsweringService(val folks:String*) extends Actor {
    def act(){
      while(true){
        receive{
          case(caller: Actor, name:String, msg:String) =>
            caller ! (
                if(folks.contains(name))
                  String.format("Hey it's %s got message %s", name, msg)
                else
                  String.format("Hey there's no one with the name %s here",name)
                  )
          case "ping" => println("ping!")
          case "quit" => println("existing actor")
              exit
        }
      }
    }
}

object AnsweringService extends App{
    val answeringService1 = new AnsweringService("Sara", "Kara", "John")
    answeringService1 ! (self, "Sara", "In town")
    answeringService1 ! (self, "Kara", "Go shopping?")
    
    answeringService1.start()
    
    answeringService1 ! (self, "John", "Bug fixed?")
    answeringService1 ! (self, "Bill", "What's up")
    
    for(i <- 1 to 4) { receive { case msg => println(msg)}}
    
    answeringService1 ! "ping"
    answeringService1 ! "quit"
    answeringService1 ! "ping"
    
    Thread.sleep(2000)
    println("The last ping was not processed")
}
複製程式碼

程式執行結果如下:

複製程式碼
Hey it's Sara got message In town
Hey it's Kara got message Go shopping?
Hey it's John got message Bug fixed?
Hey there's no one with the name Bill here
ping!
existing actor
The last ping was not processed
複製程式碼

開始,我們給actor傳送了一些元組訊息。這些訊息不會立即得到處理,因為actor還沒有啟動。它們會進入佇列,等待後續處理。然後呼叫start()方法,再發送一些訊息。只要呼叫了start()方法,就會有一個單獨的執行緒呼叫actor的act()方法。這時,曾經發出去的所有訊息都開始進行處理。然後,我們迴圈接收對方發出的四條訊息的應答。

呼叫exit()方法可以停止actor。不過這個方法只是丟擲異常,試圖終止當前執行緒的執行,所以,在act()方法裡呼叫挺不錯。

4. actor方法

如果對顯式啟動actor並不真的那麼關注,那麼可以使用actor()方法。在actor間傳遞資料,可以用!()和receive()方法。下面從一個使用actor方法的例子開始,然後重構,使其併發。

這個方法isPrime()告訴我們給定的數是不是素數。為了達到說明的目的,我在方法里加了一些列印語句:

複製程式碼
package com.cn.gao
import scala.actors._
import Actor._
class PrimeTeller {
    def isPrime(number: Int) = {
      println("Going to find if " + number + " is prime")
      
      var result = true
      if(number == 2 || number == 3) result = true
      
      for(i <- 2 to Math.sqrt(number.toDouble).floor.toInt;if result){
        if(number % i == 0) result = false
      }
      
      println("done finding if " + number + " is prime")
      result
    }
}
複製程式碼

呼叫上面這段程式碼的話,接收到應答之前,就會阻塞在那裡。如下所示,這裡把呼叫這個方法的職責委託給一個actor。這個actor會確定一個數是否是素數,然後,用一個非同步響應發回給呼叫者。

複製程式碼
package com.cn.gao
import scala.actors._
import Actor._
object PrimeTeller extends App {
    def isPrime(number: Int) = {
      println("Going to find if " + number + " is prime")
      
      var result = true
      if(number == 2 || number == 3) result = true
      
      for(i <- 2 to Math.sqrt(number.toDouble).floor.toInt;if result){
        if(number % i == 0) result = false
      }
      
      println("done finding if " + number + " is prime")
      result
    }
    
    val primeTeller = actor{
      var continue = true
      
      while(continue){
        receive {
          case (caller: Actor, number:Int) => caller ! (number,
              isPrime(number))
          case "quit" => continue = false
        }
      }
    }
    
     primeTeller ! (self, 2)
     primeTeller ! (self, 131)
     primeTeller ! (self, 132)
     
     for(i<- 1 to 3){
       receive {
         case (number, result) => println(number + "is prime? " + result)
       }
     }
     
     primeTeller ! "quit"
}
複製程式碼

primeTeller是一個引用,它指向了用actor()方法建立的一個匿名actor。它會不斷迴圈,直到接收到“quit”訊息。除了退出訊息,它還能接收一個包含caller和number的元組。收到這個訊息時,它會判斷給定的數是否是素數,然後,給caller發回一個訊息。

程式執行結果如下:

複製程式碼
Going to find if 2 is prime
done finding if 2 is prime
Going to find if 131 is prime
2is prime? true
done finding if 131 is prime
Going to find if 132 is prime
131is prime? true
done finding if 132 is prime
132is prime? false
複製程式碼

 上面的程式碼處理了接收到的每個數字;從輸出可以看到這一點。在actor忙於判斷一個數是否是素數時,如果又接收到多個請求,它們就會進入佇列。因此,即便是將執行委託給了actor,它依然是順序的。

讓這個例子並行相當容易,在PrimeTeller actor的第6行,不要去呼叫isPrime(),而是把這個職責委託給另一個actor,讓它給呼叫者回復應答,程式如下:

複製程式碼
package com.cn.gao
import scala.actors._
import Actor._
object PrimeTeller extends App {
    def isPrime(number: Int) = {
      println("Going to find if " + number + " is prime")
      
      var result = true
      if(number == 2 || number == 3) result = true
      
      for(i <- 2 to Math.sqrt(number.toDouble).floor.toInt;if result){
        if(number % i == 0) result = false
      }
      
      println("done finding if " + number + " is prime")
      result
    }
    
    val primeTeller = actor{
      var continue = true
      
      while(continue){
        receive {
//          case (caller: Actor, number:Int) => caller ! (number,
//              isPrime(number))
          case (caller: Actor, number:Int) => actor {caller ! (number,
              isPrime(number))}
          case "quit" => continue = false
        }
      }
    }
    
     primeTeller ! (self, 2)
     primeTeller ! (self, 131)
     primeTeller ! (self, 132)
     
     for(i<- 1 to 3){
       receive {
         case (number, result) => println(number + "is prime? " + result)
       }
     }
     
     primeTeller ! "quit"
}
複製程式碼

再次執行上面的程式碼,我們會看到,多個請求併發地執行了,如下所示:

複製程式碼
Going to find if 2 is prime
Going to find if 131 is prime
Going to find if 132 is prime
done finding if 132 is prime
done finding if 2 is prime
done finding if 131 is prime
132is prime? false
131is prime? true
2is prime? true
複製程式碼

 5. receive和receiveWithin方法

receive()接收一個函式值/閉包,返回一個處理訊息的應答。下面是個從receive()方法接收結果的例子:

複製程式碼
package com.cn.gao
import scala.actors.Actor._
object Receive extends App {
    val caller = self
    
    val accumulator = actor {
      var sum = 0
      var continue = true
      while(continue) {
        sum += receive {
          case number:Int => number
          case "quit" => continue = false
          0
        }
      }
      caller ! sum
    }
    
    accumulator ! 1
    accumulator ! 7
    accumulator ! 8
    accumulator ! "quit"
    
    receive{case result => println("Total is " + result)}
}
複製程式碼

accumulator接收數字,對傳給它的數字求和。完成之後,它會發回一個訊息,帶有求和的結果。上面程式碼的輸出如下:

Total is 16

呼叫receive()方法會造成程式阻塞,直到實際接收到應答為止。如果預期的actor應答一直沒有發過來就麻煩了。這會讓我們一直等下去。用receiveWithin()方法修正這一點,它會接收一個timeout引數,如下:

複製程式碼
package com.cn.gao
import scala.actors._
import scala.actors.Actor._
object ReceiveWithin extends App {
    val caller = self
    
    val accumulator = actor {
      var sum = 0
      var continue = true
      while(continue) {
        sum += receiveWithin(1000) {
          case number:Int => number
          case TIMEOUT =>
            println("Time out! Will return result now")
            continue = false
            0
        }
      }
      caller ! sum
    }
    
    accumulator ! 1
    accumulator ! 7
    accumulator ! 8
    
    receiveWithin(2000) {
      case result => println("Total is " + result)
    }
}
複製程式碼

在給定的超時期限內,如果什麼都沒有收到,receiveWithin()方法會收到一個TIMEOUT訊息。如果不對其進行模式匹配,就會丟擲異常。在上面的程式碼裡,接收到TIMEOUT訊息當做了完成值累加的訊號。輸出如下:

Time out! Will return result now
Total is 16

我們應該傾向於使用receiveWithin()方法而非receive()方法,避免產生活性等待問題。

recevie()和receiveWithin()方法把函式值當作偏應用函式,呼叫程式碼塊之前,會檢查它是否處理訊息。所以,如果接收到一個非預期的訊息,就會悄悄地忽略它。當然,如果想把忽略的訊息顯示出來,可以提供一個case_=>...語句。下面這個例子展示了忽略的無效訊息:

複製程式碼
package com.cn.gao
import scala.actors._
import Actor._
object MessageIgnore extends App{
    val expectStringOrInteger = actor {
      for(i <- 1 to 4) {
        receiveWithin(1000) {
          case str: String =>println("You said " + str)
          case num: Int => println("You gave " + num)
          case TIMEOUT => println("Time out!")
        }
      }
    }
    
    expectStringOrInteger ! "only constant is change"
    expectStringOrInteger ! 1024
    expectStringOrInteger ! 22.22
    expectStringOrInteger ! (self, 1024)
    
    receiveWithin(3000){case _ => }
}
複製程式碼

在程式碼最後,放了一個receiveWithin()的呼叫。因為主執行緒退出時,程式就退出了,這個語句保證程式還活動著,給actor一個應答的機會。從輸出中可以看出,actor處理了前兩個傳送給它的訊息,忽略了後兩個,因為它們沒有匹配上預期的訊息模式。程式最終會超時,因為沒有再接收到任何可以匹配的訊息。輸出結果如下:

You said only constant is change
You gave 1024
Time out!
Time out!

6. react和reactWithin方法

在每個actor裡,呼叫receive()的時候實際上會要求有一個單獨的執行緒。這個執行緒會一直持有,直到這個actor結束。也就是說,即便是在等待訊息到達,程式也會持有這些執行緒,每個actor一個,這絕對是一種資源浪費。Scala不得不持有這些執行緒的原因在於,控制流的執行過程中有一些具體狀態。如果在呼叫序列裡沒有需要保持和返回的狀態,Scala幾乎就可以從執行緒池裡獲取任意執行緒執行訊息處理——這恰恰就是使用react()所做的事情。react()不同於receive(),它並不返回任何結果。實際上,它並不從呼叫中返回。

如果處理了react()的當前訊息後,還要處理更多的訊息,就要在訊息處理的末尾呼叫其他方法。Scala會把這個呼叫執行交給執行緒池裡的任意執行緒。看一個這種行為的例子:

複製程式碼
package com.cn.gao
import scala.actors.Actor._
import scala.actors._
object React extends App {
    def info(msg:String) = println(msg + " received by " + Thread.currentThread())
    
    def receiveMessage(id:Int) {
      for(i <- 1 to 2) {
        receiveWithin(20000) {
          case msg:String => info("receive: " + id + msg)
          case TIMEOUT => 
        }
      }
    }
    
    def reactMessage(id:Int){
      react {
        case msg:String => info("react: " + id + msg)
        reactMessage(id)
      }
    }
    
    val actors = Array (
      actor {info("react:   1 actor created"); reactMessage(1)},
      actor {info("react:   2 actor created"); reactMessage(2)},
      actor {info("receive: 3 actor created"); receiveMessage(3)},
      actor {info("receive: 4 actor created"); receiveMessage(4)}
    )
    
    Thread.sleep(1000)
    for(i <- 0 to 3){actors(i) ! " hello"; Thread.sleep(2000)}
    Thread.sleep(2000)
    for(i <- 0 to 3){actors(i) ! " hello"; Thread.sleep(2000)}
}
複製程式碼

上面的程式碼輸出結果如下:

複製程式碼
react:   1 actor created received by Thread[ForkJoinPool-1-worker-5,5,main]
react:   2 actor created received by Thread[ForkJoinPool-1-worker-3,5,main]
receive: 3 actor created received by Thread[ForkJoinPool-1-worker-1,5,main]
receive: 4 actor created received by Thread[ForkJoinPool-1-worker-7,5,main]
react: 1 hello received by Thread[ForkJoinPool-1-worker-3,5,main]
react: 2 hello received by Thread[ForkJoinPool-1-worker-3,5,main]
receive: 3 hello received by Thread[ForkJoinPool-1-worker-1,5,main]
receive: 4 hello received by Thread[ForkJoinPool-1-worker-7,5,main]
react: 1 hello received by Thread[ForkJoinPool-1-worker-5,5,main]
react: 2 hello received by