1. 程式人生 > >Spark知識點總結--持續更新

Spark知識點總結--持續更新

spark有哪些元件

(1)master:管理叢集和節點,不參與計算。
(2)worker:計算節點,程序本身不參與計算,和master彙報。
(3)Driver:執行程式的main方法,建立spark context物件。
(4)spark context:控制整個application的生命週期,包括dagsheduler和task scheduler等元件。
(5)client:使用者提交程式的入口。

spark中的模組

Spark Core:

  • 包含Spark基本功能,包括任務排程,記憶體管理,容錯機制等
  • 內部定義了RDDs(彈性分散式資料集)
  • 提供了許多APIs來建立和操作這些RDDs
  • 為其他元件提供底層服務

Spark SQL

  • Spark處理結構化資料的庫,類似Hive SQL

Spark Streaming

  • 提供了API處理實時資料流
  • 企業中用來從Kafka接收資料做實時統計

Mlib

  • 機器學習包
  • 支援叢集上的橫向擴充套件

Graphx

  • 圖處理庫,進行圖的平行計算
  • 提供了常用的圖演算法,eg:PageRank

Cluster Managers

  • 叢集管理,Spark自帶一個叢集管理是單獨排程器
  • 常見的叢集管理包括Hadoop YARN,Apache Mesos

spark工作機制

答:使用者在client端提交作業後,會由Driver執行main方法並建立spark context上下文。
執行add運算元,形成dag圖輸入dagscheduler,按照add之間的依賴關係劃分stage輸入task scheduler。 task scheduler會將stage劃分為task set分發到各個節點的executor中執行。

1. Spark術語

Spark框架圖如下:
在這裡插入圖片描述
Application:使用者編寫的應用程式,使用者自定義的Spark程式,使用者提交後,Spark為App分配資源將程式轉換並執行。
Driver Program

:執行Application的main()函式並且建立SparkContext。
SparkContext:是使用者邏輯與Spark叢集主要的互動介面,它會和Cluster Manager進行互動,進行資源的申請,任務的分配與監控,SparkContext代表Driver。
Worker Node:從節點,叢集中可以執行應用程式的節點,負責控制計算節點,啟動Executor或Driver。在YARN模式中為NodeManager,負責計算節點的控制。
Executor:執行器,是為某Application執行在worker node上的一個程序,負責執行task,該程序裡面會通過執行緒池的方式負責執行任務,並負責將資料存在記憶體或者磁碟上。每個Application擁有獨立的一組executors。
RDD DAG:當RDD遇到Action運算元,將之前的所有運算元形成一個有向無環圖(DAG)。再在Spark中轉化為Job,提交到叢集進行執行。一個App可以包含多個Job。
Task:被Executor執行的工作單元,是執行Application最小的單位,多個task組合成一個stage,Task的排程和管理由TaskScheduler負責,一個分割槽對應一個Task,Task執行RDD中對應Stage中所包含的運算元。Task被封裝好後放入Executor的執行緒池中執行。
Job:一個RDD Graph觸發的作業,往往由Spark Action運算元觸發,在SparkContext中通過runJob()向Spark提交Job。包含多個Task組成的平行計算。
Stage:每個Job的Task被拆分成很多組Task, 作為一個TaskSet,命名為Stage。Stage的排程和劃分由DAGScheduler負責。Stage又分為Shuffle Map Stage和Result Stage兩種。Stage的邊界就在發生Shuffle的地方。
RDD:Spark的基本資料操作抽象,可以通過一系列運算元進行操作。RDD是Spark最核心的東西,可以被分割槽、被序列化、不可變、有容錯機制,並且能並行操作的資料集合。儲存級別可以是記憶體,也可以是磁碟。
DAG Scheduler:根據Job構建基於Stage的DAG(有向無環任務圖),並提交Stage給TaskScheduler。
TaskScheduler:將Task分發給Executor執行。將Stage提交給Worker(叢集)執行,每個Executor執行什麼在此分配。
SparkEnv:執行緒級別的上下文,儲存執行時的重要元件的引用。
共享變數:Application在整個執行過程中,可能需要一些變數在每個Task中都使用,共享變數用於實現該目的。Spark有兩種共享變數:一種快取到各個節點的廣播變數;一種只支援加法操作,實現求和的累加變數。
寬依賴:或稱為ShuffleDependency, 寬依賴需要計算好所有父RDD對應分割槽的資料,然後在節點之間進行Shuffle。
窄依賴:或稱為NarrowDependency,指某個RDD,其分割槽partition x最多被其子RDD的一個分割槽partion y依賴。窄依賴都是Map任務,不需要發生shuffle。因此,窄依賴的Task一般都會被合成在一起,構成一個Stage。

2.工作原理

Spark基本工作原理,這裡我們從巨集觀講解Spark的基本工作原理,幫助你全面瞭解佈局,站在一個高度去理解每個運算元任務的操作原理,才能有效的把握變化中的狀態,通過實際原理圖來說明,來理解程式入口的客戶端、叢集處理流程、讀取資料的來源、最終計算結果何去何從等問題。
在這裡插入圖片描述
1、客戶端:
客戶端也就是專業們常說的Client端,這裡的是表示我們在本地編寫Spark程式,然後必須找一個能夠連線Spark叢集,並提交程式進行執行的機器。

2、讀取資料:
在準備執行Spark程式的同時,是不是也要有資料來源進行處理的呢,這裡我們介紹幾種常見的讀取資料來源,是Hadoop叢集中的HDFS、Hive也有可能是搭建在叢集上的HBase;還有MySQL等DB資料庫;或者是在程式中我們設定的集合資料。
在這裡插入圖片描述

3、Spark分散式叢集:
Spark叢集是一種分散式計算、是一種迭代式計算、是一種基於記憶體計算。
分散式計算,這是Spark最基本的特徵,計算時候資料會分佈存放到各個叢集節點,來並行分散式計算。如圖的第一個操作map,是對於節點1、2、3上面的資料進行map運算元操作,處理後的資料可能會轉移到其他節點的記憶體中,這裡假設到了4、5、6節點,處理後的資料有可能多或是變少,這個需要看我們具體的處理方式。第二個操作reduce,是將map處理後的資料再次進行處理。
這也就得到Spark是一種迭代式計算模型,一次計算邏輯中可以分為N個階段,上一個階段結果資料成為了下一個階段的輸入資料,這樣就不只是像Hadoop中的MapReduce計算一樣了,只有兩個階段map和reduce,就結束一個job任務的執行,得落地到HDFS。而Spark在各個階段計算轉換中一直保持基於記憶體迭代式計算,所以Spark相對於MapReduce來說計算模型可以提供更加強大的計算邏輯功能,從而也大大的提高計算效率。

4、結果資料輸出:
這裡我們介紹幾種輸出方式,基於Hadoop的HDFS、Hive或是HBase;MySQL等DB資料;或是直接輸出返回給客戶端。

Spark工作的一個流程

在這裡插入圖片描述
1、spark-submit 提交了應用程式的時候,提交spark應用的機器會通過反射的方式,建立和構造一個Driver程序,Driver程序執行Application程式,
2、Driver根據sparkConf中的配置初始化SparkContext,在SparkContext初始化的過程中會啟動DAGScheduler和taskScheduler
3、taskSheduler通過後臺程序,向Master註冊Application,Master接到了Application的註冊請求之後,會使用自己的資源排程演算法,在spark叢集的worker上,通知worker為application啟動多個Executor。
4、Executor會向taskScheduler反向註冊。
5、Driver完成SparkContext初始化
6、application程式執行到Action時,就會建立Job。並且由DAGScheduler將Job劃分多個Stage,每個Stage 由TaskSet 組成
7、DAGScheduler將TaskSet提交給taskScheduler
8、taskScheduler把TaskSet中的task依次提交給Executor
9、Executor在接收到task之後,會使用taskRunner來封裝task(TaskRuner主要將我們編寫程式,也就是我們編寫的運算元和函式進行拷貝和反序列化),然後,從Executor的執行緒池中取出一個執行緒來執行task。就這樣Spark的每個Stage被作為TaskSet提交給Executor執行,每個Task對應一個RDD的partition,執行我們的定義的運算元和函式。直到所有操作執行完為止。
在這裡插入圖片描述

Spark執行原理圖

在這裡插入圖片描述

Spark的Shuffle原理及調優?

spark的shuffleManager是負責shuffle過程的執行、計算和處理的元件。shuffleManager是trait,主要實現類有兩個:HashShuffleManager和SortShuffleManager。

val shortShuffleMgrNames =Map(
"hash"->"org.apache.spark.shuffle.hash.HashShuffleManager",
"sort"->"org.apache.spark.shuffle.sort.SortShuffleManager",
"tungsten-sort"->"org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager","sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

HashShuffleManager:

在這裡插入圖片描述
shuffle write階段,預設Mapper階段會為Reducer階段的每一個Task單獨建立一個檔案來儲存該Task中要使用的資料。
(1)map task的計算結果,會根據分割槽器(default:HashPartitioner)來決定寫入到哪一個磁碟小檔案中
(2)reduce task 會去map端拉取相應的小檔案
產生磁碟小檔案的個數公式:M(map task的個數)*R(reduce task的個數)

優點:就是操作資料簡單。
缺點:但是在一些情況下(例如資料量非常大的情況)會造成大量檔案(M*R,其中M代表Mapper中的所有的並行任務數量,R代表Reducer中所有的並行任務資料)大資料的隨機磁碟I/O操作且會形成大量的Memory(極易造成OOM)。

磁碟小檔案過多會有什麼問題?
1、在Shuffle write過程會產生很多的寫磁碟的物件
2、在Shuffle read過程會產生很多的讀磁碟的物件
3、在資料傳輸過程中,會有頻繁的網路通訊
在JVM堆記憶體中物件過多會造成頻繁的GC;GC還是無法解決執行所需要的記憶體的話,就會oom;頻繁的網路通訊,會出現通訊故障的可能性大大增加了,一旦網路通訊出現了故障,就會出現如下的錯誤
Shuffle file connot find由於這個錯誤導致的task失敗,那麼TaskScheduler不負責重試,由DAGScheduler負責重試stage

HashShuffleManager產生的問題:

第一:不能夠處理大規模的資料
第二:Spark不能夠執行在大規模的分散式叢集上!

改進方案:Consolidate機制:
spark.shuffle.consolidateFiles 該引數預設值為false,將其設定為true即可開啟優化機制
後來的改善是加入了Consolidate機制來將Shuffle時候產生的檔案數量減少到CR個(C代表在Mapper端,同時能夠使用的cores數量,R代表Reducer中所有的並行任務數量)。但是此時如果Reducer端的並行資料分片過多的話則CR可能已經過大,此時依舊沒有逃脫檔案開啟過多的厄運!!!Consolidate並沒有降低並行度,只是降低了臨時檔案的數量,此時Mapper端的記憶體消耗就會變少,所以OOM也就會降低,另外一方面磁碟的效能也會變得更好。

在這裡插入圖片描述
開啟consolidate機制之後,在shuffle write過程中,task就不是為下游stage的每個task建立一個磁碟檔案了。此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁碟檔案,磁碟檔案的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就可以並行執行多少個task。而第一批並行執行的每個task都會建立一個shuffleFileGroup,並將資料寫入對應的磁碟檔案內。

前提:每個Excutor分配1個cores,假設第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor,每個Executor執行5個task。那麼原本使用未經優化的HashShuffleManager時,每個Executor會產生500個磁碟檔案,所有Executor會產生5000個磁碟檔案的。但是此時經過優化之後,每個Executor建立的磁碟檔案的數量的計算公式為:CPU core的數量 * 下一個stage的task數量。也就是說,每個Executor此時只會建立100個磁碟檔案,所有Executor只會建立1000個磁碟檔案。

當Executor的CPU core執行完一批task,接著執行下一批task時,下一批task就會複用之前已有的shuffleFileGroup,包括其中的磁碟檔案。也就是說,此時task會將資料寫入已有的磁碟檔案中,而不會寫入新的磁碟檔案中。因此,consolidate機制允許不同的task複用同一批磁碟檔案,這樣就可以有效將多個task的磁碟檔案進行一定程度上的合併,從而大幅度減少磁碟檔案的數量,進而提升shuffle write的效能。

SortShuffle

在Mapper中的每一個ShuffleMapTask中產生兩個檔案:Data檔案和Index檔案,其中Data檔案是儲存當前Task的Shuffle輸出的。而index檔案中則儲存了Data檔案中的資料通過Partitioner的分類資訊,此時下一個階段的Stage中的Task就是根據這個Index檔案獲取自己所要抓取的上一個Stage中的ShuffleMapTask產生的資料的,Reducer就是根據index檔案來獲取屬於自己的資料。

涉及問題:Sorted-based Shuffle:會產生 2*M(M代表了Mapper階段中並行的Partition的總數量,其實就是ShuffleMapTask的總數量)個Shuffle臨時檔案。

SortShuffleManager由於有一個磁碟檔案merge的過程,因此大大減少了檔案數量。比如第一個stage有50個task,總共有10個Executor,每個Executor執行5個task,而第二個stage有100個task。由於每個task最終只有一個磁碟檔案,因此此時每個Executor上只有5個磁碟檔案,所有Executor只有50個磁碟檔案。
在這裡插入圖片描述
普通機制Sort-based Shuffle的流程
(1) map task的計算結果會寫入到一個記憶體資料結構裡面,記憶體資料結構預設是5M
(2) 在shuffle的時候會有一個定時器,不定期的去估算這個記憶體資料結構的大小,如果現在記憶體資料結構的大小是5.01M,那麼它會申請5.01*2-5=5.02M記憶體給記憶體資料結構
(3) 如果申請成功,不會進行溢寫
(4) 如果申請不成功,這個時候就會有溢寫的過程
(5) 在溢寫之前,會將記憶體資料結構裡面的資料進行排序,以及分割槽
(6) 然後開始寫磁碟,寫磁碟是以bacth的形式去寫,一個batch是1W條資料
(7) Map task執行完成後,會將這些磁碟小檔案合併成一個大的磁碟檔案,同時生成一個索引
(8) Reduce task去map端拉資料的時候,首先解析索引檔案,根據索引檔案再去拉去屬於它自己的資料
產生磁碟小檔案的公式:2M(M代表了Mapper階段中並行的Partition的總數量,其實就是ShuffleMapTask的總數量)

預設Sort-based Shuffle的幾個缺陷
1)如果Mapper中Task的數量過大,依舊會產生很多小檔案,此時在Shuffle傳遞資料的過程中到Reducer端,reduce會需要同時開啟大量的記錄來進行反序列化,導致大量的記憶體消耗和GC的巨大負擔,造成系統緩慢甚至崩潰!
2)如果需要在分片內也進行排序的話,此時需要進行Mapper端和Reducer端的兩次排序!!!
優化:
可以改造Mapper和Reducer端,改框架來實現一次排序。
在這裡插入圖片描述
bypass執行機制
1、spark.shuffle.sort.bypassMergeThreshold 預設值為200 ,如果shuffle read task的數量小於這個閥值200,則不會進行排序。
2、或者使用hashbasedshuffle + consolidateFiles 機制

上圖說明了bypass SortShuffleManager的原理。bypass執行機制的觸發條件如下:

  1. shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold引數的值。
    這個引數僅適用於SortShuffleManager,如前所述,SortShuffleManager在處理不需要排序的Shuffle操作時,由於排序帶來效能的下降。這個引數決定了在這種情況下,當Reduce分割槽的數量小於多少的時候,在SortShuffleManager內部不使用Merge Sort的方式處理資料,而是與Hash Shuffle類似,直接將分割槽檔案寫入單獨的檔案,不同的是,在最後一步還是會將這些檔案合併成一個單獨的檔案。這樣通過去除Sort步驟來加快處理速度,代價是需要併發開啟多個檔案,所以記憶體消耗量增加,本質上是相對HashShuffleMananger一個折衷方案。這個引數的預設值是200個分割槽,如果記憶體GC問題嚴重,可以降低這個值。

  2. 不是聚合類的shuffle運算元(比如reduceByKey)。
    此時task會為每個下游task都建立一個臨時磁碟檔案,並將資料按key進行hash然後根據key的hash值,將key寫入對應的磁碟檔案之中。當然,寫入磁碟檔案時也是先寫入記憶體緩衝,緩衝寫滿之後再溢寫到磁碟檔案的。最後,同樣會將所有臨時磁碟檔案都合併成一個磁碟檔案,並建立一個單獨的索引檔案。
    該過程的磁碟寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要建立數量驚人的磁碟檔案,只是在最後會做一個磁碟檔案的合併而已。因此少量的最終磁碟檔案,也讓該機制相對未經優化的HashShuffleManager來說,shuffle read的效能會更好。
    而該機制與普通SortShuffleManager執行機制的不同在於:第一,磁碟寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行資料的排序操作,也就節省掉了這部分的效能開銷。

總結
有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的預設選項,但是Spark 1.2以及之後的版本預設都是SortShuffleManager了。tungsten-sort與sort類似,但是使用了tungsten計劃中的堆外記憶體管理機制,記憶體使用效率更高。tungsten-sort慎用,存在bug.

spark shuffle引數調優

spark.shuffle.file.buffer

  • 預設值:32k
  • 引數說明:該引數用於設定shuffle write task的BufferedOutputStream的buffer緩衝大小。將資料寫到磁碟檔案之前,會先寫入buffer緩衝中,待緩衝寫滿之後,才會溢寫到磁碟。
  • 調優建議:如果作業可用的記憶體資源較為充足的話,可以適當增加這個引數的大小(比如64k),從而減少shuffle write過程中溢寫磁碟檔案的次數,也就可以減少磁碟IO次數,進而提升效能。在實踐中發現,合理調節該引數,效能會有1%~5%的提升。

spark.reducer.maxSizeInFlight

  • 預設值:48m
  • 引數說明:該引數用於設定shuffle read task的buffer緩衝大小,而這個buffer緩衝決定了每次能夠拉取多少資料。
  • 調優建議:如果作業可用的記憶體資源較為充足的話,可以適當增加這個引數的大小(比如96m),從而減少拉取資料的次數,也就可以減少網路傳輸的次數,進而提升效能。在實踐中發現,合理調節該引數,效能會有1%~5%的提升。
  • Reduce task去map拉資料,reduce 一邊拉資料一邊聚合 reduce端有一塊聚合記憶體(executor memory)
  • 解決方法:
    (1)增加reduce聚合的記憶體比例 設定spark.Shuffle.memoryFraction
    (2)增加executor memory的大小 –executor-memory 5G
    (3)減少reduce task每次拉取的資料量 設定
    spark.reducer.maxSizeInFlight 24m

spark.shuffle.io.maxRetries

  • 預設值:3
  • 引數說明:shuffle read task從shuffle write task所在節點拉取屬於自己的資料時,如果因為網路異常導致拉取失敗,是會自動進行重試的。該引數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。
  • 調優建議:對於那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由於JVM的full gc或者網路不穩定等因素導致的資料拉取失敗。在實踐中發現,對於針對超大資料量(數十億~上百億)的shuffle過程,調節該引數可以大幅度提升穩定性。

spark.shuffle.io.retryWait

  • 預設值:5s
  • 引數說明:具體解釋同上,該引數代表了每次重試拉取資料的等待間隔,預設是5s。
  • 調優建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩定性。

spark.shuffle.memoryFraction

  • 預設值:0.2
  • 引數說明:該引數代表了Executor記憶體中,分配給shuffle read task進行聚合操作的記憶體比例,預設是20%。
  • 調優建議:在資源引數調優中講解過這個引數。如果記憶體充足,而且很少使用持久化操作,建議調高這個比例,給shuffle read的聚合操作更多記憶體,以避免由於記憶體不足導致聚合過程中頻繁讀寫磁碟。在實踐中發現,合理調節該引數可以將效能提升10%左右。

spark.shuffle.manager

  • 預設值:sort
  • 引數說明:該引數用於設定ShuffleManager的型別。Spark 1.5以後,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的預設選項,但是Spark 1.2以及之後的版本預設都是SortShuffleManager了。tungsten-sort與sort類似,但是使用了tungsten計劃中的堆外記憶體管理機制,記憶體使用效率更高。
  • 調優建議:由於SortShuffleManager預設會對資料進行排序,因此如果你的業務邏輯中需要該排序機制的話,則使用預設的SortShuffleManager就可以;而如果你的業務邏輯不需要對資料進行排序,那麼建議參考後面的幾個引數調優,通過bypass機制或優化的HashShuffleManager來避免排序操作,同時提供較好的磁碟讀寫效能。這裡要注意的是,tungsten-sort要慎用,因為之前發現了一些相應的bug。

spark.shuffle.sort.bypassMergeThreshold

  • 預設值:200
  • 引數說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小於這個閾值(預設是200),則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫資料,但是最後會將每個task產生的所有臨時磁碟檔案都合併成一個檔案,並會建立單獨的索引檔案。
  • 調優建議:當你使用SortShuffleManager時,如果的確不需要排序操作,那麼建議將這個引數調大一些,大於shuffle read task的數量。那麼此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的效能開銷。但是這種方式下,依然會產生大量的磁碟檔案,因此shuffle write效能有待提高。

spark.shuffle.consolidateFiles

  • 預設值:false
  • 引數說明:如果使用HashShuffleManager,該引數有效。如果設定為true,那麼就會開啟consolidate機制,會大幅度合併shuffle write的輸出檔案,對於shuffle read task數量特別多的情況下,這種方法可以極大地減少磁碟IO開銷,提升效能。
  • 調優建議:如果的確不需要SortShuffleManager的排序機制,那麼除了使用bypass機制,還可以嘗試將spark.shffle.manager引數手動指定為hash,使用HashShuffleManager,同時開啟consolidate機制。在實踐中嘗試過,發現其效能比開啟了bypass機制的SortShuffleManager要高出10%~30%。

spark如何保證宕機迅速恢復?

處於Standby狀態的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent傳送的ElectedLeader訊息後,就開始通過ZK中儲存的Application,Driver和Worker的元資料資訊進行故障恢復了,它的狀態也從RecoveryState.STANDBY變為RecoveryState.RECOVERING了。當然了,如果沒有任何需要恢復的資料,Master的狀態就直接變為RecoveryState.ALIVE,開始對外服務了。
一方面Master通過 恢復Application,Driver和Worker的狀態,
beginRecovery(storedApps, storedDrivers, storedWorkers)
一方面通過在60s後主動向自己傳送CompleteRecovery的訊息,開始恢復資料完成後的操作。

recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,CompleteRecovery)

首先看一下如何通過ZooKeeperLeaderElectionAgent提供的介面恢復資料。
在這裡插入圖片描述
獲取了原來的Master維護的Application,Driver和Worker的列表後,當前的Master通過beginRecovery來恢復它們的狀態。 恢復Application的步驟: 置待恢復的Application的狀態為UNKNOWN,向AppClient傳送MasterChanged的訊息 AppClient收到後改變其儲存的Master的資訊,包括URL和Master actor的資訊,回覆MasterChangeAcknowledged(appId) Master收到後通過appId後將Application的狀態置為WAITING 檢查如果所有的worker和Application的狀態都不是UNKNOWN,那麼恢復結束,呼叫completeRecovery() 恢復Worker的步驟: 重新註冊Worker(實際上是更新Master本地維護的資料結構),置狀態為UNKNOWN 向Worker傳送MasterChanged的訊息 Worker收到訊息後,向Master回覆 訊息WorkerSchedulerStateResponse,並通過該訊息上報executor和driver的資訊。 Master收到訊息後,會置該Worker的狀態為ALIVE,並且會檢查該Worker上報的資訊是否與自己從ZK中獲取的資料一致,包括executor和driver。一致的executor和driver將被恢復。對於Driver,其狀態被置為RUNNING。 檢查如果所有的worker和Application的狀態都不是UNKNOWN,那麼恢復結束,呼叫completeRecovery() beginRecovery的原始碼實現:

  def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
      storedWorkers: Seq[WorkerInfo]) {
    for (app <- storedApps) { // 逐個恢復Application
      logInfo("Trying to recover app: " + app.id)
      try {
        registerApplication(app)
        app.state = ApplicationState.UNKNOWN
        app.driver ! MasterChanged(masterUrl, masterWebUiUrl) //向AppClient傳送Master變化的訊息,AppClient會回覆MasterChangeAcknowledged
      } catch {
        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
      }
    }
 
    for (driver <- storedDrivers) {
      // Here we just read in the list of drivers. Any drivers associated with now-lost workers
      // will be re-launched when we detect that the worker is missing.
      drivers += driver // 在Worker恢復後,Worker會主動上報執行其上的executors和drivers從而使得Master恢復executor和driver的資訊。
    }
 
    for (worker <- storedWorkers) { //逐個恢復Worker
      logInfo("Trying to recover worker: " + worker.id)
      try {
        registerWorker(worker) //重新註冊Worker
        worker.state = WorkerState.UNKNOWN
        worker.actor ! MasterChanged(masterUrl, masterWebUiUrl) //向Worker傳送Master變化的訊息,Worker會回覆WorkerSchedulerStateResponse
      } catch {
        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
      }
    }
  }

通過下面的流程圖可以更加清晰的理解這個過程
在這裡插入圖片描述
如何判斷恢復是否結束? 在上面介紹Application和Worker的恢復時,提到了每次收到他們的迴應,都要檢查是否當前所有的Worker和Application的狀態都不為UNKNOWN,如果是,那麼恢復結束,呼叫completeRecovery()。這個機制並不能完全起作用,如果有一個Worker恰好也是宕機了,那麼該Worker的狀態會一直是UNKNOWN,那麼會導致上述策略一直不會起作用。這時候第二個判斷恢復結束的標準就其作用了:超時機制,選擇是設定了60s得超時,在60s後,不管是否有Worker或者AppClient未返回相應,都會強制標記當前的恢復結束。對於那些狀態仍然是UNKNOWN的app和worker,Master會丟棄這些資料。具體實現如下:
在這裡插入圖片描述
但是對於一個擁有幾千個節點的叢集來說,60s設定的是否合理?畢竟現在沒有使用Standalone模式部署幾千個節點的吧?因此硬編碼60s看上去也十分合理,畢竟都是邏輯很簡單的呼叫,如果一些節點60S沒有返回,那麼下線這部分機器也是合理的。 通過設定spark.worker.timeout,可以自定義超時時間。

spark基本工作原理

在這裡插入圖片描述
driver向worker程序提交資源請求,worker會啟動多個executor程序為driver分配資源,executor啟動後會向driver進行反註冊,以便driver知道自己啟動的資源的情況。
driver向executor提交task(map/reduce等),executor啟動多個task執行緒來執行轉換/動作運算元。

Spark on yarn

在這裡插入圖片描述

client執行模式

1、在客戶端通過Spark-submit提交一個Application
2、在客戶端上啟動一個Driver程序
3、 Driver啟動完成後, client會向RS (ResourceManager)傳送請求(給我找一臺NM,我要啟動AM)
4、RS接受到了請求,找到某一臺NM了, Rs會向NM程序傳送一條訊息(給我啟動一個Container容器,我要啟動AM程序)
5、AM已經啟動了, AM會向RS傳送請求(給我一批資源,我要執行Application)
6、RS接受了請求,給他找了一批N回給AM
7、AM會向這一批MM傳送訊息(你給我啟動一個Container,我要啟動Executor)
8、Executor會反向註冊給客戶端裡啟動的Driver程序
9、 Driver就有了一批計算程序(Executor)
10、 Driver就可以傳送task到Executor裡面去執行了。

總結:Applicationmaster作用:
1、 為當前的Application申請資源
2、 給NM傳送訊息;啟動Container(一組計算單位)Executor

Cluster執行模式

client vs cluster

1、client模式Driver在客戶端啟動 測試
2、cluster模式Driver是在yarn叢集中某一臺NM中啟動生產環境
3、ApplicationMaster在不同的模式下作用不一樣:
  ApplicationMaster在client模式下:
    (1)為當前的Application申請資源
    (2)給NM傳送訊息, NM啟動Container(一組計算資源的單位) Executor
  ApplicationMaster在cluster模式下:
    (1)為當前的Application申請資源
    (2)給NM傳送訊息, NM啟動container(一組計算資源的單位)Executor
    (3)任務排程

注意事項:

1、 yarn叢集所在的節點必須有spark的安裝包
2、 Spark跑在Yarn叢集上,不需要啟動Spark standalone叢集,不需要master worker這一些節點
master->RS
Worker->NM
在這裡插入圖片描述

RDD機制

rdd分散式彈性資料集,簡單的理解成一種資料結構,是spark框架上的通用貨幣。
所有運算元都是基於rdd來執行的,不同的場景會有不同的rdd實現類,但是都可以進行互相轉換。
rdd執行過程中會形成dag圖,然後形成lineage保證容錯性等。
從物理的角度來看rdd儲存的是block和node之間的對映。

RDD彈性的分散式資料集五大特性
1、他有一系列的Partition組成的
2、每一個運算元作用在每一個partition上
3、 rdd之間是有依賴關係的
4、可選項:分割槽器作用在KV格式的RDD上
 (1)分割槽器是在shuffle階段起作用
 (2) GroupByKey, reduceByKey, join, sortBykey等這些運算元會產生shuffle
 (3)這些運算元必須作用在KV格式的RDD
5、 RDD會提供一系列最佳計算位置,說白了就是暴露每一個partitior的位置這是資料本地化的基礎
在這裡插入圖片描述

RDD持久化原理

Spark 中一個很重要的能力是將資料持久化(或稱為快取),在多個操作間都可以訪問這些持久化的資料。當持久化一個 RDD 時,每個節點的其它分割槽都可以使用 RDD在記憶體中進行計算,在該資料上的其他 action 操作將直接使用記憶體中的資料。這樣會讓以後的 action 操作計算速度加快(通常執行速度會加速 10倍)。快取是迭代演算法和快速的互動式使用的重要工具。

RDD 可以使用persist() 方法或 cache() 方法進行持久化。資料將會在第一次 action 操作時進行計算,並快取在節點的記憶體中。Spark 的快取具有容錯機制,如果一個快取的 RDD 的某個分割槽丟失了,Spark 將按照原來的計算過程,自動重新計算並進行快取。

另外,每個持久化的 RDD 可以使用不同的儲存級別進行快取,例如,持久化到磁碟、已序列化的 Java 物件形式持久化到記憶體(可以節省空間)、跨節點間複製、以 off-heap 的方式儲存在 Tachyon。這些儲存級別通過傳遞一個 **StorageLevel 物件(Scala、Java、Python)給 persist() 方法進行設定。cache()**方法是使用預設儲存級別的快捷設定方法,預設的儲存級別是 StorageLevel.MEMORY_ONLY(將反序列化的物件儲存到記憶體中)。詳細的儲存級別介紹如下 :

  • MEMORY_ONLY : 將 RDD 以反序列化 Java 物件的形式儲存在 JVM 中。如果記憶體空間不夠,部分資料分割槽將不再快取,在每次需要用到這些資料時重新進行計算。這是預設的級別。
  • MEMORY_AND_DISK : 將 RDD 以反序列化 Java 物件的形式儲存在 JVM 中。如果記憶體空間不夠,將未快取的資料分割槽儲存到磁碟,在需要使用這些分割槽時從磁碟讀取。
  • MEMORY_ONLY_SER : 將 RDD 以序列化的 Java 物件的形式進行儲存(每個分割槽為一個 byte 陣列)。這種方式會沒有序列化物件的方式節省很多空間,尤其是在使用 fast serializer時會節省更多的空間,但是在讀取時會增加 CPU 的計算負擔。
  • MEMORY_AND_DISK_SER : 類似於 MEMORY_ONLY_SER ,但是溢位的分割槽會儲存到磁碟,而不是在用到它們時重新計算。
  • DISK_ONLY : 只在磁碟上快取 RDD。
  • MEMORY_ONLY_2,MEMORY_AND_DISK_2,等等 : 與上面的級別功能相同,只不過每個分割槽在叢集中兩個節點上建立副本。
  • OFF_HEAP(實驗中): 類似於 MEMORY_ONLY_SER ,但是將資料儲存在 off-heap memory,這需要啟動 off-heap 記憶體。

注意,在 Python 中,快取的物件總是使用 Pickle 進行序列化,所以在 Python 中不關心你選擇的是哪一種序列化級別。python 中的儲存級別包括 MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和 DISK_ONLY_2
在 shuffle 操作中(例如 reduceByKey),即便是使用者沒有呼叫 persist 方法,Spark 也會自動快取部分中間資料。這麼做的目的是,在 shuffle 的過程中某個節點執行失敗時,不需要重新計算所有的輸入資料。如果使用者想多次使用某個 RDD,強烈推薦在該 RDD 上呼叫 persist方法。

如何選擇儲存級別

Spark 的儲存級別的選擇,核心問題是在記憶體使用率和 CPU 效率之間進行權衡。建議按下面的過程進行儲存級別的選擇 :

  • 如果使用預設的儲存級別(MEMORY_ONLY),儲存在記憶體中的 RDD 沒有發生溢位,那麼就選擇預設的儲存級別。預設儲存級別可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度執行。
  • 如果記憶體不能全部儲存 RDD,那麼使用 MEMORY_ONLY_SER,並挑選一個快速序列化庫將物件序列化,以節省記憶體空間。使用這種儲存級別,計算速度仍然很快。
  • 除了在計算該資料集的代價特別高,或者在需要過濾大量資料的情況下,儘量不要將溢位的資料儲存到磁碟。因為,重新計算這個資料分割槽的耗時與從磁碟讀取這些資料的耗時差不多。
  • 如果想快速還原故障,建議使用多副本儲存級別(例如,使用 Spark 作為 web 應用的後臺服務,在服務出故障時需要快速恢復的場景下)。所有的儲存級別都通過重新計算丟失的資料的方式,提供了完全容錯機制。但是多副本級別在發生資料丟失時,不需要重新計算對應的資料庫,可以讓任務繼續執行。

刪除資料

Spark 自動監控各個節點上的快取使用率,並以最近最少使用的方式(LRU)將舊資料塊移除記憶體。如果想手動移除一個 RDD,而不是等待該 RDD 被 Spark自動移除,可以使用 RDD.unpersist()方法。

不使用RDD的持久化
在這裡插入圖片描述
1、預設情況下,對於大量資料的action操作都是非常耗時的。可能一個操作就耗時1個小時;
2、在執行action操作的時候,才會觸發之前的操作的執行,因此在執行第一次count操作時,就會從hdfs中讀取一億資料,形成lines RDD;
3、第一次count操作之後,我們的確獲取到了hdfs檔案的行數。但是lines RDD其實會被丟棄掉,資料也會被新的資料丟失;

所以,如果不用RDD的持久化機制,可能對於相同的RDD的計算需要重複從HDFS源頭獲取資料進行計算,這樣會浪費很多時間成本;

RDD持久化的原理

在這裡插入圖片描述

  1. Spark非常重要的一個功能特性就是可以將RDD持久化在記憶體中。當對RDD執行持久化操作時,每個節點都會將自己操作的RDD的partition持久化到記憶體中,並且在之後對該RDD的反覆使用中,直接使用記憶體快取的partition。這樣的話,對於針對一個RDD反覆執行多個操作的場景,就只要對RDD計算一次即可,後面直接使用該RDD,而不需要反覆計算多次該RDD。
  2. 巧妙使用RDD持久化,甚至在某些場景下,可以將spark應用程式的效能提升10倍。對於迭代式演算法和快速互動式應用來說,RDD持久化,是非常重要的。
  3. 要持久化一個RDD,只要呼叫其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接快取在每個節點中。而且Spark的持久化機制還是自動容錯的,如果持久化的RDD的任何partition丟失了,那麼Spark會自動通過其源RDD,使用transformation操作重新計算該partition
  4. cache()和persist()的區別在於,cache()是persist()的一種簡化方式,cache()的底層就是呼叫的persist()的無參版本,同時就是呼叫persist(MEMORY_ONLY),將資料持久化到記憶體中。如果需要從記憶體中清楚快取,那麼可以使用unpersist()方法。
  5. Spark自己也會在shuffle操作時,進行資料的持久化,比如寫入磁碟,主要是為了在節點失敗時,避免需要重新計算整個過程。

checkpoint檢查點機制

一個Streaming應用程式要求7天24小時不間斷執行,因此必須適應各種導致應用程式失敗的場景。Spark Streaming的檢查點具有容錯機制,有足夠的資訊能夠支援故障恢復。支援兩種資料型別的檢查點:元資料檢查點和資料檢查點。

(1)元資料檢查點,在類似HDFS的容錯儲存上,儲存Streaming計算資訊。這種檢查點用來恢復執行Streaming應用程式失敗的Driver程序。

(2)資料檢查點,在進行跨越多個批次合併資料的有狀態操作時尤其重要。在這種轉換操作情況下,依賴前一批次的RDD生成新的RDD,隨著時間不斷增加,RDD依賴鏈的長度也在增加,為了避免這種無限增加恢復時間的情況,通過週期檢查將轉換RDD的中間狀態進行可靠儲存,藉以切斷無限增加的依賴。使用有狀態的轉換,如果updateStateByKey或者reduceByKeyAndWindow在應用程式中使用,那麼需要提供檢查點路徑,對RDD進行週期性檢查。

元資料檢查點主要用來恢復失敗的Driver程序,而資料檢查點主要用來恢復有狀態的轉換操作。無論是Driver失敗,還是Worker失敗,這種檢查點機制都能快速恢復。許多Spark Streaming都是使用檢查點方式。但是簡單的Streaming應用程式,不包含狀態轉換操作不能執行檢查點;從Driver程式故障中恢復可能會造成一些收到沒有處理的資料丟失。

為了讓一個Spark Streaming程式能夠被恢復,需要啟用檢查點,必須設定一個容錯的、可靠的檔案系統(如HDFS、S3等)路徑儲存檢查點資訊,同時設定時間間隔。

streamingContext.checkpoint(checkpointDirectory)//checkpointDirectory
是一個檔案系統路徑(最好是一個可靠的比如hdfs://…) dstream.checkpoint(checkpointInterval)//設定時間間隔
當程式第一次啟動時,建立一個新的StreamingContext,接著建立所有的資料流,然後再呼叫start()方法。

//定義一個建立並設定StreamingContext的函式
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...)               //建立StreamingContext例項
val DsSream = ssc.socketTextStream(...)      //建立DStream
...
ssc.checkpoint(checkpointDirectory)           //設定檢查點機制
ssc
}
//從檢查點資料重建或者新建一個StreamingContext
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreate-Context_)
//在context需要做額外的設定完成,不考慮是否被啟動或重新啟動
context. ...
//啟動context
context.start()
context.awaitTermination()

通過使用getOrCreate建立StreamingContext。
當程式因為異常重啟時,如果檢查點路徑存在,則context將從檢查點資料中重建。如果檢查點目錄不存在(首次執行),將會呼叫functionToCreateContext函式新建context函式新建context,並設定DStream。

但是,Streaming需要儲存中間資料到容錯儲存系統,這個策略會引入儲存開銷,進而可能會導致相應的批處理時間變長,因此,檢查點的時間間隔需要精心設定。採取小批次時,每批次檢查點可以顯著減少操作的吞吐量;相反,檢查點太少可能會導致每批次任務大小的增加。對於RDD檢查點的有狀態轉換操作,其檢查點間隔預設設定成DStream的滑動間隔的5~10倍。

故障恢復可以使用Spark的Standalone模式自動完成,該模式允許任何Spark應用程式的Driver在叢集內啟動,並在失敗時重啟。而對於YARN或Mesos這樣的部署環境,則必須通過其他的機制重啟Driver。

checkpoint和持久化機制的區別

1.持久化只是將資料儲存在BlockManager中,而RDD的lineage是不變的。但是checkpoint執行完後,RDD已經沒有之前所謂的依賴RDD了,而只有一個強行為其設定的checkpointRDD,RDD的lineage改變了。

2.持久化的資料丟失可能性更大,磁碟、記憶體都可能會存在資料丟失的情況。但是checkpoint的資料通常是儲存在如HDFS等容錯、高可用的檔案系統,資料丟失可能性較小。

注:預設情況下,如果某個RDD沒有持久化,但是設定了checkpoint,會存在問題,本來這個job都執行結束了,但是由於中間RDD沒有持久化,checkpoint job想要將RDD的資料寫入外部檔案系統的話,需要全部重新計算一次,再將計算出來的RDD資料checkpoint到外部檔案系統。所以,建議對checkpoint()的RDD使用persist(StorageLevel.DISK_ONLY),該RDD計算之後,就直接持久化到磁碟上。後面進行checkpoint操作時就可以直接從磁碟上讀取RDD的資料,並checkpoint到外部檔案系統。

Spark streaming以及基本工作原理

接收實時輸入資料流,然後將資料拆分成多個batch,比如每收集1秒的資料封裝為一個batch,然後將每個batch交給Spark的計算引擎進行處理,最後會生產出一個結果資料流,其中的資料,也是由一個一個的batch所組成的。
在這裡插入圖片描述

DStream以及基本工作原理

DStream是Spark Streaming提供的一種高階抽象,英文全稱為Discretized Stream,中文翻譯為離散流,它代表了一個持續不斷的資料流。DStream可以通過輸入資料來源(比如從Flume、Kafka中)來建立,也可以通過對其他DStream應用高階函式(map,flatmap)來建立。

在內部實現上,DStream由一組時間序列上連續的RDD來表示,RDD是Spark Core的核心抽象,即不可變的、分散式的資料集,DStream中的每個RDD都包含了一個時間段內的資料 對DStream應用的運算元,在底層會被轉換為對DStream中每個RDD的操作 底層原理為,對DStream中每個時間段的RDD都應用一遍運算元操作,然後生成新的RDD,即作為新的DStream中的那個時間段的RDD 經過一系列運算元操作之後,最終可以將實時計算的結果儲存到相關介質中,如Redis、HBase、MySQL。

根據這個流程也可以得出Spark Streaming程式設計的步驟:

  1. 建立輸入的資料流Dstream
  2. 對DStream進行各種運算元操作,得到新的DStream
  3. 將處理完的結果儲存到儲存介質中

批處理時間間隔
Spark Streaming中,資料採集是逐條進行的,而資料處理是按批進行的 Spark Streaming中會先設定好批處理的時間間隔。當達到批處理時間間隔的時候就會把採集到的資料彙總起來成為一批資料交給系統去處理

spark核心程式設計原理

1、首先我們搭建好了spark叢集
2、客戶端與spark叢集建立連線,之後才能提交spark應用程式
3、spark提交應用程式到spark叢集上
4、Spark與MapReduce最大的不同在於,迭代式計算模型: MapReduce,分為兩個階段,map和reduce,兩個階段完了,就結束了。所以我們在一個job裡能做的處理很有限,只能在map和reduce裡處理。
Spark,計算模型,可以分為n個階段,因為它是記憶體迭代式的。我們在處理完一個階段以後,可以繼續往下處理很多個階段,而不只是兩個階段。所以,Spark相較於MapReduce來說,計算模型可以提供更強大的功能。

Spark的核心程式設計是什麼?

其實,就是: 首先,
第一,定義初始的RDD,就是說,你要定義第一個RDD是從哪裡,讀取資料,hdfs、linux本地檔案、程式中的集合。
第二,定義對RDD的計算操作,這個在spark裡稱之為運算元,map、reduce、flatMap、groupByKey,比mapreduce提供的map和reduce強大的太多太多了。 第三,其實就是迴圈往復的過程,第一個計算完了以後,資料可能就會到了新的一批節點上,也就是變成一個新的RDD。然後再次反覆,針對新的RDD定義計算操作。。。。
第四,最後,就是獲得最終的資料,將資料儲存起來。 每一批節點上的每一批資料,實際上就是一個RDD!!!一個RDD是分散式的,所以資料都散落在一批節點上了,每個節點都儲存了RDD的部分partition。
在這裡插入圖片描述

RDD的彈性表現在哪幾點

1)自動的進行記憶體和磁碟的儲存切換;
2)基於Linage的高效容錯;
3)task如果失敗會自動進行特定次數的重試;
4)stage如果失敗會自動進行特定次數的重試,而且只會計算失敗的分片;
5)checkpoint和persist,資料計算之後持久化快取
6)資料排程彈性,DAG TASK排程和資源無關
7)資料分片的高度彈性,a.分片很多碎片可以合併成大的,b.par

RDD有哪些缺陷

1)不支援細粒度的寫和更新操作(如網路爬蟲),spark寫資料是粗粒度的
所謂粗粒度,就是批量寫入資料,為了提高效率。但是讀資料是細粒度的也就是說可以一條條的讀
2)不支援增量迭代計算,Flink支援

RDD建立有哪幾種方式

1).使用程式中的集合建立rdd
2).使用本地檔案系統建立rdd
3).使用hdfs建立rdd,
4).基於資料庫db建立rdd
5).基於Nosql建立rdd,如hbase
6).基於s3建立rdd,
7).基於資料流,如socket建立rdd
如果只回答了前面三種,是不夠的,只能說明你的水平還是入門級的,實踐過程中有很多種建立方式。

RDD通過Linage(記錄資料更新)的方式為何很高效

1)lazy記錄了資料的來源,RDD是不可變的,且是lazy級別的,且rDD之間構成了鏈條,lazy是彈性的基石。由於RDD不可變,所以每次操作就產生新的rdd,不存在全域性修改的問題,控制難度下降,所有有計算鏈條將複雜計算鏈條儲存下來,計算的時候從後往前回溯900步是上一個stage的結束,要麼就checkpoint
2)記錄原資料,是每次修改都記錄,代價很大如果修改一個集合,代價就很小,官方說rdd是粗粒度的操作,是為了效率,為了簡化,每次都是操作資料集合,寫或者修改操作,都是基於集合的rdd的寫操作是粗粒度的,rdd的讀操作既可以是粗粒度的也可以是細粒度,讀可以讀其中的一條條的記錄。
3)簡化複雜度,是高效率的一方面,寫的粗粒度限制了使用場景,如網路爬蟲,現實世界中,大多數寫是粗粒度的場景

spark效能優化有哪些

通過spark-env檔案、程式中sparkconf和set property設定。
(1)計算量大,形成的lineage過大應該給已經快取了的rdd新增checkpoint,以減少容錯帶來的開銷。
(2)小分割槽合併,過小的分割槽造成過多的切換任務開銷,使用repartition。

Overview

Spark的瓶頸一般來自於叢集(standalone, yarn, mesos, k8s)的資源緊張,CPU,網路頻寬,記憶體。通過都會將資料序列化,降低其記憶體memory和網路頻寬shuffle的消耗。
Spark的效能,想要它快,就得充分利用好系統資源,尤其是記憶體和CPU:核心思想就是能用記憶體cache就別spill落磁碟,CPU 能並行就別序列,資料能local就別shuffle。

開發調優

  1. 避免建立重複的RDD
     比如多次讀可以persist;但如果input太大,persist可能得不償失
  2. 儘可能複用同一個RDD
     但是如果rdd的lineage太長,最好checkpoint下來,避免長重建
  3. 對多次使用的RDD進行持久化
     持久化級別(SER,MEM,DISK,_N)
  4. 儘量避免使用shuffle類運算元
     shuffle運算元如distinct(實際呼叫reduceByKey)、reduceByKey、aggregateByKey、sortByKey、groupByKey、join、cogroup、repartition等,入參中會有一個並行度引數numPartitions
     shuffle過程中,各個節點上的相同key都會先寫入本地磁碟檔案中,然後其他節點需要通過網路傳輸拉取各個節點上的磁碟檔案中的相同key
  5. 使用map-side預聚合的shuffle操作
     reduceByKey(combiner),groupByKey(沒有combiner)
    在   
 
 </div> 
 <div class=

    相關推薦

    Spark知識點總結--持續更新

    spark有哪些元件 (1)master:管理叢集和節點,不參與計算。 (2)worker:計算節點,程序本身不參與計算,和master彙報。 (3)Driver:執行程式的main方法,建立spark context物件。 (4)spark context:控制整個applicat

    pytorch 知識點總結(持續更新)

    1、argparse的使用 (Python指令碼時傳入引數的三種方式之一:https://blog.csdn.net/u012426298/article/details/80263507)   import argparse#必備 parser = argparse.Argume

    zookeeper知識點總結--持續更新

    Zookeeper有三種執行形式:叢集模式、單機模式、偽叢集模式。 若刪除節點存在子節點,那麼無法刪除該節點,必須先刪除子節點,再刪除父節點。 zookeeper使用分為命令列、javaApi zookeeper的三個jar包jar、javadoc.jar、sources

    自學it18大數據筆記-第三階段Spark-day03——會持續更新……

    大數據 sca png 準備 park tor 技術 spa ges 寫在最前:轉行大數據領域,沒報班,自學試試,能堅持下來以後就好好做這行,不能就……!準備從現有這套it18掌的視屏殘本開始……自學是痛苦的,發博客和大家分享下學習成果——也是監督自己,督促自己堅持學下去。

    自學it18大數據筆記-第三階段Spark-day07——會持續更新……

    color orm sca style .com 活動 更新 交流 資料 寫在最前:轉行大數據領域,沒報班,自學試試,能堅持下來以後就好好做這行,不能就……!準備從現有這套it18掌的視屏殘本開始……自學是痛苦的,發博客和大家分享下學習成果——也是監督自己,督促自己堅持學下

    自學it18大數據筆記-第三階段Spark-day11——會持續更新……

    大數據 cnblogs -1 http 不能 筆記 學習 自己 src 寫在最前:轉行大數據領域,沒報班,自學試試,能堅持下來以後就好好做這行,不能就……!準備從現有這套it18掌的視屏殘本開始……自學是痛苦的,發博客和大家分享下學習成果——也是監督自己,督促自己堅持學下去

    JSON總結-持續更新補充

    system imp port sonar reflect 阿裏 基督 json 類型 基本的json格式 { "name": "jobs", "boolean": true, "age": null, "num": 88 } jso

    C#、Java中的一些小功能點總結(持續更新......)

    grid datagrid item 其他 cnblogs hid roc oid sha 前言:在項目中,有時候一些小的功能點,總是容易讓人忽略,但是這些功能加在項目中往往十分的有用,因此筆者在這裏總結項目中遇到的一些實用的小功能點,以備用,並持續更新...... 1.禁

    Linux-命令-總結-持續更新

    linux 元數據 lis listing rac 系統 查看文件類型 用戶和組 文件系統 tar 打包 cut 截斷 tr 替換translate or delete characters stat 顯示文件和文件系統狀態 file 查看文件類型屬性 last 查看

    多項式的各種運算總結(持續更新)

    還需要 直接 i+1 成了 加法 class spa 進行 mar 多項式的各種運算總結(持續更新)

    軟件體系結構知識點總結更新中)

    綁定 體系 ast taint structure 完整 延遲 維護 公式 軟件體系結構 公式 ? 體系架構=組件+連接件+約束 ? SoftwareArchitecture=Components+Connectors+Constra

    Python 中的那些坑總結——持續更新

    多說 分享 earlier als lse image code while HA 1.三元表達式之坑 很顯然,Python把第一行的(10 + 4)看成了三元表達式的前部分,這個坑是看了《Python cookbook》(P5)中學到的,書中的代碼: 2.Py

    JAVA面試通關知識點必備(持續更新中)

    1.JAVA基本資料型別包括哪些? 基本資料型別包括byte,short,int,long,float,double,boolean,char. 2.String能被繼承嗎?為什麼? java.lang.String類是final型別的,因為不可以繼承這個類,不能修改

    spark知識點總結(1)

    1.RDD彈性分散式資料集:是抽象出來的概念,元素的集合。是一批節點上一批資料的集合。 分散式:每個rdd會把資料分成多個parttioner放在多個節點上。eg:90萬條資料放在9個節點上面,每個   節點9萬條資料。 彈性:eg:每個節點上面個的記憶體中只能存放5萬條資料,那麼

    Linux系統使用小知識點持續更新

    系統的預設語言改為英文 在root使用者下:vim /etc/sysconfig/i18n 進去之後改: LANG=”zh_CN.UTF-8” 為 LANG="en_US.UTF-8" centos系統時間同步 用ntpdate從時間伺服器更新時間:ntpdate

    web前端工程師具備經驗和知識點持續更新中)

    web前端工程師必備 1、瞭解 DNS 解析,充分利用 CDN,使用多個域名來完成資源的請求以縮短載入時間; 2、設定 HTTP Headers(Expires, Cache-Control, If-Modified-Since); 3、遵循 Steve Souders 給出的全部規

    WindowS下的Python環境搭建開發常用總結(持續更新...)

    Mac系統環境搭建開發的連結請參照: Mac系統下的開發環境搭建 1.在dos命令下更換資料夾 切換盤時不用cd,而是先直接進入盤,再一級一級進行切換 通過dir檢視當前目錄下的檔案結構 2.更改py環境為虛擬環境 首先通過命令 pip list檢視當前python版本下

    Ubuntu 16.04 常用工具總結 持續更新

    1.htop命令安裝  當前執行緒情況 apt-get install htop 2.screenfetch命令安裝    檢視系統資訊 apt-get install screenfetch 3.iftop命令安裝  &nbs

    作用域面試總結(持續更新系列~)

    先說幾個概念:    1、js程式碼從上往下執行     2、變數提升:      變數提升是瀏覽器的一個功能,在執行js程式碼之前,瀏覽器會給js一個全域性作用域叫window,window分兩個模組,一個叫記憶體模組,一個叫執行模組,記憶體模組找到當前作用域下的所有帶var和function的關鍵字

    專案中遇到的坑和注意點 總結 持續更新

    gitHub地址: 傳送門 工作中遇到的坑和思考 有不同意見歡迎指正交流 前排推薦 https://github.com/topics/javascript 關注JS開源框架動態 勤於總結和思考 1. ajax請求的結果要和後端約定好返回的資料格式。