Scala併發程式設計基礎
轉載作者:搖擺少年夢
轉載地址:https://blog.csdn.net/lovehuangjiaju/article/details/47623177
本節主要內容
- Scala併發程式設計簡介
- Scala Actor併發程式設計模型
- react模型
- Actor的幾種狀態
- Actor深入使用解析
1. Scala併發程式設計簡介
2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行業中最不可告人的一個小祕密,他明確論證了處理器在速度上的發展已經走到了盡頭,並且將由全新的單晶片上的並行 “核心”(虛擬 CPU)所取代。這一發現對程式設計社群造成了不小的衝擊,因為正確建立執行緒安全的程式碼,在理論而非實踐中,始終會提高高效能開發人員的身價,而讓各公司難以聘用他們。看上去,僅有少數人充分理解了 Java 的執行緒模型、併發 API 以及 “同步” 的含義,以便能夠編寫同時提供安全性和吞吐量的程式碼 —— 並且大多數人已經明白了它的困難所在(來源:
在Java中,要編寫一個執行緒安全的程式並不是一件易事,例如:
class Account {
private int balance;
synchronized public int getBalance() {
return balance;
}
synchronized public void incrementBalance() {
balance++;
}
}
上面這段java程式碼雖然方法前面加了synchronized ,但它仍然不是執行緒安全的,例如,在執行下面兩個語句
account.incrementBalance();
account.getBalance();
時,有可能account.incrementBalance()執行完成後,其它執行緒可能會獲取物件的鎖,修改account的balance,從而造成得不到預期結果的問題。解決問題的方法是將兩個功能結合起來形成一個方法:
synchronized public int incrementAndGetBalance() {
balance++;
return balance;
}
但這可能並不是我們想要的,每次獲取balance都要將balance增加, 這顯然與實際不符。除此之外,java中的併發程式設計可能還會經常遇到死鎖問題,而這個問題往往難除錯,問題可能會隨機性的出現。總體上來看,java的併發程式設計模型相對較複雜,難以駕馭。
Scala很好地解決了java併發程式設計的問題,要在scala中進行併發程式設計,有以下幾種途徑可以實現:
1 actor訊息模型、akka actor併發模型。
2 Thread、Runnable
3 java.util.concurennt
4 第三方開源併發框架如Netty,Mina
在上述四種途徑當中,利用 actor訊息模型、akka actor併發模型是scala併發程式設計的首先,本節主要介紹actor訊息模型,akka actor併發模型我們將放在後面的章節中介紹。
在scala中,通過不變物件來實現執行緒安全,涉及到修改物件狀態時,則建立一個新的物件來實現,如:
//成員balance狀態一旦被賦值,便不能更改
//因而它也是執行緒安全的
class Person(val age: Integer) {
def getAge() = age
}
object Person{
//建立新的物件來實現物件狀態修改
def increment(person: Person): Person{
new Person(Person.getAge() + 1)
}
}
通過不變物件實現併發程式設計,可以簡化程式設計模型,使併發程式更容易現實和控制。
2.Scala Actor併發程式設計模型
java中的併發主要是通過執行緒來實現,各執行緒採用共享資源的機制來實現程式的併發,這裡面臨競爭資源的問題,雖然採用鎖機制可以避免競爭資源的問題,但會存在死鎖問題,要開發一套健壯的併發應用程式具有一定的難度。而scala的併發模型相比於java它更簡單,它採用訊息傳遞而非資源共享來實現程式的併發,訊息傳遞正是通過Actor來實現的。下面的程式碼給出了Actor使用示例
//混入Actor特質,然後實現act方法
//如同java中的Runnable介面一樣
//各執行緒的run方法是併發執行的
//Actor中的act方法也是併發執行的
class ActorDemo extends Actor{
//實現 act()方法
def act(){
while(true){
//receive從郵箱中獲取一條訊息
//然後傳遞給它的引數
//該引數是一個偏函式
receive{
case "actorDemo" => println("receive....ActorDemo")
}
}
}
}
object ActorDemo extends App{
val actor=new ActorDemo
//啟動建立的actor
actor.start()
//主執行緒傳送訊息給actor
actor!"actorDemo"
}
下面給的是recieve方法的部分原始碼
def receive[R](f: PartialFunction[Any, R]): R = {
assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
synchronized {
if (shouldExit) exit() // links
drainSendBuffer(mailbox)
}
var done = false
while (!done) {
val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
senders = replyTo :: senders
val matches = f.isDefinedAt(m)
senders = senders.tail
matches
})
................
從上述程式碼中不能看出,receive方法接受的引數是一個偏函式,並且是通過mailbox來實現訊息的傳送與接收。
在前述的class ActorDemo中,receive方法的引數為
{
case "actorDemo" => println("receive....ActorDemo")
}
該程式碼塊在執行時被轉換為一個PartialFunction[Any, R]的偏函式,其中R是偏函式的返回型別,對應case 語句=> 右邊的部分,在本例子中R是Unit型別,而Any對應的則對應case語句的模式部分。
前面給的是通過extends Actor的方式來建立一個Actor類,其實scala.actors.Actor中提供了一個actor工具方法,可以非常方便地直接建立Actor物件如:
import scala.actors.Actor._
object ActorFromMethod extends App{
//通過工具方法actor直接建立Actor物件
val methodActor = actor {
for (i <- 1 to 5)
println("That is the question.")
Thread.sleep(1000)
}
}
上述程式碼建立的actor物件無需呼叫start方法,物件建立完成後會立即執行。
scala中本地執行緒也可用作Actor,下面的程式碼演示瞭如何在REPL命令列中將本地執行緒當作Actor;
scala> import scala.actors.Actor._
import scala.actors.Actor._
//self引用本地執行緒,併發送訊息
scala> self ! "hello"
//接收訊息
scala> self.receive { case x:String => x }
res1: String = hello
上述程式碼中,如果傳送的訊息不是String型別的,執行緒將被阻塞,為避免這個問題,可以採用receiveWithin方法,
scala> self ! 123
scala> self.receiveWithin(1000) { case x => x }
res6: Any = 123
scala> self.receiveWithin(1000) { case x => x }
res7: Any = TIMEOUT
- react模型
這裡給出另外一個作者的react介紹,更通俗易懂:react
scala中的Actor也是構建在java執行緒基礎之上的,前面在使用Actor時都是通過建立Actor物件,然後再呼叫act方法來啟動actor。我們知道,java中執行緒的建立、銷燬及執行緒間的切換是比較耗時的,因此實際中儘量避免頻繁的執行緒建立、銷燬和銷燬。Scala中提供react方法,在方法執行結束後,執行緒仍然被保留。下面的程式碼演示了react方法的使用:
package cn.scala.xtwy.concurrency
import scala.actors._
object NameResolver extends Actor {
import java.net.{ InetAddress, UnknownHostException }
def act() {
react {
//匹配主執行緒發來的("www.scala-lang.org", NameResolver)
case (name: String, actor: Actor) =>
//向actor傳送解析後的IP地址資訊
//由於本例中傳進來的actor就是NameResolver自身
//也即自己給自己傳送訊息,並存入將訊息存入郵箱
actor ! getIp(name)
//再次呼叫act方法,試圖從郵箱中提取資訊
//如果郵箱中資訊為空,則進入等待模式
act()
case "EXIT" =>
println("Name resolver exiting.")
// quit
//匹配郵箱中的單個資訊,本例中會匹配郵箱中的IP地址資訊
case msg =>
println("Unhandled message: " + msg)
act()
}
}
def getIp(name: String): Option[InetAddress] = {
try {
Some(InetAddress.getByName(name))
} catch {
case _: UnknownHostException => None
}
}
}
object Main extends App{
NameResolver.start()
//主執行緒向NameResolver傳送訊息("www.scala-lang.org", NameResolver)
NameResolver ! ("www.scala-lang.org", NameResolver)
NameResolver ! ("wwwwww.scala-lang.org", NameResolver)
}
從上述程式碼中可以看到,通過在react方法執行結束時加入act方法,方法執行完成後沒有被銷燬,而是繼續試圖從郵箱中獲取資訊,獲取不到則等待。
4. Actor的幾種狀態
Actor有下列幾種狀態:
- 初始狀態(New),Actor物件被建立,但還沒有啟動即沒有執行start方法時的狀態
- 執行狀態(Runnable),正在執行時的狀態
- 掛起狀態(Suspended),在react方法中等待時的狀態
- 時間點掛起狀態(TimedSuspended),掛起狀態的一種特殊形式,reactWithin方法中的等待時的狀態
- 阻塞狀態(Blocked),在receive方法中阻塞等待時的狀態
- 時間點阻塞狀態(TimedBlocked),在receiveWithin方法中阻塞等待時的狀態
- 結束狀態(Terminated),執行完成後被銷燬
5. Actor深入使用解析
1 receive方法單次執行:
object Actor2
{
case class Speak(line : String)
case class Gesture(bodyPart : String, action : String)
case class NegotiateNewContract()
def main(args : Array[String]) =
{
val badActor =
actor
{
//這裡receive方法只會匹配一次便結束
receive
{
case NegotiateNewContract =>
System.out.println("I won't do it for less than $1 million!")
case Speak(line) =>
System.out.println(line)
case Gesture(bodyPart, action) =>
System.out.println("(" + action + "s " + bodyPart + ")")
case _ =>
System.out.println("Huh? I'll be in my trailer.")
}
}
//receive方法只處理下面這條語句傳送的訊息
badActor ! NegotiateNewContract
//下面其餘的訊息不會被處理
badActor ! Speak("Do ya feel lucky, punk?")
badActor ! Gesture("face", "grimaces")
badActor ! Speak("Well, do ya?")
}
}
上述程式碼只會輸出:
I won’t do it for less than $1 million!
即後面傳送的訊息如:
badActor ! Speak(“Do ya feel lucky, punk?”)
badActor ! Gesture(“face”, “grimaces”)
badActor ! Speak(“Well, do ya?”)
不會被處理。這是因為receive方法的單次執行問題。
2 能夠處理多個訊息的receive方法:
object Actor2
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract()
//處理結束訊息
case class ThatsAWrap()
def main(args : Array[String]) =
{
val badActor =
actor
{
var done = false
//while迴圈
while (! done)
{
receive
{
case NegotiateNewContract =>
System.out.println("I won't do it for less than $1 million!")
case Speak(line) =>
System.out.println(line)
case Gesture(bodyPart, action) =>
System.out.println("(" + action + "s " + bodyPart + ")")
case ThatsAWrap =>
System.out.println("Great cast party, everybody! See ya!")
done = true
case _ =>
System.out.println("Huh? I'll be in my trailer.")
}
}
}
//下面所有的訊息都能被處理
badActor ! NegotiateNewContract
badActor ! Speak("Do ya feel lucky, punk?")
badActor ! Gesture("face", "grimaces")
badActor ! Speak("Well, do ya?")
//訊息傳送後,receive方法執行完畢
badActor ! ThatsAWrap
}
}
3 Actor後面實現原理仍然是執行緒的證據
object Actor3
{
case class Speak(line : String);
case class Gesture(bodyPart : String, action : String);
case class NegotiateNewContract;
case class ThatsAWrap;
def main(args : Array[String]) =
{
def ct =
"Thread " + Thread.currentThread().getName() + ": "
val badActor =
actor
{
var done = false
while (! done)
{
receive
{
case NegotiateNewContract =>
System.out.println(ct + "I won't do it for less than $1 million!")
case Speak(line) =>
System.out.println(ct + line)
case Gesture(bodyPart, action) =>
System.out.println(ct + "(" + action + "s " + bodyPart + ")")
case ThatsAWrap =>
System.out.println(ct + "Great cast party, everybody! See ya!")
done = true
case _ =>
System.out.println(ct + "Huh? I'll be in my trailer.")
}
}
}
System.out.println(ct + "Negotiating...")
badActor ! NegotiateNewContract
System.out.println(ct + "Speaking...")
badActor ! Speak("Do ya feel lucky, punk?")
System.out.println(ct + "Gesturing...")
badActor ! Gesture("face", "grimaces")
System.out.println(ct + "Speaking again...")
badActor ! Speak("Well, do ya?")
System.out.println(ct + "Wrapping up")
badActor ! ThatsAWrap
}
}
執行結果如下:
Thread main: Negotiating...
Thread main: Speaking...
Thread main: Gesturing...
Thread main: Speaking again...
Thread main: Wrapping up
Thread ForkJoinPool-1-worker-13: I won't do it for less than $1 million!
Thread ForkJoinPool-1-worker-13: Do ya feel lucky, punk?
Thread ForkJoinPool-1-worker-13: (grimacess face)
Thread ForkJoinPool-1-worker-13: Well, do ya?
Thread ForkJoinPool-1-worker-13: Great cast party, everybody! See ya!
從上述執行結果可以看到,Actor最終的實現仍然是執行緒,只不過它提供的程式設計模型與java中的程式設計模型不一樣而已。
4 利用!?傳送同步訊息,等待返回值
import scala.actors._,Actor._
object ProdConSample2
{
case class Message(msg : String)
def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("Received message! -> " + msg)
done = (msg == "DONE")
reply("Already RECEIVED....."+msg)
}
}
}
System.out.println("Sending....")
//獲取響應值
val r= consumer !? "Mares eat oats"
println("replyed message"+r)
System.out.println("Sending....")
consumer !? "Does eat oats"
System.out.println("Sending....")
consumer !? "Little lambs eat ivy"
System.out.println("Sending....")
consumer !? "Kids eat ivy too"
System.out.println("Sending....")
consumer !? "DONE"
}
}
程式碼執行結果:
Sending....
Received message! -> Mares eat oats
replyed messageAlready RECEIVED.....Mares eat oats
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Sending....
Received message! -> Kids eat ivy too
Sending....
Received message! -> DONE
通過上述程式碼執行結果可以看到,!?因為是同步訊息,傳送完返回結果後才會接著傳送下一條訊息。
5 Spawn方法傳送訊息
object ProdConSampleUsingSpawn
{
import concurrent.ops._
def main(args : Array[String]) : Unit =
{
// Spawn Consumer
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("MESSAGE RECEIVED: " + msg)
done = (msg == "DONE")
reply("RECEIVED")
}
}
}
// Spawn Producer
spawn //spawn是一個定義在current.ops中的方法
{
val importantInfo : Array[String] = Array(
"Mares eat oats",
"Does eat oats",
"Little lambs eat ivy",
"A kid will eat ivy too",
"DONE"
);
importantInfo.foreach((msg) => consumer !? msg)
}
}
}
6 !! 傳送非同步訊息,返回值是 Future[Any]
object ProdConSample3
{
case class Message(msg : String)
def main(args : Array[String]) : Unit =
{
val consumer =
actor
{
var done = false
while (! done)
{
receive
{
case msg =>
System.out.println("Received message! -> " + msg)
done = (msg == "DONE")
reply("Already RECEIVED....."+msg)
}
}
}
System.out.println("Sending....")
//傳送非同步訊息,返回
val replyFuture= consumer !! "Mares eat oats"
val r=replyFuture()
println("replyed message*****"+r)
System.out.println("Sending....")
consumer !! "Does eat oats"
System.out.println("Sending....")
consumer !! "Little lambs eat ivy"
System.out.println("Sending....")
consumer !! "Kids eat ivy too"
System.out.println("Sending....")
consumer !! "DONE"
}
}
執行結果:
Sending....
Received message! -> Mares eat oats
replyed message*****Already RECEIVED.....Mares eat oats
Sending....
Sending....
Sending....
Received message! -> Does eat oats
Sending....
Received message! -> Little lambs eat ivy
Received message! -> Kids eat ivy too
Received message! -> DONE
通過上述程式碼的執行結果可以看到,!!的訊息傳送是非同步的,訊息傳送後無需等待結果返回便執行下一條語句,但如果要獲取非同步訊息的返回值,如:
val replyFuture= consumer !! "Mares eat oats"
val r=replyFuture()
則執行到這兩條語句的時候,程式先被阻塞,等獲得結果之後再發送其它的非同步訊息。
7 loop方法實現react
object LoopReact extends App{
val a1 = Actor.actor {
//注意這裡loop是一個方法,不是關鍵字
//實現型別while迴圈的作用
loop {
react {
//為整型時結束操作
case x: Int=>println("a1 stop: " + x); exit()
case msg: String => println("a1: " + msg)
}
}
}
a1!("我是搖擺少年夢")
a1.!(23)
}