第14課:spark RDD彈性表現和來源,容錯
阿新 • • 發佈:2018-12-14
hadoop 的MapReduce是基於資料集的,位置感知,容錯 負載均衡
基於資料集的處理:從物理儲存上載入資料,然後操作資料,然後寫入物理儲存裝置;
基於資料集的操作不適應的場景:
1,不適合於大量的迭代
2,互動式查詢
重點是:基於資料流的方式 不能夠複用曾經的結果或者中間計算結果;
spark RDD是基於工作集的
工作流和工作集的共同特點:位置感知,自動容錯,負載均衡等。
spark的位置感知比hadoop的好很多,具體如下:
hadoop位置感知:hadoop進行partition之後就不管Reducer在哪裡了。
spark的位置感知:spark進行partition後再進行下一步Stage時會確定其位置,是更精緻化的。
RDD:Resillient Distributed Dataset
RDD的彈性表現:
1、彈性之一:自動的進行記憶體和磁碟資料儲存的切換;
2、彈性之二:基於Lineage的高效容錯(第n個節點出錯,會從第n-1個節點恢復,血統容錯);
3、彈性之三:Task如果失敗會自動進行特定次數的重試(預設4次);
4、彈性之四:Stage如果失敗會自動進行特定次數的重試(可以只執行計算失敗的階段);只計算失敗的資料分片;
5、checkpoint和persist
6、資料排程彈性:DAG TASK 和資源 管理無關
7、資料分片的高度彈性(人工自由設定分片函式),repartition
2.local模式建立RDD 的Java程式碼
3.HDFS模式建立RDD 的Java程式碼
基於資料集的處理:從物理儲存上載入資料,然後操作資料,然後寫入物理儲存裝置;
基於資料集的操作不適應的場景:
1,不適合於大量的迭代
2,互動式查詢
重點是:基於資料流的方式 不能夠複用曾經的結果或者中間計算結果;
spark RDD是基於工作集的
工作流和工作集的共同特點:位置感知,自動容錯,負載均衡等。
spark的位置感知比hadoop的好很多,具體如下:
hadoop位置感知:hadoop進行partition之後就不管Reducer在哪裡了。
spark的位置感知:spark進行partition後再進行下一步Stage時會確定其位置,是更精緻化的。
RDD:Resillient Distributed Dataset
RDD的彈性表現:
1、彈性之一:自動的進行記憶體和磁碟資料儲存的切換;
2、彈性之二:基於Lineage的高效容錯(第n個節點出錯,會從第n-1個節點恢復,血統容錯);
3、彈性之三:Task如果失敗會自動進行特定次數的重試(預設4次);
4、彈性之四:Stage如果失敗會自動進行特定次數的重試(可以只執行計算失敗的階段);只計算失敗的資料分片;
5、checkpoint和persist
6、資料排程彈性:DAG TASK 和資源 管理無關
7、資料分片的高度彈性(人工自由設定分片函式),repartition
Spark RDD來源:1,使用程式中的集合建立RDD(用於小量測試); 2,使用本地檔案系統建立RDD(測試大資料); 3,使用HDFS建立RDD(生產環境最常用的RDD建立方式); 4,基於DB建立RDD; 5,基於NoSQL,例如HBase; 6,基於S3建立RDD; 7,基於資料流建立RDD; 前三種是比較基本的,後面4種是基於資料庫的,要注意資料本地性(getPreferedLocations);
1.集合建立RDD方式 的Java程式碼
- package
- import org.apache.spark.{SparkConf, SparkContext}
- /**
- * Created by lujinyong168 on 2016/2/2.
- * DT大資料夢工廠-IMF
- * 使用程式中的集合建立RDD(用於小量測試)
- */
- object RDDCreateByCollections {
- def main(args: Array[String]) {
- val conf = new SparkConf()//建立SparkConf物件
- conf.setAppName("RDDCreateByCollections")//設定應用名稱
- conf.setMaster("local")
- val sc = new SparkContext(conf)//建立SparkContext物件
- //建立一個Scala集合
- val numbers = 1 to 100
- val rdd = sc.parallelize(numbers)
- // val rdd = sc.parallelize(numbers,10)//設定並行度為10
- val sum = rdd.reduce(_+_)
- println("1+2+3+...+99+100="+sum)
- }
- }
2.local模式建立RDD 的Java程式碼
- package com.imf.spark.rdd
- import org.apache.spark.{SparkConf, SparkContext}
- /**
- * Created by lujinyong168 on 2016/2/2.
- * DT大資料夢工廠-IMF
- * 使用本地檔案系統建立RDD(測試大量資料)
- * 統計文字中的字元個數
- */
- object RDDCreateByLocal {
- def main(args: Array[String]) {
- val conf = new SparkConf()//建立SparkConf物件
- conf.setAppName("RDDCreateByLocal")//設定應用名稱
- conf.setMaster("local")
- val sc = new SparkContext(conf)//建立SparkContext物件
- val rdd = sc.textFile("D://testspark//WordCount.txt")
- val linesLen = rdd.map(line=>line.length)
- val sum = linesLen.reduce(_+_)
- println("The total characters of the file is : "+sum)
- }
- }
3.HDFS模式建立RDD 的Java程式碼
- package com.imf.spark.rdd
- import org.apache.spark.{SparkConf, SparkContext}
- /**
- * Created by lujinyong168 on 2016/2/2.
- * DT大資料夢工廠-IMF
- * 使用HDFS建立RDD(生產環境最常用的RDD建立方式)
- */
- object RDDCreateByHDFS {
- def main(args: Array[String]) {
- val conf = new SparkConf()//建立SparkConf物件
- conf.setAppName("RDDCreateByHDFS")//設定應用名稱
- conf.setMaster("local")
- val sc = new SparkContext(conf)//建立SparkContext物件
- val rdd = sc.textFile("/library/")
- val linesLen = rdd.map(line=>line.length)
- val sum = linesLen.reduce(_+_)
- println("The total characters of the file is : "+sum)
- }
- }
4.對資料進行分片
使用coalesce,不要用repartition
Spark容錯方式
a.資料檢查點和記錄資料的更新
資料檢查點會每次都進行資料的靠背,這個需要很大的記憶體
rdd通過記錄資料的更新的方式為什麼會更高效?
1.rdd是不可變的,且加上lazy
a.不存在全域性修改的問題,控制難度就會大大下降,在此基礎上有控制鏈條,如果有100個步驟,在第99個不走失敗了,可以直接從第99個步驟上開始恢復
b.對於rdd的操作寫都是粗粒度的,rdd的讀操作既可以是粗粒度的也可以是細粒度的
c.lazy級別的進行鏈式展開式,不會產生那個多的中間結果
參考:
https://blog.csdn.net/zdy0_2004/article/details/50132509
https://www.cnblogs.com/dt-zhw/p/5664663.html
https://www.2cto.com/net/201711/697288.html
https://www.csdn.net/article/2015-06-21/2825011