1. 程式人生 > >Scala(四)-- Actor程式設計

Scala(四)-- Actor程式設計

Actor程式設計

1.Actor模型

1.1 什麼是Actor 一個Actor指的是一個最基本的計算單元。它能接收一個訊息並且基於其執行計算。 這個理念很像面嚮物件語言,一個物件接收一條訊息(方法呼叫),然後根據接收的訊息做事(呼叫了哪個方法)。 Actors一大重要特徵在於actors之間相互隔離,它們並不互相共享記憶體。這點區別於上述的物件。也就是說,一個actor能維持一個私有的狀態,並且這個狀態不可能被另一個actor所改變。

聚沙成塔

One ant is no ant, one actor is no actor. 光有一個actor是不夠的,多個actors才能組成系統。在actor模型裡每個actor都有地址,所以它們才能夠相互發送訊息。 Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制,Scala是運用訊息(message)的傳送、接收來實現多執行緒的。使用Scala能夠更容易地實現多執行緒應用的開發。 Actor是電腦科學領域中的一個平行計算模型,它把actors當做通用的平行計算原語:一個actor對接收到的訊息做出響應,進行本地決策,可以建立更多的actor,或者傳送更多的訊息;同時準備接收下一條訊息。 在Actor理論中,一切都被認為是actor,這和麵向物件語言裡一切都被看成物件很類似。但包括面嚮物件語言在內的軟體通常是順序執行的,而Actor模型本質上則是併發的。 每個Actor都有一個(恰好一個)Mailbox。Mailbox相當於是一個小型的佇列,一旦Sender傳送訊息,就是將該訊息入隊到Mailbox中。入隊的順序按照訊息傳送的時間順序。Mailbox有多種實現,預設為FIFO。但也可以根據優先順序考慮出隊順序,實現演算法則不相同。

訊息和信箱

非同步地傳送訊息是用actor模型程式設計的重要特性之一。訊息並不是直接傳送到一個actor,而是傳送到一個信箱(mailbox) 這樣的設計解耦了actor之間的關係——actor都以自己的步調執行,且傳送訊息時不會被阻塞。雖然所有actor可以同時執行,但它們都按照信箱接收訊息的順序來依次處理訊息,且僅在當前訊息處理完成後才會處理下一個訊息,因此我們只需要關心傳送訊息時的併發問題即可。 Actor之間通過傳送訊息來通訊,訊息的傳送是非同步的,通過一個郵件佇列(mail queue)來處理訊息。每個Actor是完全獨立的,可以同時執行它們的操作。每一個Actor是一個計算實體,對映接收到的訊息到以下動作:

  • 傳送有限個訊息給其它Actor
  • 建立有限個新的Actor
  • 為下一個接收的訊息指定行為 以上三種動作並沒有固定的順序,可以併發地執行。Actor會根據接收到的訊息進行不同的處理。

Actor模型有兩種任務排程方式:基於執行緒的排程以及基於事件的排程

基於執行緒的排程:為每個Actor分配一個執行緒,在接收一個訊息時,如果當前Actor的郵箱(mail box)為空,則會阻塞當前執行緒。 基於執行緒的排程實現較為簡單,但執行緒數量受到操作的限制,現在的Actor模型一般不採用這種方式; 基於事件的除錯:事件可以理解為上述任務或訊息的到來,而此時才會為Actor的任務分配執行緒並執行。

綜上,我們知道可以把系統中的所有事物都抽象成一個Actor:

  • Actor的輸入是接收到的訊息。
  • Actor接收到訊息後處理訊息中定義的任務。
  • Actor處理完成任務後可以傳送訊息給其它的Actor。

1.2 Actors有郵箱 只得指明的一點是,儘管許多actors同時執行,但是一個actor只能順序地處理訊息。也就是說其它actors傳送了三條訊息給一個actor,這個actor只能一次處理一條。所以如果你要並行處理3條訊息,你需要把這條訊息發給3個actors。 訊息非同步地傳送到actor,所以當actor正在處理訊息時,新來的訊息應該儲存到別的地方。Mailbox就是這些訊息儲存的地方。 Actors通過非同步訊息溝通,在處理訊息之前訊息被存放在Mailbox中。

1.3 Actors做什麼 當一個actor接收到訊息後,它能做如下三件事中的一件: Create more actors; 建立其他actors Send messages to other actors; 向其他actors傳送訊息 Designates what to do with the next message. 指定下一條訊息到來的行為 前兩件事比較直觀,第三件卻很有意思。我之前說過一個actor能維持一個私有狀態。「指定下一條訊息來到做什麼」意味著可以定義下條訊息來到時的狀態。更清楚地說,就是actors如何修改狀態。 設想有一個actor像計算器,它的初始狀態是數字0。當這個actor接收到add(1)訊息時,它並不改變它原本的狀態,而是指定當它接收到下一個訊息時,狀態會變為1。

1.4 容錯 Fault tolerance

Erlang 引入了「隨它崩潰」的哲學理念,這部分關鍵程式碼被監控著,監控者的唯一職責是知道程式碼崩潰後幹什麼(如將這個單元程式碼重置為正常狀態),讓這種理念成為可能的正是actor模型。 每段程式碼都執行在process中,process是erlang稱呼actor的方式。這個process完全獨立,意味著它的狀態不會影響其他process。我們有個supervisor,實際上它只是另一個process(所有東西都是actor),當被監控的process掛了,supervisor這個process會被通知並對此進行處理。這就讓我們能建立「自愈」系統了。如果一個actor到達異常狀態並崩潰,無論如何,supervisor都可以做出反應並嘗試把它變成一致狀態,這裡有很多策略,最常見的是根據初始狀態重啟actor。

1.5 分散式 Distribution 另一個關於actor模型的有趣方面是它並不在意訊息傳送到的actor是本地的或者是另外節點上的。 轉念一想,如果actor只是一些程式碼,包含了一個mailbox和一個內部狀態,actor只對訊息做出響應,誰會關注它執行在哪個機器上呢?只要我們能讓訊息到達就行了。這允許我們基於許多計算機上構建系統,並且恢復其中任何一臺.

1.6 對比傳統java併發程式設計與Scala Actor程式設計

Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制,Scala是運用訊息(message)的傳送、接收來實現多執行緒的

java 內建執行緒模型 Scala actor模型
共享資料鎖,每個object有一個monitor,監視多執行緒對共享資料的訪問 不共享資料,actor之間通過message通訊加鎖的程式碼段用syncronized標識
加鎖的程式碼段用syncronized標識
死鎖問題:兩個物件在沒有釋放資源的狀況下又去申請新的資源,造成互相等待,形成死鎖
每個執行緒內部都是順序執行的 每個actor內部都是順序執行的

1.7 Actor方法執行順序

1.首先呼叫start()方法啟動Actor 2.呼叫start()方法後其act()方法會被執行 3.向Actor傳送訊息

1.8 傳送訊息的方式

! 傳送非同步訊息,沒有返回值。
!! 傳送非同步訊息,返回值是 Future[Any]
!? 傳送同步訊息,等待返回值。

Actor例項 -1-

import scala.actors.Actor

object actor1 extends Actor{
  def act():Unit = {
    for(i<-1 to 10){
      println("actor1---"+i)
      Thread.sleep(1000)
    }
  }
}

object actor2 extends Actor{
  def act():Unit = {
    for(i<-1 to 10){
      println("actor2---"+i)
      Thread.sleep(1000)
    }
  }
}

object ActorDemo {
  def main(args: Array[String]): Unit = {
    //啟動actor
    actor1.start()
    actor2.start()
  }
}

說明:上面分別呼叫了兩個單例物件的start()方法,他們的act()方法會被執行,相同與在java中開啟了兩個執行緒,執行緒的run()方法會被執行。 注意:這兩個Actor是並行執行的,act()方法中的for迴圈執行完成後actor程式就退出了

-2- receive相當於是建立執行緒和銷燬執行緒的過程,可以不斷地接收訊息 接收訊息方式1:receive 特點:要反覆處理訊息,receive外層用while(…), 不用的話只處理一次。

class actor3 extends Actor{
  override def act(): Unit = {
    while (true) {
      //receive引數是一個匿名函式
      receive {
        case "start" => {
          println("Starting")
          Thread.sleep(2000)
          println("started")
        }
        case "stop" => {
          println("Stopping")
          Thread.sleep(2000)
          println("stoped")
        }
      }
    }
  }
}
object ActorDemo2 {
  def main(args: Array[String]): Unit = {
    val myactor = new actor3()
    myactor.start()

    //傳送非同步無返回值的訊息
    myactor !"start"
    myactor !"stop"
    println("have finished")
  }
}

說明:在act()方法中加入了while (true) 迴圈,就可以不停的接收訊息 注意:傳送start訊息和stop的訊息是非同步的,但是Actor接收到訊息執行的過程是同步的按順序執行

-3- react類似執行緒池機制,可以複用執行緒,不斷地接收訊息,使用react 特點: (1) 從不返回 (2) 要反覆執行訊息處理,react外層用loop,不能用while(…) (3) 通過複用執行緒,比receive更高效 應儘可能使用react,react方式會複用執行緒,比receive更高效

import scala.actors.Actor
class actor4 extends Actor{
  override def act(): Unit = {
    loop {
      //react可以進行執行緒複用,不用重新建立和銷燬執行緒,減少在資源佔用
      react {
        case "start" => {
          println("Starting")
          Thread.sleep(2000)
          println("started")
        }
        case "stop" => {
          println("Stopping")
          Thread.sleep(2000)
          println("stoped")
        }
      }
    }

  }
}
object ActorDemo3 {
  def main(args: Array[String]): Unit = {
    val myactor = new actor4()
    myactor.start()
    //傳送非同步無返回值的訊息
    myactor !"start"
    myactor !"stop"
    println("have finished")
  }
}

-4-Scala寫wordCount

object ScalaWordCount {
  def main(args: Array[String]): Unit = {

    //模擬從檔案讀取字串
    val lines = List("hello scala spark","hello scala java","hello hadoop java")

    //資料切分,生成一個一個的單詞,並壓平
    val words:List[String] = lines.flatMap(_.split(" "))

    //過濾資料中多餘的空格
    val filterwords:List[String] = words.filter(_ != " ")

    //把一個一個的單詞程式設計一個一個的元組(word,1)
    val tuples:List[(String,Int)] = filterwords.map((_,1))

    //按照key進行分組
    val grouped:Map[String,List[(String,Int)]] = tuples.groupBy(_._1)

    //聚合,相同的key的value
    val sumed:Map[String,Int] = grouped.mapValues(_.size)

    //對結果進行排序
    val sorted:List[(String,Int)] = sumed.toList.sortWith(_._2 > _._2)

    //一句話實現
    //val result:List[(String,Int)] = lines.flatMap(_.split(" ")).filter(_ != "").map((_,1)).groupBy(_._1).mapValues(_.size).toList.sortWith(_._2 > _._2)

    //列印
    println(sorted)
  }
}

-5-ActorWordCount

import scala.actors.{Actor, Future}
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

//使用actor同時處理多個檔案,wordcount,最後將結果彙總
object ActorWordCount {
  def main(args: Array[String]): Unit = {
    //儲存所有actor返回的處理結果
    val replys = ArrayBuffer[Future[Any]]()

    //獲取任務
    val files = Array("C:\\Users\\brz\\Desktop\\hbaseMR\\hbaseMR\\hbase2hdfs.java","C:\\Users\\brz\\Desktop\\hbaseMR\\hbaseMR\\hdfs2hbase.java")

    //分發任務
    for(file <- files){
      val task = new Task
      task.start()

      //傳送處理任務 (wordcount處理結果 )
      val reply:Future[Any] = task !! submitTask(file) //回撥
      replys += reply
    }
    //彙總處理結果
    val processedReplys = new ArrayBuffer[Map[String,Int]]()
    while(replys.size>0){
      //遍歷future
      //有返回結果的future陣列
      val done = replys.filter(_.isSet)
      for(res <- done){
        //一個future的返回結果
        val mapped = res.apply().asInstanceOf[Map[String,Int]]
        //把每一個處理結果放到彙總的數組裡
        processedReplys += mapped
        replys -= res
      }
    val res = processedReplys.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
    println(res)
  }

  class Task extends Actor{
    override def act(): Unit = {
      while (true){
        receive({

          //拿到任務
          case submitTask(file)=>{
            val line = Source.fromFile(file).getLines()
            val listline = line.toList
            val words = listline.flatMap(_.split(" "))
            val tuples = words.map((_,1))
            val grouped = tuples.groupBy(_._1)
            val sumed = grouped.mapValues(_.size)
            //將結果返回
            sender ! sumed
          }
        })
      }
    }
  }
  case class submitTask(file:String)
}