1. 程式人生 > >spark RDD官網RDD程式設計指南

spark RDD官網RDD程式設計指南

  • http://spark.apache.org/docs/latest/rdd-programming-guide.html#using-the-shell
  • Overview(概述)
    • 在較高的層次上,每個Spark應用程式都包含一個驅動程式,該程式執行使用者的主要功能並在叢集上執行各種並行操作。 Spark提供的主要抽象是彈性分散式資料集(RDD),它是跨群集節點分割槽的元素集合,可以並行操作。 RDD是通過從Hadoop檔案系統(或任何其他Hadoop支援的檔案系統)中的檔案或驅動程式中的現有Scala集合開始並對其進行轉換來建立的。使用者還可以要求Spark在記憶體中保留RDD,允許它在並行操作中有效地重用。最後,RDD會自動從節點故障中恢復。 Spark中的第二個抽象是可以在並行操作中使用的共享變數。預設情況下,當Spark並行執行一個函式作為不同節點上的一組任務時,它會將函式中使用的每個變數的副本傳送給每個任務。有時,變數需要跨任務共享,或者在任務和驅動程式之間共享。 Spark支援兩種型別的共享變數:廣播變數(可用於在所有節點的記憶體中快取值)和累加變數(僅“新增”到這些變數中),例如計數器和和。 本指南以Spark支援的每種語言顯示了這些功能。如果你啟動Spark的互動式shell,最簡單的方法是使用它 - 用於Scala shell的bin / spark-shell或用於Python的bin / pyspark
  • 與Spark連結
    • 此外,如果希望訪問HDFS叢集,需要為HDFS版本增加hadoop-client的依賴性。
      • groupId = org.apache.hadoop
      • artifactId = hadoop-client
      • version = <your-hdfs-version>
    • 表示。最後,您需要將一些Spark類匯入到您的程式中。新增以下程式碼行:
    • import org.apache.spark.SparkContext org.apache.spark.SparkConf
  • 在Spark 1.3.0之前,您需要顯式地匯入org.apache.spark.SparkContext._啟用基本隱式轉換。)
  • Initializing Spark(spark的初始化)
    • Spark程式必須做的第一件事是建立一個SparkContext物件,它告訴Spark如何訪問叢集。要建立SparkContext,首先需要構建一個包含有關應用程式資訊的SparkConf物件。 每個JVM只能啟用一個SparkContext。您必須在建立新的SparkContext之前停止()活動的SparkContext。
      • val conf = new SparkConf().setAppName(appName).setMaster(master)
      • new SparkContext(conf)
      • appName引數是應用程式在叢集UI上顯示的名稱。 master是Spark,Mesos或YARN群集URL,或者是以本地模式執行的特殊“本地”字串。實際上,當在群集上執行時,您不希望在程式中對master進行硬編碼,而是使用spark-submit啟動應用程式並在那裡接收它。但是,對於本地測試和單元測試,您可以傳遞“local”來執行Spark in-process。
    • Spark圍繞彈性分散式資料集(RDD)的概念展開,RDD是可並行操作的容錯元素集合。建立RDDs有兩種方法:在驅動程式中並行化現有集合,或在外部儲存系統中引用資料集,例如共享檔案系統、HDFS、HBase或任何提供Hadoop InputFormat的資料來源。
      • scala> val distFile = sc.textFile("data.txt")
      • distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
      • val data = Array(1, 2, 3, 4, 5)
      • val distData = sc.parallelize(data)
  • Using the Shell(使用shell)
    • 在Spark shell中,已經在名為sc的變數中為您建立了一個特殊的直譯器感知SparkContext。製作自己的SparkContext將無法正常工作。您可以使用--master引數設定上下文連線到哪個主伺服器,並且可以通過將逗號分隔的列表傳遞給--jars引數來將JAR新增到類路徑中。您還可以通過向--packages引數提供以逗號分隔的Maven座標列表,將依賴項(例如Spark包)新增到shell會話中。可能存在依賴關係的任何其他儲存庫(例如Sonatype)可以傳遞給--repositories引數。例如,要在四個核心上
      • $ ./bin/spark-shell --master local[4]
    • 或者,要將code.jar新增到其類路徑,請使用:
      • $ ./bin/spark-shell --master local[4] --jars code.jar
    • 要使用Maven座標包含依賴項:
      • $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
    • 有關選項的完整列表,請執行spark-shell --help。在幕後,spark-shell呼叫更普遍
  • 彈性分散式資料集(RDD)Resilient Distributed Datasets
    • Spark圍繞彈性分散式資料集(RDD)的概念展開,RDD是可並行操作的容錯元素集合。建立RDDs有兩種方法:在驅動程式中並行化現有集合,或在外部儲存系統中引用資料集,例如共享檔案系統、HDFS、HBase或任何提供Hadoop InputFormat的資料來源。
    • Parallelized Collections(並行化集合)
      • 通過在驅動程式(Scala Seq)中的現有集合上呼叫SparkContext的parallelize方法來建立並行化集合。複製集合的元素以形成可以並行操作的分散式資料集。例如,以下是如何建立包含數字1到5的並行化集合:
      • val data = Array(1, 2, 3, 4, 5)
      • val distData = sc.parallelize(data)
      • 一旦建立,分散式資料集(distData)可以並行操作。例如,我們可以呼叫distData。reduce((a, b) => a + b)用於將陣列的元素相加。稍後我們將描述對分散式資料集的操作。
      • 並行集合的一個重要引數是要將資料集切割成的分割槽數。Spark將為叢集的每個分割槽執行一個任務。通常,您需要為叢集中的每個CPU分配2-4個分割槽。通常,Spark嘗試根據叢集自動設定分割槽數量。但是,您也可以通過passi手動設定它(例如sc.parallelize(data,10))。注意:程式碼中的某些位置使用術語切片(分割槽的同義詞)來保持向後相容性。
    • External Datasets(外部資料集合)
      • Spark可以從Hadoop支援的任何儲存源建立分散式資料集,包括本地檔案系統,HDFS,Cassandra,HBase,Amazon S3等.Spark支援文字檔案,SequenceFiles和任何其他Hadoop InputFormat。 可以使用SparkContext的textFile方法建立文字檔案RDD。此方法獲取檔案的URI(計算機上的本地路徑,或hdfs://,s3a://等URI)並將其作為行集合讀取。這是一個示例呼叫:
      • scala> val distFile = sc.textFile("data.txt")
      • distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
    • 一旦建立,資料集操作就可以對distFile進行操作。例如,我們可以使用對映將所有行的大小相加,並按如下方式減少操作:distFile.map(s = > s.length)。
    • 關於使用Spark讀取檔案的一些注意事項:
      • 如果使用本地檔案系統上的路徑,檔案也必須在工作節點上的同一路徑上可訪問。要麼將檔案複製到所有工作者,要麼使用一個掛載網路的共享檔案系統。
      • 所有Spark基於檔案的輸入方法,包括文字檔案,支援在目錄、壓縮檔案和萬用字元上執行。例如,可以使用textFile(“/my/directory”)、textFile(“/my/directory/*.txt”)和textFile(“/my/directory/*.gz”)。
      • textFile方法還接受一個可選的第二個引數,用於控制檔案的分割槽數量。預設情況下,Spark為檔案的每個塊建立一個分割槽(HDFS中的塊預設為128MB),但是您也可以通過傳遞一個較大的值來請求更多的分割槽。注意,分割槽不能少於塊。
    • 除了文字檔案,Spark s Scala API還支援其他幾種資料格式:
      • SparkContext。wholeTextFiles允許您讀取包含多個小文字檔案的目錄,並將它們作為(檔名、內容)對返回。這與textFile相反,textFile將在每個檔案中每行返回一條記錄。分割槽由資料區域性性決定,在某些情況下,資料區域性性可能導致分割槽太少。對於這些情況,wholetextfile提供了控制最小分割槽數量的第二個可選引數。
      • 對於SequenceFiles,使用SparkContext的sequenceFile[K, V]方法,其中K和V是檔案中的鍵和值的型別。這些應該是Hadoop可寫介面的子類,比如IntWritable和Text。
      • 此外,Spark還允許為一些常見的可寫程式指定本機型別;例如,sequenceFile[Int, String]將自動讀取IntWritables和text。對於其他Hadoop inputformat,您可以使用SparkContext。hadoopRDD方法,它接受任意的JobConf和輸入格式類、鍵類和值類。將這些設定為與使用輸入源的Hadoop作業相同的方式。您還可以使用SparkContext。基於"new"MapReduce API (org.apache.hadoop.mapreduce)的inputformat的newAPIHadoopRDD。
      • RDD.saveAsObjectFile 和SparkContext.objectFile支援以由序列化的Java物件組成的簡單格式儲存RDD。雖然這不如Avro這樣的專用格式高效,但它提供了一種簡單的方式來儲存任何RDD。
  • RDD操作(RDD Operations)
    • RDD支援兩種型別的操作:轉換(從現有資料集建立新資料集)和操作(在資料集上執行計算後將值返回到驅動程式)。例如,map是一個轉換,它通過一個函式傳遞每個資料集元素,並返回一個表示結果的新RDD。另一方面,reduce是一個使用某個函式聚合RDD的所有元素的操作,並將最終結果返回給驅動程式(儘管還有一個返回分散式資料集的並行reduceByKey)。
    • Spark中的所有轉換都是懶惰的,因為它們不會立即計算結果。相反,他們只記得應用於某些基礎資料集(例如檔案)的轉換。僅當操作需要將結果返回到驅動程式時才會計算轉換。這種設計使Spark能夠更有效地執行。例如,我們可以意識到通過map建立的資料集將用於reduce,並僅將reduce的結果返回給驅動程式,而不是更大的對映資料集。
    • 預設情況下,每次對其執行操作時,都可以重新計算每個轉換後的RDD。但是,您也可以使用持久化(persist)(或快取(chche))方法在記憶體中保留RDD,在這種情況下,Spark會在群集上保留元素,以便在下次查詢時更快地訪問。還支援在磁碟上保留RDD,或在多個節點之間複製。
  • Basics(基礎)
    • 為了說明RDD的基礎知識,請考慮下面的簡單程式:
      • val lines = sc.textFile("data.txt")
      • val lineLengths = lines.map(s => s.length)
      • val totalLength = lineLengths.reduce((a, b) => a + b)
    • 第一行從外部檔案定義基礎RDD。此資料集未載入到記憶體中或以其他方式操作:行僅僅是指向檔案的指標。第二行將lineLengths定義為地圖轉換的結果。同樣,由於懶惰,lineLengths不會立即計算。最後,我們執行reduce,這是一個動作。此時,Spark將計算分解為在不同機器上執行的任務,並且每臺機器都執行其部分對映和本地縮減,僅返回其對驅動程式的答案。 
    • 如果我們以後還想使用linelength,我們可以新增:
      • lineLengths.persist()
    • 在reduce之前,這將導致lineLengths在第一次計算之後儲存在記憶體中。
  • 向Spark傳遞函式(Passing Functions to Spark)
    • Spark的API在很大程度上依賴於在驅動程式中傳遞函式以在叢集上執行。有兩種建議的方法可以做到這一點:     
      • 匿名函式語法,可用於短片程式碼。     
      • 全域性單例物件中的靜態方法。例如,您可以定義物件MyFunctions,然後傳遞MyFunctions.func1,如下所示:
      • object MyFunctions {
      • def func1(s: String): String = { ... }
      • }
      • myRdd.map(MyFunctions.func1)
    • 請注意,雖然也可以將引用傳遞給類例項中的方法(而不是單例物件),但這需要傳送包含該類的物件以及方法。例如,考慮:
      • class MyClass {
      • def func1(s: String): String = { ... }
      • def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) } }
      • 在這裡,如果我們建立一個新的MyClass例項並在其上呼叫doStuff,其中的map引用了MyClass例項的func1方法,因此整個物件需要被髮送到叢集。這類似於編寫rdd.map(x = > this.func1(x))
      • 以類似的方式,訪問外部物件的欄位將引用整個物件:
        • class MyClass {
        • val field = "Hello"
        • def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) } }
      • 相當於編寫rdd.map(x => this.field + x),它引用了所有這些。要避免此問題,最簡單的方法是將欄位複製到本地變數而不是從外部訪問它:
        • def doStuff(rdd: RDD[String]): RDD[String] = {
        • val field_ = this.field
        • rdd.map(x => field_ + x) }
  • Understanding closures(瞭解閉包)
    • Spark的一個難點是在跨叢集執行程式碼時理解變數和方法的範圍和生命週期。修改其範圍之外的變數的RDD操作可能經常引起混淆。在下面的示例中,我們將檢視使用foreach()遞增計數器的程式碼,但其他操作也可能出現類似問題。
      • 考慮下面的native RDD元素總和,根據執行是否在同一JVM中發生,它可能表現不同。一個常見的例子是在本地模式下執行Spark(--master = local [n])而不是將Spark應用程式部署到叢集(例如通過spark-submit to YARN):
        • var counter = 0
        • var rdd = sc.parallelize(data)
        • // Wrong: Don't do this!! 錯誤示例:不要這樣做!
        • rdd.foreach(x => counter += x)
        • println("Counter value: " + counter)
  • 本地與群集模式(Local vs. cluster modes)
    • 上述程式碼的行為是未定義的,並且可能不按預期工作。為了執行jobs,Spark將RDD操作的處理分解為tasks,每個task由executor執行。在執行之前,Spark會計算 任務的閉合。閉包是執行器在RDD上執行計算時必須可見的那些變數和方法(在本例中是foreach())。這個閉包被序列化併發送給每個執行程式。
    • 傳送到每個執行程式的閉包中的變數現在都是副本,因此,在foreach函式中引用計數器時,它不再是驅動節點上的計數器。驅動節點的記憶體中仍然有一個計數器,但是執行程式不再可見!執行器只看到序列化閉包中的副本。因此,計數器的最終值仍然為零,因為計數器上的所有操作都引用了序列化閉包中的值。
    • 在本地模式中,在某些情況下,foreach函式實際上將在與驅動程式相同的JVM中執行,並引用相同的原始計數器,並可能實際更新它。
    • 為了確保這些場景中定義良好的行為,應該使用累加器。Spark中的累加器專門用於在叢集中的工作節點之間分割執行時安全地更新變數。本指南的累加器部分更詳細地討論了這些問題。
    • 一般來說,閉包——像迴圈或區域性定義的方法這樣的結構,不應該用來改變某些全域性狀態。Spark不定義或保證從閉包外部引用物件的突變行為。一些這樣做的程式碼可能在本地模式下工作,但這只是偶然的,這樣的程式碼在分散式模式下不會像預期的那樣工作。如果需要一些全域性聚合,可以使用累加器。
  • 列印RDD的元素
    • 另一個常見的習慣用法是嘗試使用RDD .foreach(println)或RDD .map(println)列印RDD的元素。在一臺機器上,這將生成預期的輸出並列印所有RDD s元素。然而,在叢集模式下,執行程式呼叫的stdout的輸出現在寫到了執行程式的stdout,而不是驅動程式上的,所以驅動程式上的stdout不會顯示這些!要列印驅動程式上的所有元素,可以使用collect()方法首先將RDD帶到驅動節點,例如:RDD .collect().foreach(println)。但是,這可能導致驅動程式記憶體不足,因為collect()將整個RDD提取到一臺機器上;如果只需要列印RDD的一些元素,更安全的方法是使用take(): RDD .take(100).foreach(println)。
  • 使用鍵值對
    • 雖然大多數Spark操作都是在包含任何型別物件的RDDs上進行的,但是有一些特殊操作只在鍵-值對的RDDs上可用。最常見的是分散式的“shuffle”操作,例如通過鍵來對元素進行grouping(分組)或聚合(aggregate)。
    • 在Scala中,這些操作在包含Tuple2物件的RDDs上自動可用(語言中的內建元組,通過簡單的編寫(a, b))。鍵-值對操作在PairRDDFunctions類中是可用的,它會自動包裝元組的RDD。
    • 例如,下面的程式碼使用鍵-值對上的reduceByKey操作來計算檔案中每行文字發生多少次:
      • val lines = sc.textFile("data.txt")
      • val pairs = lines.map(s => (s, 1))
      • val counts = pairs.reduceByKey((a, b) => a + b)
    • 我們也可以使用counts.sortByKey()來按字母順序對這些對進行排序,最後使用counts.collect()將它們作為物件陣列返回到驅動程式。
    • 在鍵值對操作中使用自定義物件作為鍵時,必須確保自定義equals()方法伴隨著匹配的hashCode()方法。有關完整的詳細資訊,請參閱Object.hashCode()文件中概述的協議
  • Shuffle操作
    • Spark中的某些操作會觸發一個稱為shuffle的事件。shuffle是Spark的一種重新分配資料的機制,這樣資料就可以在不同分割槽之間進行分組。這通常涉及到在執行程式和機器之間複製資料,使shuffle成為一個複雜且昂貴的操作。
    • 為了理解在shuffle期間發生的事情,我們可以考慮reduceByKey操作的示例。 reduceByKey操作生成一個新的RDD,其中單個鍵的所有值都組合成一個元組 - 鍵和對與該鍵關聯的所有值執行reduce函式的結果。挑戰在於,並非單個金鑰的所有值都必須位於同一個分割槽,甚至是同一個機器上,但它們必須位於同一位置才能計算結果。
    • 在Spark中,資料通常不會跨分割槽分佈,以便在特定操作的必要位置分佈。在計算過程中,單個任務將在單個分割槽上操作——因此,為了組織要執行的單個reduceByKey reduce任務的所有資料,Spark需要執行all-to-all操作。它必須從所有分割槽中讀取,以找到所有鍵的所有值,然後將跨分割槽的值組合在一起,以計算每個鍵的最終結果——這稱為shuffle。
    • 雖然新打亂的資料的每個分割槽中的元素集合是確定的,分割槽本身的排序也是確定的,但是這些元素的排序是不確定的。如果你想在shuffle之後得到可預測的有序資料,那麼就可以使用:
      • 例如,使用mapPartitions對每個分割槽進行排序, .sorted
      • repartitionAndsSortWithinPartition用於高效地對分割槽進行排序,同時重新分割槽
      • sortBy來建立一個全域性排序的RDD
    • 可能導致shuffle的操作包括重分割槽操作(比如重分割槽(repartition)和合並(coalesce))、ByKey操作(除了計數)(比如groupByKey和reduceByKey)以及join操作(比如cogroup和join)。
    • 效能的影響
      • Shuffle是一種昂貴的操作,因為它涉及到磁碟I/O、資料序列化和網路I/O。要組織shuffle的資料,Spark生成一組任務——對映任務來組織資料,以及一組reduce任務來聚合資料。這個術語來自MapReduce,與Spark的map和reduce操作沒有直接關係。
      • 在內部,單個map任務的結果會儲存在記憶體中,直到無法匹配為止。然後,根據目標分割槽對它們進行排序,並將它們寫入單個檔案。在reduce方面,任務讀取相關的排序塊
      • 某些shuffle操作可能會消耗大量的堆記憶體,因為它們使用記憶體中的資料結構來組織在傳輸記錄之前或之後的記錄。具體來說,reduceByKey和aggregateByKey在map端建立這些結構,ByKey操作在reduce端生成這些結構。當資料不適合記憶體時,Spark會將這些表洩漏到磁碟上,從而增加磁碟I/O的額外開銷,並增加垃圾收集。
      • Shuffle還會在磁碟上生成大量的中間檔案。在Spark 1.3中,這些檔案被儲存,直到相應的RDDs不再使用並被垃圾收集。這樣做是為了在重新計算沿襲時不需要重新建立shuffle檔案。如果應用程式保留對這些RDDs的引用,或者GC不頻繁啟動,那麼垃圾收集可能在很長一段時間之後才會發生。這意味著長時間執行的Spark作業可能會消耗大量磁碟空間。臨時儲存目錄由spark.local指定。配置Spark上下文時的dir配置引數。
      • 可以通過調整各種配置引數來調整Shuffle行為。參見Spark配置指南中的Shuffle行為部分。
  • RDD的永續性
    • Spark最重要的功能之一是跨操作在記憶體中持久化(或快取)資料集。當您持久化一個RDD時,每個節點將它在記憶體中計算的任何分割槽儲存起來,並在該資料集(或從該資料集派生的資料集)上的其他操作中重用它們。這使得未來的動作更快(通常超過10倍)。快取是迭代演算法和快速互動使用的關鍵工具。
    • 您可以使用persist()或cache()方法將RDD標記為持久化。第一次在操作中計算它時,它將儲存在節點上的記憶體中。Spark快取是容錯的,如果RDD的任何分割槽丟失,它將使用最初建立它的轉換自動重新計算。
    • 此外,每個持久化的RDD都可以使用不同的儲存級別進行儲存,例如,允許您在磁碟上持久化資料集,在記憶體中持久化資料集,但是作為序列化的Java物件(以節省空間),跨節點複製資料集。這些級別是通過傳遞一個Storage Level物件(Scala、Java、Python)來持久()來設定的。cache()方法是使用預設儲存級別StorageLevel的簡寫方式。MEMORY_ONLY(在記憶體中儲存反序列化的物件)。完整的儲存級別是
      • MEMORY_ONLY
        • 將RDD儲存為JVM中的反序列化Java物件。如果RDD不適合記憶體,一些分割槽將不會被快取,並且將在每次需要時動態地重新計算。這是預設級別。
      • MEMORY_AND_DISK
        • 將RDD儲存為JVM中的反序列化Java物件。如果RDD不適合記憶體,則將不適合的分割槽儲存在磁碟上,並在需要時從那裡讀取它們。
      • MEMORY_ONLY_SER (Java和Scala)
        • 將RDD儲存為序列化的Java物件(每個分割槽一個位元組陣列)。這通常比反序列化的物件更節省空間,特別是在使用快速序列化器時,但是讀起來更密集。
      • MEMORY_AND_DISK_SER (Java和Scala)
        • 與MEMORY_ONLY_SER類似,但是將不適合記憶體的分割槽溢位到磁碟,而不是在每次需要它們時動態地重新計算它們。
      • DISK_ONLY
        • 僅將RDD分割槽儲存在磁碟上。
      • MEMORY_ONLY_2, MEMORY_AND_DISK_2等等。
        • 與上面的級別相同,但是在兩個叢集節點上覆制每個分割槽。
      • OFF_HEAP(實驗性的)
        • 類似於MEMORY_ONLY_SER,但將資料儲存在堆外記憶體中。這需要啟用堆外記憶體。
    • 注意:在Python中,儲存的物件總是使用Pickle庫進行序列化,因此選擇序列化級別並不重要。Python中可用的儲存級別包括MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY和DISK_ONLY_2。
    • Spark還會在shuffle操作中自動持久化一些中間資料(例如reduceByKey),甚至在沒有使用者呼叫persist的情況下也是如此。這樣做是為了避免在轉移期間節點失敗時重新計算整個輸入。如果使用者打算重用生成的RDD,我們仍然建議他們呼叫持久化。
    • 選擇哪個儲存級別?
      • Spark儲存級別旨在提供記憶體使用和CPU效率之間的不同權衡。我們建議通過下面的過程來選擇一個:
        • 如果您的RDDs與預設儲存級別(MEMORY_ONLY)非常匹配,那麼就讓它這樣吧。這是最有效的cpu選項,允許RDDs上的操作以儘可能快的速度執行。
        • 如果沒有,請嘗試使用MEMORY_ONLY_SER並選擇一個快速序列化庫,以使物件更節省空間,但訪問速度仍然相當快。(Java和Scala)
        • 不要洩漏到磁碟上,除非計算資料集的函式很昂貴,或者它們過濾了大量資料。否則,重新計算分割槽可能與從磁碟讀取分割槽一樣快。
        • 如果需要快速的故障恢復(例如,如果使用Spark服務於web應用程式的請求),則使用複製儲存級別。通過重新計算丟失的資料,所有儲存級別都提供了完全的容錯能力,但是複製的儲存級別允許您在RDD上繼續執行任務,而無需等待重新計算丟失的分割槽。
  • 刪除資料
    • Spark自動監視每個節點上的快取使用情況,並以最近最少使用的方式(LRU)刪除舊資料分割槽。如果您想手動刪除一個RDD,而不是等待它從快取中退出,請使用RDD.unpersist()方法。
  • 共享變數
    • 通常,當傳遞給Spark操作(如map或reduce)的函式在遠端叢集節點上執行時,它在函式中使用的所有變數的單獨副本上工作。這些變數被複制到每臺機器上,對遠端機器上的變數的更新不會傳播回驅動程式。跨任務支援通用的讀寫共享變數將是低效的。但是,Spark確實為兩種常見的使用模式提供了兩種有限型別的共享變數:廣播變數和累加器。
    • Broadcast (廣播)變數
      • 允許程式設計師將只讀變數快取在每臺機器上,而不是將它的副本與任務一起傳送出去。例如,可以使用它們以高效的方式為每個節點提供一個大型輸入資料集的副本。
      • Spark還嘗試使用高效的廣播演算法來分配廣播變數,以降低通訊成本。
      • Spark操作通過一組 stages執行,這些 stages由分散式shuffle操作分開。Spark自動廣播tasks在每個 stages所需的公共資料。以這種方式廣播的資料在執行每個task之前以序列化的形式快取和反序列化。這意味著,只有當跨多個stages的task需要相同的資料或以反序列化的形式快取資料時,顯式地建立廣播變數才有用。Broadcast變數是通過呼叫SparkContext.broadcast(v)從變數v中建立的。broadcast變數是v的包裝器,它的值可以通過呼叫value方法來訪問。下面的程式碼顯示了這一點
        • scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
        • broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
        • scala> broadcastVar.value
        • res0: Array[Int] = Array(1, 2, 3)
      • 建立廣播變數後,應該在群集上執行的任何函式中使用它而不是值v,這樣v不會多次傳送到節點。另外,在廣播之後不應修改物件v,以確保所有節點獲得廣播變數的相同值(例如,如果稍後將變數傳送到新節點)。
    • Accumulators累加器
      • 累加器是僅通過關聯和交換操作“新增”的變數,因此可以並行地有效支援。它們可用於實現計數器(如MapReduce)或總和。 Spark本身支援數值型別的累加器,程式設計師可以新增對新型別的支援。
      • 作為使用者,您可以建立命名或未命名的累加器。如下圖所示,命名累加器(在此例項計數器中)將在Web UI中顯示修改該累加器的stage。 Spark顯示“tasks”表中任務修改的每個累加器的值。
      • 在UI中跟蹤累加器對於理解執行stage的進展很有用(注意:Python中還不支援這種方法)。
      • 可以通過呼叫SparkContext.longAccumulator()或SparkContext.doubleAccumulator()來分別累積Long或Double型別的值來建立一個數值累加器。然後,在叢集上執行tasks可以使用add方法新增到叢集中。然而,他們無法讀懂它的值。只有驅動程式可以使用累加器的value方法讀取累加器的值。
      • 下面的程式碼顯示了一個累加器,用於將陣列的元素相加:
        • scala> val accum = sc.longAccumulator("My Accumulator")
        • accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
        • scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
        • ...
        • 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
        • scala> accum.value
        • res2: Long = 10
      • 雖然這段程式碼使用了內建的Long型別累加器支援,但程式設計師也可以通過子類化累加v2來建立自己的型別。AccumulatorV2抽象類有幾個方法,您必須重寫它們:重置累加器為零,新增為向累加器新增另一個值,合併為將另一個相同型別的累加器合併到這個累加器。必須重寫的其他方法包含在API文件中。例如,假設我們有一個表示數學向量的MyVector類,我們可以這樣寫:
      • class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
      • private val myVector: MyVector = MyVector.createZeroVector
      • def reset(): Unit = {
      • myVector.reset()
      • }
      • def add(v: MyVector): Unit = {
      • myVector.add(v) }
      • ...
      • }
      • //然後,建立這種型別的累加器:
      • val myVectorAcc = new VectorAccumulatorV2
      • //然後,將其註冊到spark context:
      • sc.register(myVectorAcc, "MyVectorAcc1")
    • 注意,當程式設計師定義自己的 累加器v2型別時,得到的型別可能與新增的元素不同。
    • 對於只在內部操作中執行的累加器更新,Spark保證每個task's對累加器的更新只應用一次,即重新啟動的tasks不會更新值。在轉換中,使用者應該意識到,如果tasks或stages被重新執行,每個task的更新可能會被多次應用。
    • 累加器不會改變Spark的惰性評估模型。如果在RDD上的操作中更新它們,則只有在RDD作為操作的一部分計算時才更新它們的值。因此,在像map()這樣的惰性轉換中進行累積器更新時,不能保證執行累加器更新。以下程式碼片段演示了此屬性:
      • val accum = sc.longAccumulator
      • data.map { x => accum.add(x); x }
      • // 在這裡,accum仍然是0,因為沒有操作導致map操作被計算。
  • 單元測試
    • Spark對任何流行的單元測試框架進行單元測試都很友好。只需在測試中建立一個SparkContext,將主URL設定為local,執行您的操作,然後呼叫SparkContext.stop()將其拆除。確保在finally塊或測試框架的tearDown方法中停止上下文(context),因為Spark不支援在同一程式中同時執行的兩個上下文(context)。
  • 部署到叢集
    • 應用程式提交指南描述瞭如何向叢集提交應用程式。簡而言之,一旦您將應用程式打包到JAR(對於Java/Scala)或.py或.zip檔案(對於Python)中,bin/spark-submit指令碼允許您將其提交給任何受支援的叢集管理器。
  • Tuning Spark調優指南
  • Apache Spark示例
  • 叢集模式
  • 部署到群集
  • 從Java / Scala啟動Spark作業