程式人生 > 基於Scala的Actor對字進行統計

基於Scala的Actor對字進行統計

基於Scala的Actor對字進行統計   

用Actor併發程式設計寫一個單機版的WordCount:將多個檔案作為輸入,每個檔案由一個Actor任務統計,計算完成後將多個任務的結果彙總,得到最終的結果。

package com.zhiyou100.ScalaActor_akka
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source

/**
 * Worker actor for the word count.
 *
 * On `MapTask(filename)` it reads the file, splits each line on whitespace,
 * counts every word locally (the "combiner" step) and replies to the sender
 * with a `ReduceTask` carrying the per-file counts. On `ExistTask` it leaves
 * its message loop and terminates.
 */
class ActorWordCount extends Actor {
  import scala.util.control.NonFatal

  override def act(): Unit = {
    loop {
      react {
        case MapTask(filename) =>
          // Always reply, even when the file cannot be read: otherwise the
          // Future the caller obtained via `!!` never completes and it hangs.
          val counts =
            try countWords(filename)
            catch {
              case NonFatal(e) =>
                System.err.println(s"Failed to count words in $filename: $e")
                Map.empty[String, Int]
            }
          sender ! ReduceTask(counts)
        case ExistTask =>
          exit()
      }
    }
  }

  /**
   * Counts word occurrences in one file.
   *
   * @param filename path of the file to read (platform default encoding)
   * @return strict map of word -> number of occurrences in the file
   */
  private def countWords(filename: String): Map[String, Int] = {
    val source = Source.fromFile(filename)
    try {
      source.getLines()
        .flatMap(_.split("\\s+"))
        // Blank lines and leading/repeated whitespace would otherwise yield "" tokens.
        .filter(_.nonEmpty)
        // Build a strict Map (not a lazy mapValues view) before sending it to another actor.
        .foldLeft(Map.empty[String, Int]) { (acc, word) =>
          acc.updated(word, acc.getOrElse(word, 0) + 1)
        }
    } finally {
      source.close()
    }
  }
}
object ActorWordCount {

  /** Input used when no file paths are given on the command line. */
  private val DefaultFiles: Seq[String] = Seq("E:\\1\\2\\1\\MR.txt")

  /**
   * Runs a single-machine word count: one `ActorWordCount` worker per input
   * file, then merges the per-file counts into totals and prints them.
   *
   * @param args paths of the files to count; when empty, `DefaultFiles` is used
   */
  def main(args: Array[String]): Unit = {
    val files: Seq[String] = if (args.nonEmpty) args.toSeq else DefaultFiles

    // Start one worker per file and send it its MapTask; `!!` returns a Future for the reply.
    val pending: Seq[(Actor, Future[Any])] = files.map { filename =>
      val worker = new ActorWordCount
      worker.start()
      (worker, worker !! MapTask(filename))
    }

    // Applying a Future blocks until the reply is available, so no busy polling is needed.
    val resultList = new ListBuffer[ReduceTask]
    for ((worker, future) <- pending) {
      future() match {
        case task: ReduceTask => resultList += task
        case other =>
          throw new IllegalStateException(s"Unexpected reply from worker: $other")
      }
      // The worker is done; let it leave its loop instead of lingering.
      worker ! ExistTask
    }
    println(resultList)

    // Sum the per-file counts (e.g. (hello,3) and (hello,2) -> (hello,5)).
    val totals = mergeCounts(resultList.map(_.result))
    println(totals)
  }

  /**
   * Sums per-file word counts.
   *
   * @param partials one word -> count map per input file
   * @return word -> total occurrences across all partials
   */
  private def mergeCounts(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.foldLeft(Map.empty[String, Int]) { (acc, part) =>
      part.foldLeft(acc) { case (merged, (word, n)) =>
        merged.updated(word, merged.getOrElse(word, 0) + n)
      }
    }
}
/** Message telling a worker actor which file to count words in. */
case class MapTask(val filename: String)

/** A worker's reply: word -> number of occurrences found in its file. */
case class ReduceTask(val result: Map[String, Int])

/** Message telling a worker actor to terminate (an "exit" task). */
case object ExistTask