1. 程式人生 > >scala用actor併發程式設計寫一個單機版的WorldCount(類似Hadoop的MapReduce思想)

scala用actor併發程式設計寫一個單機版的WorldCount(類似Hadoop的MapReduce思想)

1、準備資料,2個檔案

words.txt

內容:

lilei hello
zhangsan hello
lisi hello
蘇三 hello

words.log

內容:

lilei hello
zhangsan hello
lisi hello

2、環境Intellj IDEA   scala外掛

3、程式碼

package p1
import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source

//模式匹配類,用於提交任務
case class SubmitTask(fileName:String)
//單例的模式匹配類,用於停止任務
case object  StopTask
//用於收集分組後結果的
case class ResultTask(result:Map[String,Int])
/**
  * scala Actor構建在java的執行緒基礎之上的,
  * 為了避免頻繁的執行緒建立、銷燬和切換等,scala中提供了react方法
  * 方法執行完畢後,仍然被保留
  */
class Task extends Actor{

   override def act(){
      loop{//重複執行一個程式碼塊
         react{
            case SubmitTask(fileName)=>{
               val result=Source.fromFile(fileName,"gb2312").getLines()//獲取檔案,有中文-編碼,每一行生成一個List集合
                     .flatMap(_.split(" ")).map((_,1)).toList//把上面所有集合壓縮成一個集合,再切分,再生成map-(“單詞”,1)
                     .groupBy(_._1).mapValues(_.size)//按照key分組,value就是分組後map的數量
               sender ! ResultTask(result)//把單個檔案的統計結果輸出,!代表非同步執行
            }
            case StopTask=>{
               exit()
            }
         }
      }
   }
}
object WorkCount{
   def main(args: Array[String]) {
      //要讀取的檔案
      val files=Array("E://words.txt","E://words.log")
      val replaySet=new mutable.HashSet[Future[Any]]
      val resultList=new mutable.ListBuffer[ResultTask]

      //每個檔案啟動一個執行緒,非同步提交,replaySet接收返回的值
      for(f<-files){
         val t=new Task
         val replay=t.start() !! SubmitTask(f)
         replaySet+=replay
      }
      while(replaySet.size>0){
         //檢查replaySet中是否有執行完Future,過濾出來
         val toCompute=replaySet.filter(_.isSet)
         for(r<-toCompute){
            //r.apply()等價於r()取出r物件
            val result=r.apply()
            //取出的物件進行強轉,放到resultList中
            resultList+=result.asInstanceOf[ResultTask]
            //操作完一個移除一個,避免重複
            replaySet.remove(r)//replaySet -=r
         }
         Thread.sleep(100)//睡一會避免死迴圈,等待所有任務執行完
      }
      //最終resultList中的資料是每個檔案處理好的esultTask(Map[String,Int])集合
      //此步驟類似於hadoop裡的reducer
      val finalResult=resultList.map(_.result)//變成List裡裝的很多map格式
                      .flatten.groupBy(_._1)//壓縮分組
                      .mapValues(x=>x.foldLeft(0)(_+_._2))//累加
      //列印結果
      println(finalResult)
   }
}

5、結果

Map(lisi -> 2, 蘇三 -> 1, lilei -> 2, hello -> 7, zhangsan -> 2)