基於Scala的Actor對字進行統計
阿新 • • 發佈:2019-01-09
基於Scala的Actor對字進行統計
用actor併發程式設計寫一個單機版的WordCount,將多個檔案作為輸入,每個檔案由一個actor統計,計算完成後將多個任務的結果彙總,得到最終的結果
package com.zhiyou100.ScalaActor_akka

import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.io.Source

/** Worker actor that performs the "map" phase of a single-machine word count.
  *
  * It reacts to two messages:
  *  - `MapTask(filename)`: counts the words of that file and sends a
  *    `ReduceTask` holding the per-file counts back to the sender.
  *  - `ExistTask`: terminates the actor.
  */
class ActorWordCount extends Actor {

  override def act(): Unit = {
    loop {
      react {
        case MapTask(filename) =>
          // Map + combiner step: the reply already holds the locally
          // aggregated counts for this one file.
          sender ! ReduceTask(countWords(filename))
        case ExistTask =>
          exit()
      }
    }
  }

  /** Reads `filename` and returns how many times each word occurs in it.
    *
    * Words are separated by runs of whitespace; empty tokens (blank lines,
    * leading or repeated spaces) are not counted.
    */
  private def countWords(filename: String): Map[String, Int] = {
    val source = Source.fromFile(filename)
    try {
      source.getLines()
        .flatMap(_.split("\\s+"))
        .filter(_.nonEmpty)
        .toList // forces the whole file to be read before the source is closed
        .groupBy(identity)
        // Build a strict Map rather than the lazy view `mapValues` returns,
        // since the result is handed to another actor as a message.
        .map { case (word, occurrences) => word -> occurrences.size }
    } finally {
      source.close()
    }
  }
}

object ActorWordCount {

  /** Counts the words of several files concurrently, one actor per file,
    * then merges the per-file results and prints them.
    *
    * @param args paths of the files to count; when empty, the original
    *             hard-coded sample path is used
    */
  def main(args: Array[String]): Unit = {
    val files: Array[String] =
      if (args.nonEmpty) args else Array[String]("E:\\1\\2\\1\\MR.txt")

    // Start one worker per file and send it its MapTask. `!!` returns a
    // Future that is completed with the worker's reply.
    val pending: Array[(Actor, Future[Any])] = files.map { filename =>
      val worker = new ActorWordCount
      val reply = worker.start() !! MapTask(filename)
      (worker, reply)
    }

    // Block on each future in turn instead of spinning on `isSet`.
    val resultList = new ListBuffer[ReduceTask]
    for ((worker, reply) <- pending) {
      reply() match {
        case task: ReduceTask => resultList += task
        case other            => sys.error("Unexpected reply from worker: " + other)
      }
      // The worker has nothing left to do; let it leave its react loop.
      worker ! ExistTask
    }
    println(resultList)

    // Reduce step: sum the per-file counts of every word.
    val rd1 = resultList
      .flatMap(_.result)
      .groupBy(_._1)
      .map { case (word, partials) => word -> partials.map(_._2).sum }
    println(rd1)

    // NOTE: this is NOT a word count. It is the number of per-file results
    // that contain each word, i.e. in how many files the word appears.
    val r = resultList.flatMap(_.result).groupBy(_._1).mapValues(_.size)
    println(r.toBuffer)
  }
}

/** Request to count the words of the file at `filename`. */
case class MapTask(filename: String)

/** Per-file word counts produced by a worker. */
case class ReduceTask(result: Map[String, Int])

/** Tells a worker to terminate (exit task). */
case object ExistTask