大資料之Spark(四)--- Dependency依賴,啟動模式,shuffle,RDD持久化,變數傳遞,共享變數,分散式計算PI的值
阿新 • • 發佈:2018-11-09
一、Dependency:依賴:RDD分割槽之間的依存關係 --------------------------------------------------------- 1.NarrowDependency: 子RDD的每個分割槽依賴於父RDD的少量分割槽。 | / \ --- |---- OneToOneDependency //父子RDD之間的分割槽存在一對一關係。 |---- RangeDependency //父RDD的一個分割槽範圍和子RDD存在一對一關係。 |---- PruneDependency //刪減依賴--在PartitionPruningRDD和其父RDD之間的依賴,子RDD包含了父RDD的分割槽子集。。 2.ShuffleDependency //混洗依賴,在shuffle階段輸出時的一種依賴。 二、SparkContext建立排程器的過程 ------------------------------------------------------------------ [SparkContext.scala:501行] val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched _taskScheduler = ts _dagScheduler = new DAGScheduler(this) 三、Spark啟動模式 -------------------------------------------------- 1.[本地模式,通過執行緒模擬] 本地後臺排程器 spark local spark local[3] //3執行緒,模擬cluster叢集 spark local[*] //匹配cpu個數, spark local[3,2] //3:3個執行緒,2表示每個分割槽重試1次[0和1等價,不重試] conf.seetMaster[3] conf.seetMaster[*] conf.seetMaster[3,2] 2.[相當於偽分散式] StandaloneSchedulerBackend spark local-cluster[N, cores, memory] //模擬spark叢集。 3.[完全分散式] StandaloneSchedulerBackend spark spark://s201:7077 //連線到spark叢集上. 4.程式碼演示
import org.apache.spark.{SparkConf, SparkContext} object Test8 { def main(args: Array[String]): Unit = { val sparkConf: SparkConf = new SparkConf() sparkConf.setAppName("Test-1") sparkConf.setMaster("local[2]") sparkConf.setMaster("local[*]") sparkConf.setMaster("local[2,3]") sparkConf.setMaster("local[*]") val sc: SparkContext = new SparkContext(sparkConf) val data = List(1,2,3,4,5,6,7,8,9,10); val rdd1 = sc.parallelize(data) val rdd2 = rdd1.map( e => { val s = Thread.currentThread().getName(); println(s); e } ) val rdd3 = rdd1.repartition(4); val rdd4 = rdd3.map( e => { val s = Thread.currentThread().getName(); println(s); e } ) rdd4.collect() } }
四、shuffle操作 -------------------------------------------------- 1.跨分割槽再分發資料的一種機制 2.觸發shuffle的操作:groupByKey reduceByKey... reparatition coalesce... join cogroup 3.shuffle對效能的影響 a.shuffle涉及到磁碟或者網路io,所以成本很高 b.shuffle為了資料重用,在磁碟上生成大量的中間檔案 c.垃圾回收如果不及時,長時間的shuffle會佔用大量的磁碟空間 五、RDD持久化 -------------------------------------------------- 1.資料在記憶體中持久化,可以跨操作 2.持久化RDD時,節點上的每個分割槽的資料都會儲存在記憶體中,以備在其他操作中進行重用 3.這種快取技術是迭代式計算和互動式查詢的重要手段。 4.可以使用persist()和cache(),來進行rdd的持久化,cache()是persist()一種,使用記憶體方式進行快取 a.首次Action計算時,會發生persist(),會將計算的資料儲存在節點記憶體中 b.spark的這種快取機制,是容錯的,如果rdd的任何一個分割槽丟失了,都可以通過最初建立rdd的進行重新計算出直到Action的RDD結果 c.每個RDD都可以使用不同的儲存級別進行持久化,可以通過storelevel進行設定 MEMORY_ONLY //只在記憶體 MEMORY_AND_DISK MEMORY_ONLY_SER //記憶體儲存(序列化) MEMORY_AND_DISK_SER DISK_ONLY //硬碟 MEMORY_ONLY_2 //帶有副本 MEMORY_AND_DISK_2 //快速容錯。 OFF_HEAP d.級別選取建議 如果想快速容錯,就帶副本:MEMORY_ONLY_2 如果記憶體足夠,選取MEMORY_ONLY 如果記憶體不是很足夠,選取MEMORY_ONLY_SER 儘量不要快取到磁碟,除非記憶體非常不夠 5.刪除持久化資料 rdd2.unpersist(isBlocking = true) 6.程式碼演示
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object TestPersist {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf()
sparkConf.setAppName("Test-1")
sparkConf.setMaster("local")
val sc: SparkContext = new SparkContext(sparkConf)
val rdd1 = sc.parallelize(1 to 20)
val rdd2 = rdd1.map(
e => {
println(e)
e * 2
}
)
//更改快取地
rdd2.persist(StorageLevel.DISK_ONLY)
//rdd2.persist(StorageLevel.MEMORY_ONLY)
//將rdd2的結果快取到記憶體中
//rdd2.cache()
println(rdd2.reduce(_ + _))
//移除快取
rdd2.unpersist(true)
println(rdd2.reduce(_ + _))
}
}
六、變數傳遞過程中的序列化 ---------------------------------------------------------- 因為是叢集計算,所以map過程的程式碼和過程中的變數,會將變數的值拷貝,然後通過網路,分發到各個節點,[區別於共享變數:每次計算都要分發的]過程中需要進行序列和反序列,所以就要求過程中的變數要能序列化 scala> class Dog scala> val d = new Dog scala> sc.parallelize(1 to 20).map( e=> {println(d);e}).reduce(_ + _) //會報錯因為dog不能序列化 //需要定義Dog的時候繼承Serializable介面 scala> case class Dog (name:String,age:Int) scala> val d = Dog("tom",1) scala> sc.parallelize(1 to 20).map( e=> {println(d);e}).reduce(_ + _) 七、共享變數 --------------------------------------------------- 1.在所有的節點間共享一個變數。 2.Spark通過廣播變數和累加器實現變數的共享 3.廣播變數 a.一個只讀的變數,在建立的時候廣播一次[也必須繼承了序列化介面],然後在每個節點進行快取。而不是跟隨任務進行網路間傳遞 b.使用方式 scala> val broadcastVar = sc.broadcast(Array(1,2,3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(3) scala> broadcastVar.value res7: Array[Int] = Array(1, 2, 3) 4.累加器 a.一個只能增加的變數,高效並行。通常用於MR的Counter和Sum。UI可見 b.使用方式 scala> val ac1 = sc.longAccumulator("ac1") ac1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 288, name: Some(ac1), value: 0) scala> ac1.value res8: Long = 0 scala> sc.parallelize(1 to 20).map(_ * 2).map(e=>{ac1.add(1) ; e}).reduce(_+_) res9: Int = 420 scala> ac1.value res10: Long = 20 c.自定義累加器 class MyAccumulatorV2 extends AccumulatorV2[V,T]{ private val v:V = ... def reset(): def add(): def sum(): def count(): def value(): ... } 八、Spark分散式計算PI的值 ---------------------------------------------------------- scala> sc.parallelize(1 to 999999999).map(e=>{ val a = 1f / (2 * e - 1) ; val b = if (e % 2 == 0) -1 else 1 ; a * b * 4 } ).reduce(_+_) res19: Float = 3.1415954