1. 程式人生 > >Scala併發程式設計基礎

Scala併發程式設計基礎

轉載作者:搖擺少年夢

轉載地址:https://blog.csdn.net/lovehuangjiaju/article/details/47623177

本節主要內容

  1. Scala併發程式設計簡介
  2. Scala Actor併發程式設計模型
  3. react模型
  4. Actor的幾種狀態
  5. Actor深入使用解析

1. Scala併發程式設計簡介

2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行業中最不可告人的一個小祕密,他明確論證了處理器在速度上的發展已經走到了盡頭,並且將由全新的單晶片上的並行 “核心”(虛擬 CPU)所取代。這一發現對程式設計社群造成了不小的衝擊,因為正確建立執行緒安全的程式碼,在理論而非實踐中,始終會提高高效能開發人員的身價,而讓各公司難以聘用他們。看上去,僅有少數人充分理解了 Java 的執行緒模型、併發 API 以及 “同步” 的含義,以便能夠編寫同時提供安全性和吞吐量的程式碼 —— 並且大多數人已經明白了它的困難所在(來源:

http://www.ibm.com/developerworks/cn/java/j-scala02049.html)。

在Java中,要編寫一個執行緒安全的程式並不是一件易事,例如:

class Account {  
    private int balance;  

    synchronized public int getBalance() {  
      return balance;  
    }  

    synchronized public void incrementBalance() {  
      balance++;  
    }  
}  

上面這段java程式碼雖然方法前面加了synchronized ,但它仍然不是執行緒安全的,例如,在執行下面兩個語句

account.incrementBalance();  
account.getBalance();

時,有可能account.incrementBalance()執行完成後,其它執行緒可能會獲取物件的鎖,修改account的balance,從而造成得不到預期結果的問題。解決問題的方法是將兩個功能結合起來形成一個方法:

synchronized public int incrementAndGetBalance() {  
  balance++;  
  return balance;  
}  

但這可能並不是我們想要的,每次獲取balance都要將balance增加, 這顯然與實際不符。除此之外,java中的併發程式設計可能還會經常遇到死鎖問題,而這個問題往往難除錯,問題可能會隨機性的出現。總體上來看,java的併發程式設計模型相對較複雜,難以駕馭。

Scala很好地解決了java併發程式設計的問題,要在scala中進行併發程式設計,有以下幾種途徑可以實現: 
1 actor訊息模型、akka actor併發模型。

2 Thread、Runnable

3 java.util.concurennt

4 第三方開源併發框架如Netty,Mina

在上述四種途徑當中,利用 actor訊息模型、akka actor併發模型是scala併發程式設計的首先,本節主要介紹actor訊息模型,akka actor併發模型我們將放在後面的章節中介紹。 
在scala中,通過不變物件來實現執行緒安全,涉及到修改物件狀態時,則建立一個新的物件來實現,如:

//成員balance狀態一旦被賦值,便不能更改
//因而它也是執行緒安全的
class Person(val age: Integer) {  
  def getAge() = age
}  

object Person{  
  //建立新的物件來實現物件狀態修改
  def increment(person: Person): Person{  
    new Person(Person.getAge() + 1)  
  }  
}  

通過不變物件實現併發程式設計,可以簡化程式設計模型,使併發程式更容易現實和控制。

2.Scala Actor併發程式設計模型

java中的併發主要是通過執行緒來實現,各執行緒採用共享資源的機制來實現程式的併發,這裡面臨競爭資源的問題,雖然採用鎖機制可以避免競爭資源的問題,但會存在死鎖問題,要開發一套健壯的併發應用程式具有一定的難度。而scala的併發模型相比於java它更簡單,它採用訊息傳遞而非資源共享來實現程式的併發,訊息傳遞正是通過Actor來實現的。下面的程式碼給出了Actor使用示例

//混入Actor特質,然後實現act方法
//如同java中的Runnable介面一樣
//各執行緒的run方法是併發執行的
//Actor中的act方法也是併發執行的
class ActorDemo extends Actor{
  //實現 act()方法
  def act(){
    while(true){
      //receive從郵箱中獲取一條訊息
      //然後傳遞給它的引數
      //該引數是一個偏函式
      receive{
        case "actorDemo" => println("receive....ActorDemo")
      }      
    }
  }
}
object ActorDemo extends App{
  val actor=new ActorDemo
  //啟動建立的actor 
  actor.start()
  //主執行緒傳送訊息給actor
  actor!"actorDemo"
}

下面給的是recieve方法的部分原始碼

def receive[R](f: PartialFunction[Any, R]): R = {
    assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")

    synchronized {
      if (shouldExit) exit() // links
      drainSendBuffer(mailbox)
    }

    var done = false
    while (!done) {
      val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
        senders = replyTo :: senders
        val matches = f.isDefinedAt(m)
        senders = senders.tail
        matches
      })
................

從上述程式碼中不能看出,receive方法接受的引數是一個偏函式,並且是通過mailbox來實現訊息的傳送與接收。

在前述的class ActorDemo中,receive方法的引數為

{
        case "actorDemo" => println("receive....ActorDemo")
}      

該程式碼塊在執行時被轉換為一個PartialFunction[Any, R]的偏函式,其中R是偏函式的返回型別,對應case 語句=> 右邊的部分,在本例子中R是Unit型別,而Any對應的則對應case語句的模式部分。

前面給的是通過extends Actor的方式來建立一個Actor類,其實scala.actors.Actor中提供了一個actor工具方法,可以非常方便地直接建立Actor物件如:

import scala.actors.Actor._

object ActorFromMethod extends App{
  //通過工具方法actor直接建立Actor物件
  val methodActor = actor {
    for (i <- 1 to 5)
      println("That is the question.")
      Thread.sleep(1000)
  }
}

上述程式碼建立的actor物件無需呼叫start方法,物件建立完成後會立即執行。

scala中本地執行緒也可用作Actor,下面的程式碼演示瞭如何在REPL命令列中將本地執行緒當作Actor;

scala> import scala.actors.Actor._
import scala.actors.Actor._

//self引用本地執行緒,併發送訊息
scala> self ! "hello"
//接收訊息
scala> self.receive { case x:String => x }
res1: String = hello

上述程式碼中,如果傳送的訊息不是String型別的,執行緒將被阻塞,為避免這個問題,可以採用receiveWithin方法,

scala> self ! 123

scala> self.receiveWithin(1000) { case x => x }
res6: Any = 123

scala> self.receiveWithin(1000) { case x => x }
res7: Any = TIMEOUT
  • react模型

這裡給出另外一個作者的react介紹,更通俗易懂:react

scala中的Actor也是構建在java執行緒基礎之上的,前面在使用Actor時都是通過建立Actor物件,然後再呼叫act方法來啟動actor。我們知道,java中執行緒的建立、銷燬及執行緒間的切換是比較耗時的,因此實際中儘量避免頻繁的執行緒建立、銷燬和銷燬。Scala中提供react方法,在方法執行結束後,執行緒仍然被保留。下面的程式碼演示了react方法的使用:

package cn.scala.xtwy.concurrency
import scala.actors._

object NameResolver extends Actor {
  import java.net.{ InetAddress, UnknownHostException }
  def act() {
    react {
      //匹配主執行緒發來的("www.scala-lang.org", NameResolver)
      case (name: String, actor: Actor) =>
        //向actor傳送解析後的IP地址資訊
        //由於本例中傳進來的actor就是NameResolver自身
        //也即自己給自己傳送訊息,並存入將訊息存入郵箱
        actor ! getIp(name)
        //再次呼叫act方法,試圖從郵箱中提取資訊
        //如果郵箱中資訊為空,則進入等待模式
        act()
      case "EXIT" =>
        println("Name resolver exiting.")
      // quit
      //匹配郵箱中的單個資訊,本例中會匹配郵箱中的IP地址資訊
      case msg =>
        println("Unhandled message: " + msg)
        act()
    }
  }
  def getIp(name: String): Option[InetAddress] = {
    try {
      Some(InetAddress.getByName(name))
    } catch {
      case _: UnknownHostException => None
    }
  }
}
object Main extends App{
  NameResolver.start()
  //主執行緒向NameResolver傳送訊息("www.scala-lang.org", NameResolver)
  NameResolver ! ("www.scala-lang.org", NameResolver)
  NameResolver ! ("wwwwww.scala-lang.org", NameResolver)

}

從上述程式碼中可以看到,通過在react方法執行結束時加入act方法,方法執行完成後沒有被銷燬,而是繼續試圖從郵箱中獲取資訊,獲取不到則等待。

4. Actor的幾種狀態

Actor有下列幾種狀態:

  • 初始狀態(New),Actor物件被建立,但還沒有啟動即沒有執行start方法時的狀態
  • 執行狀態(Runnable),正在執行時的狀態
  • 掛起狀態(Suspended),在react方法中等待時的狀態
  • 時間點掛起狀態(TimedSuspended),掛起狀態的一種特殊形式,reactWithin方法中的等待時的狀態
  • 阻塞狀態(Blocked),在receive方法中阻塞等待時的狀態
  • 時間點阻塞狀態(TimedBlocked),在receiveWithin方法中阻塞等待時的狀態
  • 結束狀態(Terminated),執行完成後被銷燬

5. Actor深入使用解析

1 receive方法單次執行:


object Actor2
  {
    case class Speak(line : String)
    case class Gesture(bodyPart : String, action : String)
    case class NegotiateNewContract()

    def main(args : Array[String]) =
    {
      val badActor =
        actor
        {
          //這裡receive方法只會匹配一次便結束
          receive
          {
            case NegotiateNewContract =>
              System.out.println("I won't do it for less than $1 million!")
            case Speak(line) =>
              System.out.println(line)
            case Gesture(bodyPart, action) =>
              System.out.println("(" + action + "s " + bodyPart + ")")
            case _ =>
              System.out.println("Huh? I'll be in my trailer.")
          }
        }
      //receive方法只處理下面這條語句傳送的訊息
      badActor ! NegotiateNewContract
      //下面其餘的訊息不會被處理
      badActor ! Speak("Do ya feel lucky, punk?")
      badActor ! Gesture("face", "grimaces")
      badActor ! Speak("Well, do ya?")
    }
  }

上述程式碼只會輸出: 
I won’t do it for less than $1 million! 
即後面傳送的訊息如: 
badActor ! Speak(“Do ya feel lucky, punk?”) 
badActor ! Gesture(“face”, “grimaces”) 
badActor ! Speak(“Well, do ya?”) 
不會被處理。這是因為receive方法的單次執行問題。

2 能夠處理多個訊息的receive方法:

object Actor2
  {
    case class Speak(line : String);
    case class Gesture(bodyPart : String, action : String);
    case class NegotiateNewContract()
    //處理結束訊息
    case class ThatsAWrap()

    def main(args : Array[String]) =
    {
      val badActor =
        actor
        {
          var done = false
          //while迴圈
          while (! done)
          {
            receive
            {
              case NegotiateNewContract =>
                System.out.println("I won't do it for less than $1 million!")
              case Speak(line) =>
                System.out.println(line)
              case Gesture(bodyPart, action) =>
                System.out.println("(" + action + "s " + bodyPart + ")")
              case ThatsAWrap =>
                System.out.println("Great cast party, everybody! See ya!")
                done = true
              case _ =>
                System.out.println("Huh? I'll be in my trailer.")
            }
          }
        }
      //下面所有的訊息都能被處理
      badActor ! NegotiateNewContract
      badActor ! Speak("Do ya feel lucky, punk?")
      badActor ! Gesture("face", "grimaces")
      badActor ! Speak("Well, do ya?")
      //訊息傳送後,receive方法執行完畢
      badActor ! ThatsAWrap
    }
  }

3 Actor後面實現原理仍然是執行緒的證據

object Actor3
  {
    case class Speak(line : String);
    case class Gesture(bodyPart : String, action : String);
    case class NegotiateNewContract;
    case class ThatsAWrap;

    def main(args : Array[String]) =
    {
      def ct =
        "Thread " + Thread.currentThread().getName() + ": "
      val badActor =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case NegotiateNewContract =>
                System.out.println(ct + "I won't do it for less than $1 million!")
              case Speak(line) =>
                System.out.println(ct + line)
              case Gesture(bodyPart, action) =>
                System.out.println(ct + "(" + action + "s " + bodyPart + ")")
              case ThatsAWrap =>
                System.out.println(ct + "Great cast party, everybody! See ya!")
                done = true
              case _ =>
                System.out.println(ct + "Huh? I'll be in my trailer.")
            }
          }
        }

      System.out.println(ct + "Negotiating...")
      badActor ! NegotiateNewContract
      System.out.println(ct + "Speaking...")
      badActor ! Speak("Do ya feel lucky, punk?")
      System.out.println(ct + "Gesturing...")
      badActor ! Gesture("face", "grimaces")
      System.out.println(ct + "Speaking again...")
      badActor ! Speak("Well, do ya?")
      System.out.println(ct + "Wrapping up")
      badActor ! ThatsAWrap
    }
  }

執行結果如下:

Thread main: Negotiating...
Thread main: Speaking...
Thread main: Gesturing...
Thread main: Speaking again...
Thread main: Wrapping up
Thread ForkJoinPool-1-worker-13: I won't do it for less than $1 million!
Thread ForkJoinPool-1-worker-13: Do ya feel lucky, punk?
Thread ForkJoinPool-1-worker-13: (grimacess face)
Thread ForkJoinPool-1-worker-13: Well, do ya?
Thread ForkJoinPool-1-worker-13: Great cast party, everybody! See ya!

從上述執行結果可以看到,Actor最終的實現仍然是執行緒,只不過它提供的程式設計模型與java中的程式設計模型不一樣而已。

4 利用!?傳送同步訊息,等待返回值


import scala.actors._,Actor._


object ProdConSample2
  {
    case class Message(msg : String)

    def main(args : Array[String]) : Unit =
    {
      val consumer =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case msg =>
                System.out.println("Received message! -> " + msg)
                done = (msg == "DONE")
                reply("Already RECEIVED....."+msg)
            }
          }
        }

      System.out.println("Sending....")
      //獲取響應值
      val r= consumer !? "Mares eat oats"
      println("replyed message"+r)
      System.out.println("Sending....")
      consumer !? "Does eat oats"
      System.out.println("Sending....")
      consumer !? "Little lambs eat ivy"
      System.out.println("Sending....")
      consumer !? "Kids eat ivy too"
      System.out.println("Sending....")
      consumer !? "DONE"      
    }
  }

程式碼執行結果:

Sending....
Received message! -> Mares eat oats
replyed messageAlready RECEIVED.....Mares eat oats
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Sending....
Received message! -> Kids eat ivy too
Sending....
Received message! -> DONE

通過上述程式碼執行結果可以看到,!?因為是同步訊息,傳送完返回結果後才會接著傳送下一條訊息。

5 Spawn方法傳送訊息

object ProdConSampleUsingSpawn
  {
    import concurrent.ops._

    def main(args : Array[String]) : Unit =
    {
      // Spawn Consumer
      val consumer =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case msg =>
                System.out.println("MESSAGE RECEIVED: " + msg)
                done = (msg == "DONE")
                reply("RECEIVED")
            }
          }
        }

      // Spawn Producer
      spawn  //spawn是一個定義在current.ops中的方法
      {
        val importantInfo : Array[String] = Array(
          "Mares eat oats",
          "Does eat oats",
          "Little lambs eat ivy",
          "A kid will eat ivy too",
          "DONE"
        );

        importantInfo.foreach((msg) => consumer !? msg)
      }
    }
  }

6 !! 傳送非同步訊息,返回值是 Future[Any]

object ProdConSample3
  {
    case class Message(msg : String)

    def main(args : Array[String]) : Unit =
    {
      val consumer =
        actor
        {
          var done = false
          while (! done)
          {
            receive
            {
              case msg =>
                System.out.println("Received message! -> " + msg)
                done = (msg == "DONE")
                reply("Already RECEIVED....."+msg)
            }
          }
        }

      System.out.println("Sending....")
      //傳送非同步訊息,返回
      val replyFuture= consumer !! "Mares eat oats"
      val r=replyFuture()
      println("replyed message*****"+r)
      System.out.println("Sending....")
      consumer !! "Does eat oats"
      System.out.println("Sending....")
      consumer !! "Little lambs eat ivy"
      System.out.println("Sending....")
      consumer !! "Kids eat ivy too"
      System.out.println("Sending....")
      consumer !! "DONE"      
    }
  }

執行結果:

Sending....
Received message! -> Mares eat oats
replyed message*****Already RECEIVED.....Mares eat oats
Sending....
Sending....
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Received message! -> Kids eat ivy too
Received message! -> DONE

通過上述程式碼的執行結果可以看到,!!的訊息傳送是非同步的,訊息傳送後無需等待結果返回便執行下一條語句,但如果要獲取非同步訊息的返回值,如:

 val replyFuture= consumer !! "Mares eat oats"
      val r=replyFuture()

則執行到這兩條語句的時候,程式先被阻塞,等獲得結果之後再發送其它的非同步訊息。

7 loop方法實現react

object LoopReact extends App{
  val a1 = Actor.actor {
    //注意這裡loop是一個方法,不是關鍵字
    //實現型別while迴圈的作用
    loop {
      react {
        //為整型時結束操作
        case x: Int=>println("a1 stop: " + x); exit()
        case msg: String => println("a1: " + msg)
      }
    }
  }

  a1!("我是搖擺少年夢")
  a1.!(23)

}

相關推薦

no