1. 程式人生 > >spark知識點總結(1)

spark知識點總結(1)

1.RDD彈性分散式資料集:是抽象出來的概念,元素的集合。是一批節點上一批資料的集合。

分散式:每個rdd會把資料分成多個parttioner放在多個節點上。eg:90萬條資料放在9個節點上面,每個   節點9萬條資料。

彈性:eg:每個節點上面個的記憶體中只能存放5萬條資料,那麼他會把剩下的4萬條存放在磁碟當中。

RDD提供個高容錯性的機制,當一個RDD上面的資料發生丟失,他會自動的從上一個RDD中去回覆資料。

RDD會從hdfs/hive上面讀取資料,然後存放在不同的節點上面。然後進行迭代式的運算。spark和mapreduce的最大區別就是,maprdeuce只經歷了兩個階段,而spark經歷了N個階段,對資料經行處理。

spark最終會把資料儲存在,1,hdfs/hive上。2,mysql/hbase。3,返回到客戶端

spark框架:1.離線批處理。2,sql查詢。3,實施計算。

2.sparkcontext 是spark程式的入口,根據輸入檔案的型別讀取檔案,讀取本地檔案textFile
Java    flatmap運算元是把RDD一個元素拆成一個或多個多元素
        maptopari 運算元是將每一個元素對映成為(v1,v2)的tuple型別
        reduceByKey運算元是對tule中的第二個值進行累加,在java中沒有reduceBy這個運算元
        
3.spark工作原理
   dirver:提交spqrk程式的節點,包含sparkcontext負責程式與spark叢集進行互動,申請資源,建立RDD等。
   
   master:程序,負責資源的排程和分配,叢集的監控等等。
   worker:程序,兩個職責,一是用自己的記憶體,儲存RDD的某個或某些PARTITION;另外,啟動其他程序和執行緒,對RDD的partition進行並行的計算和處理。
  
   execurtor和task:負責執行,對RDD的partition進行並行的計算,比如:map,flatmap等等。
   
4.建立RDD
   1.並行化集合建立RDD:在實際用途中是為了本地測試
   2.用本地檔案建立RDD:處理大檔案的時候用到
   3.hdfs方式建立RDD:用於生產環境,離線批處理操作,hive   
   
5.spark的共享變數BroadCast,與累加變數
    1.BroadCast是隻讀的,並且在每個節點上只會有一個副本,最主要是減少變數到各個節點的            網路傳輸消耗,以及各個節點的記憶體傳輸消耗。用它裡裡面的value方法會拿到裡面的每一個值
    2.Accumulator累加變數,每個task只能對其進行累加,不能讀。只有dirver才有只讀的許可權。用+=的方式給它賦值。
6.持久化RDD
    為什麼:action運算元在觸發的時候會重複的讀取linrRDD中的值,效率低下。浪費資源,浪費時間。
    
    
    注意:spark在進行shuffle時也會進行持久化操作,防止資料丟失時計算整個過程進行恢復。
    
7.DAGsc
    當提交一個job的時候就會觸發DAG去執行,DAG執行的時候會呼叫taskSC,去執行excutor上面的task,其中DAG裡面有一個非常重要的blockManger記憶體管理的元件
    
8.spark中的shuffle
    時spark效能調優的核心,有兩個特點,1.buket的快取問題。2.shuffleMapTask到resultTast資料拉取問題。
    在spark新版本中,引入了Consolidation機制,對shuffle進行了優化,開啟了並行執行的ShuffleMapTask
    
9.checkpoint時spark中的一個重要的容錯機制。當對一個RDD設定了checkpoint機制他就受RDDcheckpointData物件的管理,RDDCheckpoinData物件,會將對呼叫chaeckpoint方法RDD的狀態設定為MarkedForCheckpoint。會呼叫job中,最後一個RDD的docheckpoin()方法,該方法會沿著finalRdd的lineAge機制向上查詢,把設定為markedCheckpoint的RDD標記為CheckpointinglnProgress,啟動單獨的job來將linage中標記為CheckpointtingProgress的RDD,進行checkpoint操作,也就是將資料寫入SparkContext.setCheckpointDir方法中。

    checkpoin與持久化的區別在於
    1,最主要就是持久化只是將資料儲存在BlockManger中,但是RDD的lineage是不變的,但是執行完checkpoint以後RDD已經沒有了lineage,而是為其強行設定了一個CheckPointRDD,就是說checkpoint之後lineage就發生了改變
    2.持久化的資料通常發生丟失的可能性很大,而checkpoint是儲存在hdfs上很難發生丟失。
    
    3,預設情況下,如果某個RDD沒有持久化,還設定了checkpoint,那是很危險的。本來job執行結束了,但是由於中間的RDD沒有持久化,那麼chenckpoint job想要將RDD的資料寫入外部檔案,還的從RDD之前所有的RDD全部重新計算一次,然後計算出rdd的資料再將其Checkpoint到外部檔案系統。·
    
    
10.stage的劃分機制:在最後一個rdd呼叫action運算元以後DAGscheduler會呼叫runjob方法生成一個finalstage,然後往上找,如果有寬依賴,就會劃分一個stage。DAGscheduler劃分款依賴
    的時候有兩個特別重要的方法submitstage和getMissingParentStages
    
11.job觸發流程


12.spark中的分割槽
    用parallelize讀取資料時如果不指定分割槽機會按預設的叢集環境進行分割槽
    本地模式:按本地cpu的數目進行分割槽,設定了local[N]則預設為N
    apache mesos:預設的分割槽為8
    standlone或yarn,:預設是取叢集中所有核心數目的總和,或者是2,取二者的較大值。對於parallelize來說,沒有在方法中的指定分割槽數,則預設為spark.default.parallelism,對於textFile來說,沒有在方法中的指定分割槽數,則預設為min(defaultParallelism,2)而defaultParallelism對應的就是spark.default.parallelism。如果是從hdfs上面讀取檔案,其分割槽數為檔案分片數(128MB/片)
    
    
    
    1.分割槽
         分割槽是RDD內部平行計算的一個計算單元,RDD的資料集在邏輯上被劃分為多個分片,每一個分片稱為分割槽,分割槽的格式決定了平行計算的粒度,而每個分割槽的數值計算都是在一個任務中進行的,因此任務的個數,也是由RDD(準確來說是作業最後一個RDD)的分割槽數決定。

    2.分割槽的個數
         RDD分割槽的一個分割槽原則:儘可能是得分割槽的個數等於叢集核心數目

        下面我們僅討論Spark預設的分割槽個數,這裡分別就parallelize和textFile具體分析其預設的分割槽數

        無論是本地模式、Standalone模式、YARN模式或Mesos模式,我們都可以通過spark.default.parallelism來配置其預設分割槽個數,若沒有設定該值,則根據不同的叢集環境確定該值

        本地模式:預設為本地機器的CPU數目,若設定了local[N],則預設為N
         Apache Mesos:預設的分割槽數為8
         Standalone或YARN:預設取叢集中所有核心數目的總和,或者2,取二者的較大值
         結論:
         對於parallelize來說,沒有在方法中的指定分割槽數,則預設為spark.default.parallelism
        對於textFile來說,沒有在方法中的指定分割槽數,則預設為min(defaultParallelism,2),而defaultParallelism對應的就是spark.default.parallelism。如果是從hdfs上面讀取檔案,其分割槽數為檔案分片數(128MB/片)