spark的持久化和共享變數
1. 持久化運算元cache
介紹:正常情況下,一個RDD是不包含真實資料的,只包含描述這個RDD元資料資訊,如果對這個RDD呼叫cache方法,那麼這個RDD的資料,依然沒有真實資料,直到第一次呼叫一個action的運算元觸發了這個RDD的資料生成,那麼cache操作就會把資料儲存在記憶體中,所以第二次重複利用這個RDD的時候,計算速度將會快很多。
其中最主要的儲存級別為:
//不儲存在記憶體也不在磁碟 val NONE = new StorageLevel(false, false, false, false) //儲存在磁碟 val DISK_ONLY = new StorageLevel(true, false, false, false) //儲存在磁碟,儲存2份 val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) //儲存在記憶體 val MEMORY_ONLY = new StorageLevel(false, true, false, true) //儲存在記憶體 儲存2份 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) //儲存在記憶體並序列化 val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) //記憶體磁碟結合使用 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) //儲存在堆外記憶體 val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
相應的操作:
//設定持久化
listRDD.cache()
//移除持久化
listRDD.unpersist()
2. 共享變數
介紹:在 Spark 程式中,當一個傳遞給 Spark 操作(例如 map 和 reduce)的函式在遠端節點上面執行 時,Spark 操作實際上操作的是這個函式所用變數的一個獨立副本。這些變數會被複制到每臺機器上,並且這些變數在遠端機器上的所有更新都不會傳遞迴驅動程式。通常跨任務的讀 寫變數是低效的,但是,Spark 還是為兩種常見的使用模式提供了兩種有限的共享變數:廣播變數(Broadcast Variable)和累加器
(1)廣播變數
在不使用廣播變數的時候:
object SparktTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("SparktTest") conf.setMaster("local[2]") val sc: SparkContext = new SparkContext(conf) val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18)) val listRDD: RDD[(String, Int)] = sc.parallelize(list) //這一句程式碼是在 driver中執行的,相當於這個變數是在driver程序中的。 val a=3 /** * kv._2+a這句程式碼是在executor中執行的, * 其中a這個變數會在和f序列化的過程中,會攜帶過去。 * 並且每一個task都會複製一份,可想而知如果這個a變數是一個大物件,那就是一個災難 */ listRDD.map(kv=>(kv._1,kv._2+a)) } }
使用廣播變數的時候:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val listRDD: RDD[(String, Int)] = sc.parallelize(list)
//這一句程式碼是在 driver中執行的,相當於這個變數是在driver程序中的。
val a=3
//設定廣播變數,每一個executor中的task共享一個廣播變數
val broadcast: Broadcast[Int] = sc.broadcast(a)
listRDD.map(kv=>{
//獲取廣播變數
val aa=broadcast.value
(kv._1,kv._2+aa)
})
}
}
總結:如果 executor 端用到了 Driver 的變數,如果不使用廣播變數在 Executor 有多少 task 就有 多少 Driver 端的變數副本。如果 Executor 端用到了 Driver 的變數,如果使用廣播變數在每個 Executor 中都只有一份 Driver 端的變數副本。
使用的廣播變數的條件:
- 廣播變數只能在driver端定義,不能在executor中定義
- 在 Driver 端可以修改廣播變數的值,在 Executor 端無法修改廣播變數的值。
- 廣播變數的值越大,使用廣播變數的優勢越明顯
- task個數越多,使用廣播變數的優勢越明顯
(2)累加器
介紹:在 Spark 應用程式中,我們經常會有這樣的需求,如異常監控,除錯,記錄符合某特性的資料的數目,這種需求都需要用到計數器,如果一個變數不被宣告為一個累加器,那麼它將在被改變時不會在 driver 端進行全域性彙總,即在分散式執行時每個 task 執行的只是原始變數的一個副本,並不能改變原始變數的值,但是當這個變數被宣告為累加器後,該變數就會有分散式計數的功能。
案例:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
//統計檔案有多少行
val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
//設定累加器
val mysum: LongAccumulator = sc.longAccumulator("Mysum")
hdfsRDD.map(line=>{
mysum.add(1)
line
}).collect() //觸發提交操作
//獲取累加器的值
println(mysum.value)
//重置累加器
mysum.reset()
}
}
使用累加器的注意事項
- 累加器在 Driver 端定義賦初始值,累加器只能在 Driver 端讀取最後的值,在 Excutor 端更新。
- 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
- 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
- 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
- 如果系統自帶的累加器不能滿足要求,還可以自定義累加器