1. 程式人生 > >第14課:spark RDD彈性表現和來源,容錯

第14課:spark RDD彈性表現和來源,容錯

hadoop 的MapReduce是基於資料集的,位置感知,容錯 負載均衡 
基於資料集的處理:從物理儲存上載入資料,然後操作資料,然後寫入物理儲存裝置; 
基於資料集的操作不適應的場景: 
1,不適合於大量的迭代 
2,互動式查詢 
重點是:基於資料流的方式 不能夠複用曾經的結果或者中間計算結果; 

spark RDD是基於工作集的 
工作流和工作集的共同特點:位置感知,自動容錯,負載均衡等。 
spark的位置感知比hadoop的好很多,具體如下: 

hadoop位置感知:hadoop進行partition之後就不管Reducer在哪裡了。 
spark的位置感知:spark進行partition後再進行下一步Stage時會確定其位置,是更精緻化的。 
RDD:Resillient Distributed Dataset 
RDD的彈性表現: 
1、彈性之一:自動的進行記憶體和磁碟資料儲存的切換; 
2、彈性之二:基於Lineage的高效容錯(第n個節點出錯,會從第n-1個節點恢復,血統容錯); 
3、彈性之三:Task如果失敗會自動進行特定次數的重試(預設4次); 
4、彈性之四:Stage如果失敗會自動進行特定次數的重試(可以只執行計算失敗的階段);只計算失敗的資料分片; 
5、checkpoint和persist 
6、資料排程彈性:DAG TASK 和資源 管理無關 
7、資料分片的高度彈性(人工自由設定分片函式),repartition 


Spark RDD來源:1,使用程式中的集合建立RDD(用於小量測試);  2,使用本地檔案系統建立RDD(測試大資料);  3,使用HDFS建立RDD(生產環境最常用的RDD建立方式);  4,基於DB建立RDD;  5,基於NoSQL,例如HBase;  6,基於S3建立RDD;  7,基於資料流建立RDD; 前三種是比較基本的,後面4種是基於資料庫的,要注意資料本地性(getPreferedLocations); 

1.集合建立RDD方式 的Java程式碼  

  1. package
     com.imf.spark.rdd  
  2.   
  3. import org.apache.spark.{SparkConf, SparkContext}  
  4.   
  5. /** 
  6.   * Created by lujinyong168 on 2016/2/2. 
  7.   * DT大資料夢工廠-IMF 
  8.   * 使用程式中的集合建立RDD(用於小量測試) 
  9.   */  
  10. object RDDCreateByCollections {  
  11. def main(args: Array[String]) {  
  12. val conf = new SparkConf()//建立SparkConf物件  
  13. conf.setAppName("RDDCreateByCollections")//設定應用名稱  
  14. conf.setMaster("local")  
  15. val sc = new SparkContext(conf)//建立SparkContext物件  
  16.     //建立一個Scala集合  
  17. val numbers = 1 to 100  
  18. val rdd = sc.parallelize(numbers)  
  19. //    val rdd = sc.parallelize(numbers,10)//設定並行度為10  
  20. val sum = rdd.reduce(_+_)  
  21. println("1+2+3+...+99+100="+sum)  
  22.   }  
  23. }  


2.local模式建立RDD  的Java程式碼  
  1. package com.imf.spark.rdd  
  2.   
  3. import org.apache.spark.{SparkConf, SparkContext}  
  4.   
  5. /** 
  6.   * Created by lujinyong168 on 2016/2/2. 
  7.   * DT大資料夢工廠-IMF 
  8.   * 使用本地檔案系統建立RDD(測試大量資料) 
  9.   * 統計文字中的字元個數 
  10.   */  
  11. object RDDCreateByLocal {  
  12. def main(args: Array[String]) {  
  13. val conf = new SparkConf()//建立SparkConf物件  
  14. conf.setAppName("RDDCreateByLocal")//設定應用名稱  
  15. conf.setMaster("local")  
  16. val sc = new SparkContext(conf)//建立SparkContext物件  
  17. val rdd = sc.textFile("D://testspark//WordCount.txt")  
  18. val linesLen = rdd.map(line=>line.length)  
  19. val sum = linesLen.reduce(_+_)  
  20. println("The total characters of the file is : "+sum)  
  21.   }  
  22. }  


3.HDFS模式建立RDD  的Java程式碼  
  1. package com.imf.spark.rdd  
  2.   
  3. import org.apache.spark.{SparkConf, SparkContext}  
  4.   
  5. /** 
  6.   * Created by lujinyong168 on 2016/2/2. 
  7.   * DT大資料夢工廠-IMF 
  8.   * 使用HDFS建立RDD(生產環境最常用的RDD建立方式) 
  9.   */  
  10. object RDDCreateByHDFS {  
  11. def main(args: Array[String]) {  
  12. val conf = new SparkConf()//建立SparkConf物件  
  13. conf.setAppName("RDDCreateByHDFS")//設定應用名稱  
  14. conf.setMaster("local")  
  15. val sc = new SparkContext(conf)//建立SparkContext物件  
  16. val rdd = sc.textFile("/library/")  
  17. val linesLen = rdd.map(line=>line.length)  
  18. val sum = linesLen.reduce(_+_)  
  19. println("The total characters of the file is : "+sum)  
  20.   }  
  21. }  

4.對資料進行分片

使用coalesce,不要用repartition

Spark容錯方式

a.資料檢查點和記錄資料的更新

資料檢查點會每次都進行資料的靠背,這個需要很大的記憶體

rdd通過記錄資料的更新的方式為什麼會更高效?

1.rdd是不可變的,且加上lazy

   a.不存在全域性修改的問題,控制難度就會大大下降,在此基礎上有控制鏈條,如果有100個步驟,在第99個不走失敗了,可以直接從第99個步驟上開始恢復

   b.對於rdd的操作寫都是粗粒度的,rdd的讀操作既可以是粗粒度的也可以是細粒度的

   c.lazy級別的進行鏈式展開式,不會產生那個多的中間結果

參考:

https://blog.csdn.net/zdy0_2004/article/details/50132509

https://www.cnblogs.com/dt-zhw/p/5664663.html

https://www.2cto.com/net/201711/697288.html

https://www.csdn.net/article/2015-06-21/2825011