scala用actor併發程式設計寫一個單機版的WorldCount(類似Hadoop的MapReduce思想)
阿新 • • 發佈:2019-01-29
1、準備資料,2個檔案
words.txt
內容:
lilei hello
zhangsan hello
lisi hello
蘇三 hello
words.log
內容:
lilei hello
zhangsan hello
lisi hello
2、環境Intellj IDEA scala外掛
3、程式碼
package p1 import scala.actors.{Actor, Future} import scala.collection.mutable import scala.io.Source //模式匹配類,用於提交任務 case class SubmitTask(fileName:String) //單例的模式匹配類,用於停止任務 case object StopTask //用於收集分組後結果的 case class ResultTask(result:Map[String,Int]) /** * scala Actor構建在java的執行緒基礎之上的, * 為了避免頻繁的執行緒建立、銷燬和切換等,scala中提供了react方法 * 方法執行完畢後,仍然被保留 */ class Task extends Actor{ override def act(){ loop{//重複執行一個程式碼塊 react{ case SubmitTask(fileName)=>{ val result=Source.fromFile(fileName,"gb2312").getLines()//獲取檔案,有中文-編碼,每一行生成一個List集合 .flatMap(_.split(" ")).map((_,1)).toList//把上面所有集合壓縮成一個集合,再切分,再生成map-(“單詞”,1) .groupBy(_._1).mapValues(_.size)//按照key分組,value就是分組後map的數量 sender ! ResultTask(result)//把單個檔案的統計結果輸出,!代表非同步執行 } case StopTask=>{ exit() } } } } } object WorkCount{ def main(args: Array[String]) { //要讀取的檔案 val files=Array("E://words.txt","E://words.log") val replaySet=new mutable.HashSet[Future[Any]] val resultList=new mutable.ListBuffer[ResultTask] //每個檔案啟動一個執行緒,非同步提交,replaySet接收返回的值 for(f<-files){ val t=new Task val replay=t.start() !! SubmitTask(f) replaySet+=replay } while(replaySet.size>0){ //檢查replaySet中是否有執行完Future,過濾出來 val toCompute=replaySet.filter(_.isSet) for(r<-toCompute){ //r.apply()等價於r()取出r物件 val result=r.apply() //取出的物件進行強轉,放到resultList中 resultList+=result.asInstanceOf[ResultTask] //操作完一個移除一個,避免重複 replaySet.remove(r)//replaySet -=r } Thread.sleep(100)//睡一會避免死迴圈,等待所有任務執行完 } //最終resultList中的資料是每個檔案處理好的esultTask(Map[String,Int])集合 //此步驟類似於hadoop裡的reducer val finalResult=resultList.map(_.result)//變成List裡裝的很多map格式 .flatten.groupBy(_._1)//壓縮分組 .mapValues(x=>x.foldLeft(0)(_+_._2))//累加 //列印結果 println(finalResult) } }
5、結果
Map(lisi -> 2, 蘇三 -> 1, lilei -> 2, hello -> 7, zhangsan -> 2)