1. 程式人生 > >scala 用actor併發統計詞頻

scala 用actor併發統計詞頻



import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source

case class MySend(file: String)

case class MyRecieve(msg: Map[String, Int])

case class Terminate(msg: String)

class Task extends Actor {
override def act(): Unit = {
loop {
react {
case MySend(file) => {//接收檔案
val msg = Source.fromFile(file).getLines().flatMap(_.split(" ")).map((_, 1)).toList.groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
sender ! MyRecieve(msg) //返回執行結果
}
case Terminate(msg) => {
sys.exit(1)
}
}
}
}
}



object TaskMain extends App {
val files = Set("G:\\1.txt", "G:\\2.txt")
val resSet = mutable.HashSet[Future[Any]]()
//起兩個actor,傳送非同步訊息,返回Future
for (file <- files) {
val res = new Task().start() !! MySend(file)
resSet += res
}

val lbf = new mutable.ListBuffer[Map[String, Int]]()
//等待計算結果,計算全部完成時推出迴圈
while (resSet.size != 0) {
for (elem <- resSet) {
if (elem.isSet) {//判斷任務是否執行完
lbf += elem.apply().asInstanceOf[MyRecieve].msg //任務執行完成,通過apply方法獲取執行結果(由sender傳送過來)
resSet -= elem //完成一個任務就從set裡面剔除,迴圈出口
}
}
}
//將兩個actor的結果相加排序
print(lbf.flatMap(_.toList).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)).toList.sortBy(_._2))
//傳送訊息給actor,通知程式結束
new Task().start() !! Terminate("00")
}