1. 程式人生 > >spark的持久化和共享變數

spark的持久化和共享變數

1. 持久化運算元cache

  介紹:正常情況下,一個RDD是不包含真實資料的,只包含描述這個RDD元資料資訊,如果對這個RDD呼叫cache方法,那麼這個RDD的資料,依然沒有真實資料,直到第一次呼叫一個action的運算元觸發了這個RDD的資料生成,那麼cache操作就會把資料儲存在記憶體中,所以第二次重複利用這個RDD的時候,計算速度將會快很多。
spark的持久化和共享變數
其中最主要的儲存級別為:

//不儲存在記憶體也不在磁碟
val NONE = new StorageLevel(false, false, false, false)
//儲存在磁碟
val DISK_ONLY = new StorageLevel(true, false, false, false)
//儲存在磁碟,儲存2份
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
//儲存在記憶體
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
//儲存在記憶體 儲存2份
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
//儲存在記憶體並序列化
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
//記憶體磁碟結合使用
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
//儲存在堆外記憶體
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

相應的操作:

        //設定持久化
        listRDD.cache()
        //移除持久化
        listRDD.unpersist()

2. 共享變數

  介紹:在 Spark 程式中,當一個傳遞給 Spark 操作(例如 map 和 reduce)的函式在遠端節點上面執行 時,Spark 操作實際上操作的是這個函式所用變數的一個獨立副本。這些變數會被複制到每臺機器上,並且這些變數在遠端機器上的所有更新都不會傳遞迴驅動程式。通常跨任務的讀 寫變數是低效的,但是,Spark 還是為兩種常見的使用模式提供了兩種有限的共享變數:廣播變數(Broadcast Variable)和累加器

(Accumulator)。

 (1)廣播變數

在不使用廣播變數的時候:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        //這一句程式碼是在 driver中執行的,相當於這個變數是在driver程序中的。
        val a=3

        /**
          * kv._2+a這句程式碼是在executor中執行的,
          * 其中a這個變數會在和f序列化的過程中,會攜帶過去。
          * 並且每一個task都會複製一份,可想而知如果這個a變數是一個大物件,那就是一個災難
          */
        listRDD.map(kv=>(kv._1,kv._2+a))
    }
}

spark的持久化和共享變數
使用廣播變數的時候:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        //這一句程式碼是在 driver中執行的,相當於這個變數是在driver程序中的。
        val a=3
        //設定廣播變數,每一個executor中的task共享一個廣播變數
        val broadcast: Broadcast[Int] = sc.broadcast(a)
        listRDD.map(kv=>{
            //獲取廣播變數
            val aa=broadcast.value
            (kv._1,kv._2+aa)
        })
    }
}

spark的持久化和共享變數
總結:如果 executor 端用到了 Driver 的變數,如果不使用廣播變數在 Executor 有多少 task 就有 多少 Driver 端的變數副本。如果 Executor 端用到了 Driver 的變數,如果使用廣播變數在每個 Executor 中都只有一份 Driver 端的變數副本。
使用的廣播變數的條件
   - 廣播變數只能在driver端定義,不能在executor中定義
   - 在 Driver 端可以修改廣播變數的值,在 Executor 端無法修改廣播變數的值。
   - 廣播變數的值越大,使用廣播變數的優勢越明顯
   - task個數越多,使用廣播變數的優勢越明顯

 (2)累加器

   介紹:在 Spark 應用程式中,我們經常會有這樣的需求,如異常監控,除錯,記錄符合某特性的資料的數目,這種需求都需要用到計數器,如果一個變數不被宣告為一個累加器,那麼它將在被改變時不會在 driver 端進行全域性彙總,即在分散式執行時每個 task 執行的只是原始變數的一個副本,並不能改變原始變數的值,但是當這個變數被宣告為累加器後,該變數就會有分散式計數的功能。
案例

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        //統計檔案有多少行
        val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
        //設定累加器
        val mysum: LongAccumulator = sc.longAccumulator("Mysum")
        hdfsRDD.map(line=>{
            mysum.add(1)
            line
        }).collect() //觸發提交操作
        //獲取累加器的值
        println(mysum.value)
        //重置累加器
        mysum.reset()
    }
}

使用累加器的注意事項
   - 累加器在 Driver 端定義賦初始值,累加器只能在 Driver 端讀取最後的值,在 Excutor 端更新。
   - 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
   - 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
   - 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
   - 如果系統自帶的累加器不能滿足要求,還可以自定義累加器