1. 程式人生 > >Scala系列——Scala Actor併發程式設計

Scala系列——Scala Actor併發程式設計

一、基本介紹

1、 概念

        Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制,Scala是運用訊息(message)的傳送、接收來實現多執行緒的。使用Scala能夠更容易地實現多執行緒應用的開發。

2、傳統java併發程式設計與Scala Actor程式設計的區別

        對於Java,我們都知道它的多執行緒實現需要對共享資源(變數、物件等)使用synchronized 關鍵字進行程式碼塊同步、物件鎖互斥等等。而且,常常一大塊的try…catch語句塊中加上wait方法、notify方法、notifyAll方法是讓人很頭疼的。原因就在於Java中多數使用的是可變狀態的物件資源,對這些資源進行共享來實現多執行緒程式設計的話,控制好資源競爭與防止物件狀態被意外修改是非常重要的,而物件狀態的不變性也是較難以保證的。 而在Scala中,我們可以通過複製不可變狀態的資源(即物件,Scala中一切都是物件,連函式、方法也是)的一個副本,再基於Actor的訊息傳送、接收機制進行並行程式設計

3、Actor方法執行順序

1.首先呼叫start()方法啟動Actor

2.呼叫start()方法後其act()方法會被執行

3.向Actor傳送訊息

4、傳送訊息的方式

! 傳送非同步訊息,沒有返回值。

!? 傳送同步訊息,等待返回值。

!! 傳送非同步訊息,返回值是 Future[Any]。

二、Actor實戰

1、例子一

package cn.itcast.actor
//注意導包是scala.actors.Actor
import scala.actors.Actor
​
object MyActor1 extends Actor{
  //重新act方法
  def act(){
    for(i <- 1 to 10){
      println("actor-1 " + i)
      Thread.sleep(2000)
    }
  }
}
​
object MyActor2 extends Actor{
  //重新act方法
  def act(){
    for(i <- 1 to 10){
      println("actor-2 " + i)
      Thread.sleep(2000)
    }
  }
}
​
object ActorTest extends App{
  //啟動Actor
  MyActor1.start()
  MyActor2.start()
}

說明:上面分別呼叫了兩個單例物件的start()方法,他們的act()方法會被執行,相同與在java中開啟了兩個執行緒,執行緒的run()方法會被執行

注意:這兩個Actor是並行執行的,act()方法中的for迴圈執行完成後actor程式就退出了

2、例子二(可以不斷地接收訊息)

class MyActor extends Actor {
​
  override def act(): Unit = {
    while (true) {
      receive {
        case "start" => {
          println("starting ...")
          Thread.sleep(5000)
          println("started")
        }
        case "stop" => {
          println("stopping ...")
          Thread.sleep(5000)
          println("stopped ...")
        }
      }
    }
  }
}
​
object MyActor {
  def main(args: Array[String]) {
    val actor = new MyActor
    actor.start()
    actor ! "start"
    actor ! "stop"
    println("訊息傳送完成!")
  }
}

說明:在act()方法中加入了while (true) 迴圈,就可以不停的接收訊息

注意:傳送start訊息和stop的訊息是非同步的,但是Actor接收到訊息執行的過程是同步的按順序執行

3、例子三(react方式會複用執行緒,比receive更高效)

class YourActor extends Actor {
​
  override def act(): Unit = {
    loop {//一個核心一個執行緒,如果一個核心多個執行緒,執行緒還是會切換
      react {
        case "start" => {
          println("starting ...")
          Thread.sleep(5000)
          println("started")
        }
        case "stop" => {
          println("stopping ...")
          Thread.sleep(8000)
          println("stopped ...")
        }
      }
    }
  }
}
​
​
object YourActor {
  def main(args: Array[String]) {
    val actor = new YourActor
    actor.start()
    actor ! "start"
    actor ! "stop"
    println("訊息傳送完成!")
  }
}

說明: react 如果要反覆執行訊息處理,react外層要用loop,不能用while

4、例子四(結合case class傳送訊息)

class AppleActor extends Actor {
​
  def act(): Unit = {
    while (true) {
      receive {
        case "start" => println("starting ...")
        case SyncMsg(id, msg) => {
          println(id + ",sync " + msg)
          Thread.sleep(5000)
          sender ! ReplyMsg(3,"finished")
        }
        case AsyncMsg(id, msg) => {
          println(id + ",async " + msg)
          Thread.sleep(5000)
        }
      }
    }
  }
}
​
object AppleActor {
  def main(args: Array[String]) {
    val a = new AppleActor
    a.start()
    //非同步訊息
    a ! AsyncMsg(1, "hello actor")
    println("非同步訊息傳送完成")
    //同步訊息
    //val content = a.!?(1000, SyncMsg(2, "hello actor"))
    //println(content)
    val reply = a !! SyncMsg(2, "hello actor")
    println(reply.isSet)
    //println("123")
    val c = reply.apply()
    println(reply.isSet)
    println(c)
  }
}
case class SyncMsg(id : Int, msg: String)
case class AsyncMsg(id : Int, msg: String)
case class ReplyMsg(id : Int, msg: String)

三、練習

        用actor併發程式設計寫一個單機版的WorldCount,將多個檔案作為輸入,計算完成後將多個任務彙總,得到最終的結果

class Task extends Actor {
​
  override def act(): Unit = {
    loop {
      react {
        case SubmitTask(fileName) => {
          val contents = Source.fromFile(new File(fileName)).mkString
          val arr = contents.split("\r\n")
          val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)
          //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
          sender ! ResultTask(result)
        }
        case StopTask => {
          exit()
        }
      }
    }
  }
}
​
object WorkCount {
  def main(args: Array[String]) {
    val files = Array("c://words.txt", "c://words.log")
​
    val replaySet = new mutable.HashSet[Future[Any]]
    val resultList = new mutable.ListBuffer[ResultTask]
​
    for(f <- files) {
      val t = new Task
      //執行緒執行完後,返回結果放到Future[Any],即返回結果型別為Future[Any]
      val replay = t.start() !! SubmitTask(f)
      replaySet += replay
    }
​
    while(replaySet.size > 0){
      val toCumpute = replaySet.filter(_.isSet)
      for(r <- toCumpute){
        val result = r.apply()//apply,把結果取出來
        resultList += result.asInstanceOf[ResultTask]
        replaySet.remove(r)
      }
      Thread.sleep(100)
    }
    val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))
    println(finalResult)
  }
}
​
case class SubmitTask(fileName: String)
case object StopTask
case class ResultTask(result: Map[String, Int])

注:我們現在學的Scala Actor是scala 2.10.x版本及以前版本的Actor。Scala在2.11.x版本中將Akka加入其中,作為其預設的Actor,老版本的Actor已經廢棄[不可用]

相關推薦

Scala系列——Scala Actor併發程式設計

一、基本介紹 1、 概念         Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制,Scala是運用訊息(message)的傳送、接收來實現多執行緒的。使用Scala能夠更容易地實現多執行緒應用的開發。 2、傳統java併發程式

Scala Actor併發程式設計

 Java中的併發程式設計主要通過執行緒實現的,通過共享資源的機制實現併發,但會面臨著死鎖的問題。在Scala中,是通過訊息傳遞來實現併發的,而Actor正是實現訊息傳遞的。 Scala的actor提供了一種基於事件的輕量級執行緒。只要使用scala.actors.Ac

scalaactor併發程式設計寫一個單機版的WorldCount(類似Hadoop的MapReduce思想)

1、準備資料,2個檔案 words.txt 內容: lilei hello zhangsan hello lisi hello 蘇三 hello words.log 內容: lilei hello zhangsan hello lisi hello 2、環境Intell

scala學習十一 併發程式設計

首先來一個生產者消費者的例子: Producer: class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array(

Actor併發程式設計模型淺析

一.Actor模型介紹 在單核 CPU 發展已經達到一個瓶頸的今天,要增加硬體的速度更多的是增加 CPU 核的數目。而針對這種情況,要使我們的程式執行效率提高,那麼也應該從併發方面入手。傳統的多執行緒方法又極其容易出現 Bug 而難以維護,不過別擔心,今天將要介紹另一種併發的模式能一定程度解決這些問題,那就是

Scala系列——Scala中_(下劃線)的常見用法

將方法轉換為函式 scala> def ml(x: Int,y:Int) : Int= x*y scala> val f1=m1 _ 作為函式的引數         一個匿名的函式傳遞給一個方法或者函式的時候,scala會盡量推斷出引數型別。例如一個完整的匿

江疏影讀書系列之Java併發程式設計實戰(第一章 簡介)

欲速則不達,欲達則欲速! 12年畢業,化工、零售。16年轉行,Java培訓五個月,17年1月,我人生中最悲慘的一個月,找工作處處

Scala Actor併發程式設計

1.什麼是Scala Actor 概念 Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制,Scala是運用訊息(massage)的傳送,接收來實現多執行緒的,使用Scala能夠更能夠地實現多執行緒應用的開發。 2.1傳統java併發程式設計與Scala Ac

Scala使用Actor進行併發程式設計

Akka 是一個用 Scala 編寫的庫,用於簡化編寫容錯的、高可伸縮性的 Java 和 Scala 的 Actor 模型應用。  Actor模型並非什麼新鮮事物,它由Carl Hewitt於上世紀70年代早期提出,目的是為了解決分散式程式設計中一系列的程式設計問題。其

Scala系列學習七 Actor

Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制,Scala是運用訊息(message)的傳送、接收來實現多執行緒的。使用Scala能夠更容易地實現多執行緒應用的開發。 1.首先呼叫start()方法啟動Actor 2.呼叫start()方

第69講 scala併發程式設計 react 、loop 程式設計

上一節我們講了,actor的receive 偏函式,它屬於 每請求沒執行緒模式,用完了就銷燬。 有沒有執行緒共享,請看本文。 scala 為了提升效能,有2種共享執行緒方式,一種是使用react ,另一種是 loop方法。 一、 react 方式 程式

Scala入門到精通——第二十六節 Scala併發程式設計基礎

本節主要內容 Scala併發程式設計簡介 Scala Actor併發程式設計模型 react模型 Actor的幾種狀態 Actor深入使用解析 1. Scala併發程式設計簡介 2003 年,Herb Sutter 在他的文章 “The Fre

白話scala系列Scala程式設計難點解析

一直想找一篇關於scala和其他語言相比難點分析的文章,今天終於找到一篇,雖然有點囉嗦,但仔細閱讀後還是會有所體會。 原文連結:http://www.blogjava.net/hechi158/archive/2012/02/28/370902.html S

學習spark系列---scala 程式設計基礎

變數 var getter and setter val getter 一旦初始化就不能賦值,鼓勵使用 資料型別 array array 不可變長度 arrayBuffer 可變長度, 互相轉化函式 toArray() 和 t

scala 併發程式設計

1.Actor的建立、啟動和訊息收發 //相當於java的thread //java多執行緒是共享全域性的加鎖的程式設計機制 /** Actor trait就類似於Java中的Thread和Runnable一樣, 是基礎的多執行緒基類和介面。我們只要重寫

scala併發程式設計第一章習題

1.下面的方法簽名實現一個compose方法 def compose[A,B,C](g:B => C ,f : A => B):A => C = x => g(f(x)) 思路就是上一篇文章說的關於compose和andThe

scalaactor併發統計詞頻

import scala.actors.{Actor, Future}import scala.collection.mutableimport scala.io.Sourcecase class MySend(file: String)case class MyRecieve(msg: Map[String

Scala併發程式設計模型AKKA

一、併發程式設計模型AKKA Spark使用底層通訊框架AKKA 分散式 master worker hadoop使用的是rpc 1)akka簡介 寫併發程式很難,AKKA解決spark這個問題。 akka構建在JVM平臺上,是

Scala併發程式設計基礎

轉載作者:搖擺少年夢轉載地址:https://blog.csdn.net/lovehuangjiaju/article/details/47623177本節主要內容Scala併發程式設計簡介Scala Actor併發程式設計模型react模型Actor的幾種狀態Actor深入

Scala學習--併發程式設計--socket

基礎知識 Runnable/Callable Runnable介面只有一個沒有返回值的方法。 trait Runnable { def run(): Unit } Callable與之類