大資料技術之_16_Scala學習_11_客戶資訊管理系統+併發程式設計模型 Akka+Akka 網路程式設計-小黃雞客服案例...
第十五章 客戶資訊管理系統
15.1 專案的開發流程

15.2 專案的需求分析
模擬實現基於文字介面的《客戶資訊管理軟體》。
該軟體 scala 能夠實現對客戶物件的插入、修改、刪除、顯示、查詢(用 ArrayBuffer 或者 ListBuffer 實現),並能夠列印客戶明細表。
15.3 專案的介面
主介面

新增客戶

修改客戶

刪除客戶

客戶列表

15.4 專案的設計-程式框架圖
程式框架圖:設計系統有多少個檔案,以及檔案之間的呼叫關係,可以幫助程式設計師實現模組的設計(清晰),便於程式設計師之間對專案交流分析。【業務優化,設計方案】

15.5 專案的功能實現

15.5.1 完成 Customer 類
根據需求文件或者頁面,寫出 Customer 類
Customer.scala
package com.atguigu.chapter15.customercrm.bean class Customer { // 屬性 var id: Int = _ var name: String = _ var gender: Char = _ var age: Short = _ var tel: String = _ var email: String = _ // 輔助構造器 def this(id: Int, name: String, gender: Char, age: Short, tel: String, email: String) { this this.id = id this.name = name this.gender = gender this.age = age this.tel = tel this.email = email } }
15.5.2 完成顯示主選單和退出軟體功能
CustomerView.scala 功能分析:
1. 將主選單的顯示放入到 while
2. 使用者可以根據輸入,選擇自己的操作
3. 如果輸入5退出
CustomerView.scala
package com.atguigu.chapter15.customercrm.view import scala.io.StdIn class CustomerView { // 定義一個迴圈變數,控制是否退出 var loop = true // 定義一個 key 用於接收使用者輸入的選項 var key = ' ' def mainMenu(): Unit = { do { println("-----------------客戶資訊管理軟體-----------------") println("1 添 加 客 戶") println("2 修 改 客 戶") println("3 刪 除 客 戶") println("4 客 戶 列 表") println("5 退出") println("請選擇(1-5):") key = StdIn.readChar() key match { case '1' => println("添 加 客 戶") case '2' => println("修 改 客 戶") case '3' => println("刪 除 客 戶") case '4' => println("客 戶 列 表") case '5' => this.loop = false } } while (loop) println("你退出了系統...") } }
示例程式碼如下:
package com.atguigu.chapter15.customercrm.app import com.atguigu.chapter15.customercrm.view.CustomerView object CustomerCrm { def main(args: Array[String]): Unit = { new CustomerView().mainMenu() } }
15.5.3 完成顯示客戶列表的功能
CustomerView.scala 功能分析:
1. 接收4,顯示客戶列表
2. 呼叫 CustomerService 的方法 list
3. 需要一個 CustomerService 物件(屬性)
CustomerService.sacla 功能分析:
1. 編寫一個方法 list,返回當前系統有哪些客戶
2. 客戶放在哪?--> 記憶體 --> 可變集合 --> ArrayBuffer
1、在 Customer.sacla 中重寫 toString 方法
override def toString: String = { this.id + "\t\t" + this.name + "\t\t" + this.gender + "\t\t" + this.age + "\t\t" + this.tel + "\t\t" + this.email }
2、在 CustomerService.scala 中編寫一個方法 list,返回當前系統有哪些客戶
class CustomerService { // customers 是存放客戶用的,為了方便測試,我們先進行初始化 val customers = ArrayBuffer(new Customer(1, "tom", '男', 20, "110", "[email protected]")) // 查詢客戶列表的方法 def list(): ArrayBuffer[Customer] = { this.customers } }
3、在 CustomerView.scala 中 呼叫 CustomerService 的方法 list
val customerService = new CustomerService() /* ---------------------------客戶列表--------------------------- 編號姓名性別年齡電話郵箱 1張三男[email protected] 2李四女[email protected] 3王芳女[email protected] -------------------------客戶列表完成------------------------- */ def list(): Unit = { println() println("---------------------------客戶列表---------------------------") println("編號\t\t姓名\t\t性別\t\t年齡\t\t電話\t\t郵箱") // 遍歷 // 呼叫 CustomerService 的方法 list val customers = customerService.list() for (customer <- customers) { // 方式一:輸出 // println(customer.id + "\t\t" + ...) // 方式二:重寫 Customer 的 toString 方法,返回資訊,並且格式化 println(customer) } println("-------------------------客戶列表完成-------------------------") }
15.5.4 完成新增客戶的功能
CustomerView.scala 功能分析:
1. 接收客戶的資訊,並封裝成對應的 Customer 物件
2. 呼叫 CustomerService 的方法 add
CustomerService.sacla 功能分析:
1. 編寫一個方法 add,接收一個 Customer 物件
2. 加入到 ArrayBuffer 中
3. 規定:以新增客戶是第幾個作為它的 id
1、在 Customer.sacla 中新增一個新的 輔助構造器(沒有id屬性)
// 輔助構造器(沒有id屬性) def this(name: String, gender: Char, age: Short, tel: String, email: String) { this this.name = name this.gender = gender this.age = age this.tel = tel this.email = email }
2、在 CustomerService.scala 中編寫一個方法 add,接收一個 Customer 物件,並設定 id 後再加入到 ArrayBuffer 中
// 用於設定使用者 id var customerNum = 1 // 新增客戶的方法 def add(customer: Customer): Boolean = { // 設定 id customerNum += 1 customer.id = customerNum // 加入到 ArrayBuffer 中 customers.append(customer) true }
3、在 CustomerView.scala 中 呼叫 CustomerService 的方法 add
/* ---------------------新增客戶--------------------- 姓名:張三 性別:男 年齡:30 電話:010-56253825 郵箱:[email protected] ---------------------新增完成--------------------- */ def add(): Unit = { println() println("---------------------新增客戶---------------------") println("姓名:") val name = StdIn.readLine() println("性別:") val gender = StdIn.readChar() println("年齡:") val age = StdIn.readShort() println("電話:") val tel = StdIn.readLine() println("郵箱:") val email = StdIn.readLine() // 封裝物件 val customer = new Customer(name, gender, age, tel, email) // 呼叫 CustomerService 的方法 add customerService.add(customer) println("---------------------新增完成---------------------") }
15.5.5 完成刪除客戶的功能
CustomerView.scala 功能分析:
1. 接收客戶 id,準備刪除
2. 呼叫 CustomerService 的 del(id)
CustomerService.sacla 功能分析:
1. 編寫一個方法 del,接收一個 id,先去呼叫另一個方法 findIndexById,判斷
2. 編寫一個方法 findIndexById(因為我們的 ArrayBuffer 索引和 id 並不是對應的)
3. 如果發現有,則刪除,如果沒有就返回 false
1、在 CustomerService.scala 中編寫一個方法 del,接收一個 id,先去呼叫另一個方法 findIndexById,判斷
// 先根據 id 查詢 使用者的 index def findIndexById(id: Int): Int = { // 先假定一個索引,預設 -1,如果找到就改成對應的,如果沒有找到就返回 -1 var index = -1 // 遍歷 ArrayBuffer breakable { for (i <- 0 until customers.length) { if (customers(i).id == id) { index = i break() } } } index } // 再根據 id 刪除使用者 def del(id: Int): Boolean = { val index = findIndexById(id) if (index != -1) { customers.remove(index) true } else { false } }
2、在 CustomerView.scala 中接收客戶 id,呼叫 CustomerService 的 del(id)
/* ---------------------刪除客戶--------------------- 請選擇待刪除客戶編號(-1退出):1 確認是否刪除(Y/N):y ---------------------刪除完成--------------------- */ def del(): Unit = { println() println("---------------------刪除客戶---------------------") println("請選擇待刪除客戶編號(-1退出):") val id = StdIn.readInt() if (id == -1) { println("---------------------刪除沒有完成---------------------") return } println("確認是否刪除(Y/N):") val choice = StdIn.readChar().toLower if (choice == 'y') { if (customerService.del(id)) { println("---------------------刪除完成---------------------") return } } println("---------------------刪除沒有完成---------------------") }
15.5.6 完善退出確認功能
功能說明:
要求使用者在退出時提示 "確認是否退出(Y/N):",使用者必須輸入y/n,否則迴圈提示。且輸入為y時,退出系統;輸入為n時,不退出系統。
1、在 CustomerView.scala 中定義一個方法 isOut,並修改 key 所對應的函式。
// 要求使用者在退出時提示"確認是否退出(Y/N):",使用者必須輸入y/n,否則迴圈提示。且輸入為y時,退出系統;輸入為n時,不退出系統。 def isOut(): Unit = { println() println("確認是否退出(Y/N):") key = StdIn.readChar().toLower key match { case 'y' => this.loop = false case 'n' => this.loop = true case _ => isOut() } }
15.5.7 完善刪除確認功能
功能說明:
要求使用者在刪除確認時提示 "確認是否刪除(Y/N):",使用者必須輸入y/n,否則迴圈提示。
1、在 CustomerView.scala 中,修改 del() 方法即可
/* ---------------------刪除客戶--------------------- 請選擇待刪除客戶編號(-1退出):1 確認是否刪除(Y/N):y ---------------------刪除完成--------------------- */ def del(): Unit = { println() println("---------------------刪除客戶---------------------") println("請選擇待刪除客戶編號(-1退出):") val id = StdIn.readInt() if (id == -1) { println("---------------------刪除沒有完成---------------------") return } println("確認是否刪除(Y/N):") var choice = ' ' // 要求使用者在刪除確認時提示 "確認是否刪除(Y/N):",使用者必須輸入y/n,否則迴圈提示。 breakable { do { choice = StdIn.readChar().toLower if (choice == 'y' || choice == 'n') { break() } println("確認是否刪除(Y/N):") } while (true) } if (choice == 'y') { if (customerService.del(id)) { println("---------------------刪除完成---------------------") return } } println("---------------------刪除沒有完成---------------------") }
15.5.8 完成修改客戶的功能
1、在 CustomerService.scala 中定義一個方法根據 id 修改使用者(更新使用者)的方法 和 // 根據 id 查詢使用者資訊 的方法
// 根據 id 查詢使用者資訊 def findCustomerById(id: Int): Customer = { val index = findIndexById(id) if (index != -1) { customers(index) } else { null } } // 根據 id 修改使用者(更新使用者) def update(id: Int, customer: Customer): Boolean = { val index = findIndexById(id) customers.update(index, customer) true }
2、在 CustomerView.scala 中定義一個方法 update
/* ---------------------修改客戶--------------------- 請選擇待修改客戶編號(-1退出):1 姓名(張三):<直接回車表示不修改> 性別(男): 年齡(30): 電話(010-56253825): 郵箱([email protected]):[email protected] ---------------------修改完成--------------------- */ def update(): Unit = { println() println("---------------------修改客戶---------------------") println("請選擇待修改客戶編號(-1退出):") var id = StdIn.readInt() if (id == -1) { println("---------------------修改沒有完成---------------------") return } val customer = customerService.findCustomerById(id) if (customer == null) { println("---------------------修改沒有完成---------------------") return } var name = customer.name print(s"姓名(${name}):") name = StdIn.readLine() if (name.length == 0) name = customer.name var gender = customer.gender print(s"性別(${gender}):") gender = StdIn.readChar() var age = customer.age print(s"年齡(${age}):") age = StdIn.readShort() var tel = customer.tel print(s"電話(${tel}):") tel = StdIn.readLine() if (tel.length == 0) tel = customer.tel var email = customer.email print(s"郵箱(${email}):") email = StdIn.readLine() if (email.length == 0) email = customer.email // 封裝物件 val newCustomer = new Customer(id, name, gender, age, tel, email) // 呼叫 CustomerService 的方法 update customerService.update(id, newCustomer) println("---------------------修改完成---------------------") }
第十六章 併發程式設計模型 Akka
16.1 Akka 的介紹

16.2 Actor 模型用於解決什麼問題

16.3 Akka 中 Actor 模型詳解
Actor 模型及其說明

對上圖的詳解如下:

16.4 Actor 模型工作機制說明

Actor模型工作機制說明(對照工作機制示意圖理解):

Actor 間傳遞訊息機制(對照工作機制示意圖理解)

16.5 Actor 模型應用例項
16.5.1 Actor 自我通訊
應用例項需求

程式碼實現
SayHelloActor 專案步驟:
1) 建立專案 Mew -> New Project -> 選擇 Maven
2) 給專案命名

3) 下一步 -> Finish
4) 會生成 pom.xml 檔案(maven 檔案, 專案包的依賴)

5) 將下面的 maven 配置模板拷貝到 pom.xml 檔案中,新的 pom.xml 檔案檔案內容如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.atguigu.akka</groupId> <artifactId>SayHelloActor</artifactId> <version>1.0-SNAPSHOT</version> <!-- 定義一下常量 --> <properties> <encoding>UTF-8</encoding> <scala.version>2.11.8</scala.version> <scala.compat.version>2.11</scala.compat.version> <akka.version>2.4.17</akka.version> </properties> <dependencies> <!-- 新增scala的依賴 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- 新增akka的actor依賴 --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_${scala.compat.version}</artifactId> <version>${akka.version}</version> </dependency> <!-- 多程序之間的Actor通訊 --> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_${scala.compat.version}</artifactId> <version>${akka.version}</version> </dependency> </dependencies> <!-- 指定外掛--> <build> <!-- 指定原始碼包和測試包的位置 --> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <!-- 指定編譯scala的外掛 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <!-- maven打包的外掛 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <!-- 指定main方法 --> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>xxx</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
6) 因為按照配置模板的內容 "指定原始碼包和測試包的位置" 的部分:
<sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory>
我們需要建立對應的 scala 目錄,並 mark 為 Sources Root
7) 當修改後,第一次速度比較慢,因為 maven 需要 resolve 包的依賴,要下載相關的包。
注意
:需要如圖勾選,update snapshots,而且不需要聯網,如果使用 maven 解決依賴後,仍然 pom.xml 有誤,則只需要重啟下 idea, 或者動一下 pom.xml 檔案(不用改),重新儲存即可。

8) 程式碼實現:
package com.atguigu.akka.actor import akka.actor.{Actor, ActorRef, ActorSystem, Props} // 1. 當我們繼承 Actor 後,就是一個 Actor,需要重寫該 Actor 的核心方法 receive class SayHelloActor extends Actor { // 迴圈的接收訊息 // 1. receive方法,會被該 Actor 的 MailBox(實現了 Runnable 介面)呼叫 // 2. 當該 Actor 的 MailBox 接收到訊息,就會呼叫 receive 方法 // 3. Receive 的底層:type Receive = PartialFunction[Any, Unit] override def receive: Receive = { // 接受訊息並處理,如果接收到 exit,就退出 case "hello" => println("傳送:hello\t\t迴應:hello too:)") case "ok" => println("傳送:ok\t\t\t迴應:ok too:)") case "exit" => { println("接收到exit~指令,退出系統...") context.stop(self) // 停止自己的 ActorRef context.system.terminate() // 關閉 ActorSystem } } } object SayHelloActor { // 1. 先建立一個 ActorSystem,專門用於建立 Actor private val actoryFactory = ActorSystem("actoryFactory") // 2. 建立一個 Actor 的同時,返回 Actor 的 ActorRef private val sayHelloActorRef: ActorRef = actoryFactory.actorOf(Props[SayHelloActor], "sayHelloActor") // (1) Props[SayHelloActor] 建立了一個 SayHelloActor 例項,這裡使用到了反射 // (2) "sayHelloActor" 是 Actor 的名字 // (3) sayHelloActorRef: ActorRef=>是 Props[SayHelloActor] 的引用 // (4) 建立的 SayHelloActor 例項被 ActorSystme 接管 def main(args: Array[String]): Unit = { // 給 SayHelloActor 發訊息(郵箱) sayHelloActorRef ! "hello" sayHelloActorRef ! "ok" sayHelloActorRef ! "ok~" // 研究非同步如何退出 ActorSystem sayHelloActorRef ! "exit" } }
輸出結果如下:
傳送:hello迴應:hello too:) 傳送:ok迴應:ok too:) 接收到exit~指令,退出系統...
9) 執行的效果

程式碼的示意圖和小結

小結:
當程式執行 private val sayHelloActorRef: ActorRef = actoryFactory.actorOf(Props[SayHelloActor], "sayHelloActor") 會完成如下任務: [這是非常重要的方法]
- 1、actorFactory 是 ActorSystem("actorFactory") 建立的。
- 2、這裡的 Props[SayHelloActor] 會使用反射機制,建立一個 SayHelloActor 物件,如果是 actorFactory.actorOf(Props(new SayHelloActor(其他代理物件的引用)), "sayHelloActor") 形式,就是使用 new 的方式建立一個 SayHelloActor 物件。注意:Props() 是小括號。
- 3、會建立一個 SayHelloActor 物件的代理物件 sayHelloActorRef,使用 sayHelloActorRef 才能傳送訊息。
- 4、會在底層建立 Dispather Message,是一個執行緒池,用於分發訊息,訊息是傳送到對應的 Actor 的 MailBox。
- 5、會在底層建立 SayHelloActor 的 MailBox 物件,該物件是一個佇列,可接收 Dispatcher Message 傳送的訊息。
- 6、MailBox 實現了 Runnable 介面,是一個執行緒,一直執行並呼叫 Actor 的 receive 方法,因此當Dispather 傳送訊息到 MailBox 時,Actor 在r eceive 方法就可以得到資訊。
- 7、SayHelloActorRef ! "hello" ,表示把 hello 訊息傳送到 SayHello Actor 的 Mailbox (通過Dispatcher Message 轉發)。
16.5.2 Actor 之間通訊
應用例項需求

兩個 Actor 的通訊機制原理圖

程式碼實現
AActor.scala
package com.atguigu.akka.actors import akka.actor.{Actor, ActorRef} class AActor(bActorRef: ActorRef) extends Actor { var count = 0 override def receive: Receive = { case "start" => { println("AActor 出招了,start ok") bActorRef ! "我打" } case "我打" => { count += 1 // 給 BActor 發出訊息 // 這裡需要持有 BActor 的引用(BActorRef)才可以 println(s"AActor(黃飛鴻) 厲害!看我佛山無影腳 第${count}腳") Thread.sleep(1000) bActorRef ! "我打" // 給 BActor 發出訊息 } } }
BActor.scala
package com.atguigu.akka.actors import akka.actor.Actor class BActor extends Actor { var count = 0 override def receive: Receive = { case "我打" => { count += 1 println(s"BActor(喬峰) 挺猛 看我降龍十八掌 第${count}掌") Thread.sleep(1000) // 通過 sender() 方法,可以獲取到傳送訊息的 Actor 的 ActorRef sender() ! "我打" } } }
ActorApp.scala
package com.atguigu.akka.actors import akka.actor.{ActorRef, ActorSystem, Props} // 100招後,就退出 object ActorApp extends App { // 建立 ActorSystem val actorfactory = ActorSystem("actorfactory") // 先建立 BActor 的引用/代理 val bActorRef: ActorRef = actorfactory.actorOf(Props[BActor], "bActor") // 建立 AActor 的引用時需要持有 BActor 的引用 val aActorRef: ActorRef = actorfactory.actorOf(Props(new AActor(bActorRef)), "aActor") // aActor 先出招 aActorRef ! "start" }
輸出結果如下:
AActor 出招了,start ok BActor(喬峰) 挺猛 看我降龍十八掌 第1掌 AActor(黃飛鴻) 厲害!看我佛山無影腳 第1腳 BActor(喬峰) 挺猛 看我降龍十八掌 第2掌 AActor(黃飛鴻) 厲害!看我佛山無影腳 第2腳 BActor(喬峰) 挺猛 看我降龍十八掌 第3掌 AActor(黃飛鴻) 厲害!看我佛山無影腳 第3腳 BActor(喬峰) 挺猛 看我降龍十八掌 第4掌 AActor(黃飛鴻) 厲害!看我佛山無影腳 第4腳 BActor(喬峰) 挺猛 看我降龍十八掌 第5掌 AActor(黃飛鴻) 厲害!看我佛山無影腳 第5腳 BActor(喬峰) 挺猛 看我降龍十八掌 第6掌 AActor(黃飛鴻) 厲害!看我佛山無影腳 第6腳 ......
程式碼的小結
- 1、兩個 Actor 通訊機制和 Actor 自身發訊息機制基本一樣,只是要注意如下:
- 2、如果 A Actor 在需要給 B Actor 發訊息,則需要持有 B Actor 的 ActorRef,可以通過建立 A Actor 時,傳入 B Actor 的代理物件(ActorRef)。
- 3、當 B Actor 在 receive 方法中接收到訊息,需要回復時,可以通過 sender() 獲取到傳送 Actor 的代理物件。
如何理解 Actor 的 receive 方法被呼叫?
- 1、每個 Actor 對應 MailBox。
- 2、MailBox 實現了 Runnable 介面,處於執行的狀態。
- 3、當有訊息到達 MailBox,就會去呼叫 Actor 的 receive 方法,即將訊息推送給 receive 方法。
16.7 Akka 網路程式設計
看兩個實際應用(socket/tcp/ip)
QQ、迅雷、百度網盤客戶端、新浪網站、京東商城、淘寶
16.7.1 Akka 網路程式設計基本介紹

16.7.2 協議(tcp/ip)
TCP/IP(Transmission Control Protocol/Internet Protocol)的簡寫,中文譯名為傳輸控制協議/因特網互聯協議,又叫網路通訊協議,這個協議是Internet 最基本的協議、是 Internet 國際網際網路絡的基礎,簡單地說,就是由網路層的IP協議和傳輸層的TCP協議組成的。
TCP/IP 3本聖經級別書籍:xxx

16.7.3 OSI 與 Tcp/ip 參考模型

16.7.4 ip 地址
概述:每個 internet 上的主機和路由器都有一個 ip 地址,它包括網路號和主機號,ip 地址有 ipv4(32位) 或者 ipv6(128位),可以通過 ipconfig(ifconfig) 來檢視。
一個小技巧:網路不通時,如何確定是哪一個路由(ip地址)出現問題?答:使用 tracert 指令。演示如下:

16.7.5 埠(port)
我們這裡所指的埠不是指物理意義上的埠,而是特指TCP/IP協議中的埠,是邏輯意義上的埠。如果把 IP 地址比作一間房子,埠就是出入這間房子的門。真正的房子只有幾個門,但是一個 IP 地址的埠 可以有65535(即:256×256-1)個之多!埠是通過埠號來標記的。
埠(port)-分類

埠(port)-使用注意

socket 程式設計中客戶端和伺服器的網路分佈

16.8 Akka 網路程式設計-小黃雞客服案例
16.8.1 需求分析 + 介面設計
需求分析
1、服務端進行監聽(9999)
2、客戶端可以通過鍵盤輸入,傳送諮詢問題給小黃雞客服(服務端)
3、小黃雞(服務端)回答客戶的問題
介面設計
服務端:

客戶端:

16.8.2 程式框架圖

16.8.3 功能實現
程式碼結構:

示例程式碼如下:
YellowChickenServer.scala
package com.atguigu.akka.yellowchicken.server import akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.atguigu.akka.yellowchicken.common.{ClientMessage, ServerMessage} import com.typesafe.config.ConfigFactory class YellowChickenServer extends Actor { override def receive: Receive = { case "start" => println("start 小黃雞客服開始工作了...") // 如果接收到了服務端的發來的訊息,即 ClientMessage case ClientMessage(mes) => { println("客戶諮詢的問題是:" + mes) mes match { // 使用 match case 匹配(模糊匹配) case "大資料學費" => sender() ! ServerMessage("20000 RMB") case "學校地址" => sender() ! ServerMessage("北京市朝陽區青年路大悅城") case "學習什麼技術" => sender() ! ServerMessage("大資料 前端 Python") case _ => sender() ! ServerMessage("你說的啥子:)") } } } } // 主程式入口 object YellowChickenServerApp extends App { val host = "127.0.0.1" // 服務端ip地址 val port = 9999 // 埠 // 建立 config 物件,指定協議型別、監聽的ip和埠 val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin) // 建立 ActorSystem val serverActorSystem = ActorSystem("Server", config) // 建立 YellowChickenServer 的 Actor 和 ActorRef val yellowChickenServerActorRef: ActorRef = serverActorSystem.actorOf(Props[YellowChickenServer], "YellowChickenServer-01") // 啟動服務端 yellowChickenServerActorRef ! "start" }
CustomerActor.scala
package com.atguigu.akka.yellowchicken.client import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} import com.atguigu.akka.yellowchicken.common.{ClientMessage, ServerMessage} import com.typesafe.config.ConfigFactory import scala.io.StdIn class CustomerActor(serverHost: String, serverPort: Int) extends Actor { // 定義一個 YellowChickenServerRef var serverActorRef: ActorSelection = _ // 在 Actor 中有一個方法 preStart 方法,它會在 Actor 執行前執行 // 在 Akka 開發中,通常將初始化的工作,放在 preStart 方法中 override def preStart(): Unit = { this.serverActorRef = context.actorSelection(s"akka.tcp://Server@${serverHost}:${serverPort}/user/YellowChickenServer-01") println("this.serverActorRefer=" + this.serverActorRef) } override def receive: Receive = { case "start" => println("start 客戶端執行,可以諮詢問題") case mes: String => { // 發給服務端 // serverActorRef ! mes // 不應該傳送字串,應該包裝一把,應該傳送一個(樣例)物件(即協議) serverActorRef ! ClientMessage(mes) // 此時傳送的是一個物件,該樣例類預設實現了序列化 和 apply 方法 } // 如果接受到了伺服器端的訊息 case ServerMessage(mes) => { println(s"收到小黃雞客服(Server)訊息:$mes") } } } // 主程式入口 object CustomerActorApp extends App { val (host, port, serverHost, serverPort) = ("127.0.0.1", 9990, "127.0.0.1", 9999) val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin) // 建立 ActorSystem val clientActorSystem = ActorSystem("Client", config) // 建立 CustomerActor 的 Actor 和 ActorRef val clientActorRef: ActorRef = clientActorSystem.actorOf(Props(new CustomerActor(serverHost, serverPort)), "CustomerActor-01") // 啟動客戶端 clientActorRef ! "start" // 客戶端傳送訊息 while (true) { val mes = StdIn.readLine() clientActorRef ! mes } }
MessageProtocol.scala
package com.atguigu.akka.yellowchicken.common // 使用樣例類來構建協議 // 1、客戶端傳送服務端的協議(序列化物件) case class ClientMessage(mes: String)// 回顧:樣例類的構造器中的每一個引數都預設為 val ,即只可讀。 // 2、伺服器端傳送給客戶端的協議 case class ServerMessage(mes: String)
16.9 Akka 網路程式設計-Spark Master Worker 程序通訊專案
16.9.1 專案意義
1、深入理解 Spark 的 Master 和 Worker 的通訊機制。
2、為了方便同學們看 Spark 的底層原始碼,命名的方式和原始碼保持一致(如:通訊訊息類命名就是一樣的)。
3、加深對 主從服務心跳檢測機制(HeartBeat)
的理解,方便以後 spark 原始碼二次開發。
16.9.2 專案需求分析

16.9.3 專案介面設計
我們主要是通過應用例項,來剖析 Spark 的 Master 和 Worker 的通訊機制,因此功能比較簡潔,設計的介面如下。看後面演示即可。
16.9.4 實現功能 1-Worker 完成註冊
功能要求: Worker 註冊到 Master,Master 完成註冊,並回復 Worker 註冊成功。

程式碼結構:

示例程式碼如下:
MasterActor.scala
package com.atguigu.akka.sparkmasterworker.master import akka.actor.{Actor, ActorSystem, Props} import com.atguigu.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo, WorkerInfo} import com.typesafe.config.ConfigFactory import scala.collection.mutable class MasterActor extends Actor { // 定義一個 mutable.HashMap 屬性,用於管理 Worker val workers = mutable.HashMap[String, WorkerInfo]() override def receive: Receive = { case "start" => println("Master伺服器啟動了...") // 接收到 Worker 客戶端註冊的資訊,儲存進 HashMap case RegisterWorkerInfo(id, cpu, ram) => { if (!workers.contains(id)) { // 建立 WorkerInfo val workerInfo = new WorkerInfo(id, cpu, ram) // 加入到 HashMap workers += (id -> workerInfo) println("伺服器的Workers= " + workers) // 回覆客戶端註冊成功 sender() ! RegisteredWorkerInfo } } } } object MasterActorApp { def main(args: Array[String]): Unit = { val host = "127.0.0.1" // 服務端ip地址 val port = 10005 // 埠 // 建立 config 物件,指定協議型別、監聽的ip和埠 val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin) // 先建立 ActorSystem val masterActorSystem = ActorSystem("Master", config) // 再建立 Master 的 Actor 和 ActorRef val masterActorRef = masterActorSystem.actorOf(Props[MasterActor], "MasterActor-01") // 啟動 Master masterActorRef ! "start" } }
WorkerActor.scala
package com.atguigu.akka.sparkmasterworker.worker import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} import com.atguigu.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo} import com.typesafe.config.ConfigFactory class WorkerActor(serverHost: String, serverPort: Int) extends Actor { // 定義一個 MasterActorRef var masterActorProxy: ActorSelection = _ // 定義 Worker 的編號 var id = java.util.UUID.randomUUID().toString // 在 Actor 中有一個方法 preStart 方法,它會在 Actor 執行前執行 // 在 Akka 開發中,通常將初始化的工作,放在 preStart 方法中 override def preStart(): Unit = { this.masterActorProxy = context.actorSelection(s"akka.tcp://Master@${serverHost}:${serverPort}/user/MasterActor-01") println("this.masterActorProxy=" + this.masterActorProxy) } override def receive = { case "start" => { println("Worker客戶端啟動執行") // 給伺服器傳送一個註冊資訊 masterActorProxy ! RegisterWorkerInfo(id, 16, 16 * 1024) } case RegisteredWorkerInfo => { println("WorkedId= " + id + " 註冊成功!") } } } object WorkerActorApp { def main(args: Array[String]): Unit = { val (host, port, serverHost, serverPort) = ("127.0.0.1", 10001, "127.0.0.1", 10005) val config = ConfigFactory.parseString( s""" |akka.actor.provider="akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname=$host |akka.remote.netty.tcp.port=$port """.stripMargin) // 建立 ActorSystem val workerActorSystem = ActorSystem("Worker", config) // 建立 WorkerActor 的 Actor 和 ActorRef val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new WorkerActor(serverHost, serverPort)), "WorkerActor-01") // 啟動客戶端 workerActorRef ! "start" } }
MessageProtocol.scala
package com.atguigu.akka.sparkmasterworker.common // 使用樣例類來構建協議 // Worker 註冊資訊 case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int) // 這個是 WorkerInfo,是儲存在 Master 的 HashMap 中的,該 HashMap 用於管理 Worker // 將來這個 WorkerInfo 會擴充套件,比如 增加 Worker 上一次的心跳時間 class WorkerInfo(val id: String, val cpu: Int, val ram: Int) // 當 Worker 註冊成功,伺服器返回一個 RegisteredWorkerInfo 物件 case object RegisteredWorkerInfo
16.9.5 實現功能 2-Worker 定時傳送心跳
功能要求:Worker 定時傳送心跳給 Master,Master 能夠接收到,並更新 Worker 上一次心跳時間。

示例程式碼如下:
MessageProtocol.scala 中增加程式碼
package com.atguigu.akka.sparkmasterworker.common // 使用樣例類來構建協議 // Worker 註冊資訊 case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int) // 這個是 WorkerInfo,是儲存在 Master 的 HashMap 中的,該 HashMap 用於管理 Worker // 將來這個 WorkerInfo 會擴充套件,比如 增加 Worker 上一次的心跳時間 class WorkerInfo(val id: String, val cpu: Int, val ram: Int) { // 新增屬性:心跳時間 var lastHeartBeatTime: Long = _ } // 當 Worker 註冊成功,伺服器返回一個 RegisteredWorkerInfo 物件 case object RegisteredWorkerInfo // 每隔一定時間定時器傳送給 Master 一個心跳 case class HeartBeat(id: String) // Worker 每隔一定時間定時器傳送給 自己 一個訊息 case object SendHeartBeat
MasterActor.scala 中增加程式碼
case HeartBeat(id) => { // 更新對應的 Worker 的心跳時間 // 1、先從 Worker 中取出 WorkerInfo val workerInfo = workers(id) workerInfo.lastHeartBeatTime = System.currentTimeMillis() println("Master更新了 " + id + " 的心跳時間 ") }
WorkerActor.scala 中增加程式碼
// 當客戶端註冊成功後,就定義一個定時器,每隔一定時間,傳送 SendHeartBeat 給自己 import context.dispatcher context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat) case SendHeartBeat => { println("WorkedId= " + id + " 給Master傳送心跳") masterActorProxy ! HeartBeat(id) }
16.9.6 實現功能 3-Master 啟動定時任務,定時檢測註冊的 Worker
功能要求:Master 啟動定時任務,定時檢測註冊的 Worker 有哪些沒有更新心跳,已經超時的 Worker,將其從 HashMap 中刪除掉。

示例程式碼如下:
MessageProtocol.scala 中增加程式碼
// Master 給自己傳送一個觸發檢查超時 Worker 的資訊 case object StartTimeOutWorker // Master 給自己發訊息,檢測 Worker,對於心跳超時的 case object RemoveTimeOutWorker
MasterActor.scala 中增加程式碼
case "start" => { println("Master伺服器啟動了...") // Master 啟動定時任務,定時檢測註冊的 Worker 有哪些沒有更新心跳,已經超時的 Worker,將其從 HashMap 中刪除掉。 self ! StartTimeOutWorker } // 開啟定時器,每隔一定時間檢測是否有 Worker 的心跳超時 case StartTimeOutWorker => { println("開啟了定時檢測Worker心跳的任務") import context.dispatcher // 使用排程器時候必須匯入dispatcher context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker) } // 判斷哪些 Worker 心跳超時(nowTime - lastHeartBeatTime),對已經超時的 Worker,將其從 HashMap 中刪除掉。 case RemoveTimeOutWorker => { // 首先獲取所有 Workers 的所有 WorkerInfo val workerInfos = workers.values val nowTime = System.currentTimeMillis() // 過濾出所有超時的 workerInfo 並刪除即可 workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeatTime) > 6000) .foreach(workerInfo => workers.remove(workerInfo.id)) println("當前有 " + workers.size + " 個Worker存活") }
16.9.7 實現功能 4-Master,Worker 的啟動引數執行時指定
功能要求:Master,Worker 的啟動引數執行時指定,而不是固定寫在程式中的。

MasterActor.scala 中修改程式碼
if (args.length != 3) { println("請輸入引數 host port MasterActor的名字") sys.exit() } val host = args(0)// 服務端ip地址 val port = args(1)// 埠 val masterName = args(2)// MasterActor的名字 ...... // 再建立 Master 的 Actor 和 ActorRef val masterActorRef = masterActorSystem.actorOf(Props[MasterActor], s"${masterName}")
WorkerActor.scala 中增修改程式碼
if (args != 6) { println("請輸入引數 host port WorkerActor的名字 serverHost serverPort MasterActor的名字") } val host = args(0) val port = args(1) val workerName = args(2) val serverHost = args(3) val serverPort = args(4) val masterName = args(5) ...... // 建立 WorkerActor 的 Actor 和 ActorRef val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new WorkerActor(serverHost, serverPort.toInt, masterName)), s"${workerName}")
Master 配置引數截圖:

Worker 配置引數截圖:
