1. 程式人生 > >Spark中文手冊-程式設計指南

Spark中文手冊-程式設計指南

概論 在高層中,每個 Spark 應用程式都由一個驅動程式(driver programe)構成,驅動程式在叢集上執行使用者的mian 函式來執行各種各樣的並行操作(parallel operations)。Spark 的主要抽象是提供一個彈性分散式資料集(RDD),RDD 是指能橫跨叢集所有節點進行平行計算的分割槽元素集合。
RDDs 從 Hadoop 的檔案系統中的一個檔案中建立而來(或其他 Hadoop 支援的檔案系統),或者從一個已有的 Scala 集合轉換得到。使用者可以要求 Spark 將 RDD 持久化(persist)到記憶體中,來讓它在平行計算中高效地重用。最後,RDDs 能在節點失敗中自動地恢復過來。

Spark 的第二個抽象是共享變數(shared variables),共享變數能被執行在平行計算中。預設情況下,當 Spark 執行一個並行函式時,這個並行函式會作為一個任務集在不同的節點上執行,它會把函式裡使用的每個變數都複製搬運到每個任務中。有時,一個變數需要被共享到交叉任務中或驅動程式和任務之間。Spark 支援 2 種類型的共享變數:廣播變數(broadcast variables),用來在所有節點的記憶體中快取一個值;累加器(accumulators),僅僅只能執行“新增(added)”操作,例如:記數器(counters)和求和(sums)。
這個指南會在 Spark 支援的所有語言中演示它的每一個特徵。非常簡單地開始一個 Spark 互動式 shell -bin/spark-shell 開始一個 Scala shell,或 bin/pyspark 開始一個 Python shell。

  • 引入 Spark
  • 初始化 Spark
  • Spark RDDs
  • 共享變數
  • 從這裡開始

1. 引入 Spark
Spark 1.2.0 使用 Scala 2.10 寫應用程式,你需要使用一個相容的 Scala 版本(例如:2.10.X)。 寫 Spark 應用程式時,你需要新增 Spark 的 Maven 依賴,Spark 可以通過 Maven 中心倉庫來獲得:
  • groupId = org.apache.spark
  • artifactId = spark-core_2.10
  • version = 1.2.0

[color=rgb(51, 102, 153) !important]複製程式碼
另外,如果你希望訪問 HDFS 叢集,你需要根據你的 HDFS 版本新增 hadoop-client 的依賴。一些公共的 HDFS 版本 tags 在第三方發行頁面中被列出。

  • groupId = org.apache.hadoop
  • artifactId = hadoop-client
  • version = <your-hdfs-version>

[color=rgb(51, 102, 153) !important]複製程式碼
最後,你需要匯入一些 Spark 的類和隱式轉換到你的程式,新增下面的行就可以了:
  • import org.apache.spark.SparkContext
  • import org.apache.spark.SparkContext._
  • import org.apache.spark.SparkConf



2. 初始化 Spark

Spark 程式設計的第一步是需要建立一個 SparkContext 物件,用來告訴 Spark 如何訪問叢集。在建立SparkContext 之前,你需要構建一個 SparkConf 物件, SparkConf 物件包含了一些你應用程式的資訊。
  • val conf = new SparkConf().setAppName(appName).setMaster(master)
  • new SparkContext(conf)


appName 引數是你程式的名字,它會顯示在 cluster UI 上。master 是 Spark, Mesos 或 YARN 叢集的 URL,或執行在本地模式時,使用專用字串 “local”。在實踐中,當應用程式執行在一個叢集上時,你並不想要把 master 硬編碼到你的程式中,你可以用 spark-submit 啟動你的應用程式的時候傳遞它。然而,你可以在本地測試和單元測試中使用 “local” 執行 Spark 程序。

2.1 使用 Shell
在 Spark shell 中,有一個專有的 SparkContext 已經為你建立好。在變數中叫做 sc。你自己建立的 SparkContext 將無法工作。可以用 --master 引數來設定 SparkContext 要連線的叢集,用 --jars 來設定需要新增到 classpath 中的 JAR 包,如果有多個 JAR 包使用逗號分割符連線它們。例如:在一個擁有 4 核的環境上執行 bin/spark-shell,使用:
  • $ ./bin/spark-shell --master local[4]

[color=rgb(51, 102, 153) !important]複製程式碼
或在 classpath 中新增 code.jar,使用:
  • $ ./bin/spark-shell --master local[4] --jars code.jar

[color=rgb(51, 102, 153) !important]複製程式碼
執行 spark-shell --help 獲取完整的選項列表。在這之後,呼叫 spark-shell 會比 spark-submit 指令碼更為普遍。
3. 彈性分散式資料集 (RDDs) Spark 核心的概念是 Resilient Distributed Dataset (RDD):一個可並行操作的有容錯機制的資料集合。有 2 種方式建立 RDDs:第一種是在你的驅動程式中並行化一個已經存在的集合;另外一種是引用一個外部儲存系統的資料集,例如共享的檔案系統,HDFS,HBase或其他 Hadoop 資料格式的資料來源。
  • 並行集合
  • 外部資料集
  • RDD 操作
  • 傳遞函式到 Spark
  • 使用鍵值對
  • Transformations
  • Actions
  • RDD持久化

3.1 並行集合
並行集合 (Parallelized collections) 的建立是通過在一個已有的集合(Scala Seq)上呼叫 SparkContext 的parallelize 方法實現的。集合中的元素被複制到一個可並行操作的分散式資料集中。例如,這裡演示瞭如何在一個包含 1 到 5 的陣列中建立並行集合:
  • val data = Array(1, 2, 3, 4, 5)
  • val distData = sc.parallelize(data)

[color=rgb(51, 102, 153) !important]複製程式碼
一旦建立完成,這個分散式資料集(distData)就可以被並行操作。例如,我們可以呼叫distData.reduce((a, b) => a + b) 將這個陣列中的元素相加。我們以後再描述在分散式上的一些操作。
並行集合一個很重要的引數是切片數(slices),表示一個數據集切分的份數。Spark 會在叢集上為每一個切片執行一個任務。你可以在叢集上為每個 CPU 設定 2-4 個切片(slices)。正常情況下,Spark 會試著基於你的叢集狀況自動地設定切片的數目。然而,你也可以通過 parallelize 的第二個引數手動地設定(例如:sc.parallelize(data, 10))。
3.2 外部資料集 Spark 可以從任何一個 Hadoop 支援的儲存源建立分散式資料集,包括你的本地檔案系統,HDFS,Cassandra,HBase,Amazon S3等。 Spark 支援文字檔案(text files),SequenceFiles 和其他 HadoopInputFormat。 文字檔案 RDDs 可以使用 SparkContext 的 textFile 方法建立。 在這個方法裡傳入檔案的 URI (機器上的本地路徑或 hdfs://,s3n:// 等),然後它會將檔案讀取成一個行集合。這裡是一個呼叫例子:
[color=rgb(51, 102, 153) !important]複製程式碼
一旦建立完成,distFiile 就能做資料集操作。例如,我們可以用下面的方式使用 map 和 reduce 操作將所有行的長度相加:distFile.map(s => s.length).reduce((a, b) => a + b)。
注意,Spark 讀檔案時:
  • 如果使用本地檔案系統路徑,檔案必須能在 work 節點上用相同的路徑訪問到。要麼複製檔案到所有的 workers,要麼使用網路的方式共享檔案系統。
  • 所有 Spark 的基於檔案的方法,包括 textFile,能很好地支援檔案目錄,壓縮過的檔案和萬用字元。例如,你可以使用 textFile("/my/檔案目錄"),textFile("/my/檔案目錄/*.txt") 和 textFile("/my/檔案目錄/*.gz")。
  • textFile 方法也可以選擇第二個可選引數來控制切片(slices)的數目。預設情況下,Spark 為每一個檔案塊(HDFS 預設檔案塊大小是 64M)建立一個切片(slice)。但是你也可以通過一個更大的值來設定一個更高的切片數目。注意,你不能設定一個小於檔案塊數目的切片值。

除了文字檔案,Spark 的 Scala API 支援其他幾種資料格式:
  • SparkContext.sholeTextFiles 讓你讀取一個包含多個小文字檔案的檔案目錄並且返回每一個(filename, content)對。與 textFile 的差異是:它記錄的是每個檔案中的每一行。
  • 對於 SequenceFiles,可以使用 SparkContext 的 sequenceFile[K, V] 方法建立,K 和 V 分別對應的是 key 和 values 的型別。像 IntWritable 與 Text 一樣,它們必須是 Hadoop 的 Writable 介面的子類。另外,對於幾種通用的 Writables,Spark 允許你指定原聲型別來替代。例如: sequenceFile[Int, String] 將會自動讀取 IntWritables 和 Text。
  • 對於其他的 Hadoop InputFormats,你可以使用 SparkContext.hadoopRDD 方法,它可以指定任意的JobConf,輸入格式(InputFormat),key 型別,values 型別。你可以跟設定 Hadoop job 一樣的方法設定輸入源。你還可以在新的 MapReduce 介面(org.apache.hadoop.mapreduce)基礎上使用SparkContext.newAPIHadoopRDD(譯者注:老的介面是 SparkContext.newHadoopRDD)。
  • RDD.saveAsObjectFile 和 SparkContext.objectFile 支援儲存一個RDD,儲存格式是一個簡單的 Java 物件序列化格式。這是一種效率不高的專有格式,如 Avro,它提供了簡單的方法來儲存任何一個 RDD。

3.3 RDD 操作 RDDs 支援 2 種類型的操作:轉換(transformations) 從已經存在的資料集中建立一個新的資料集;動作(actions) 在資料集上進行計算之後返回一個值到驅動程式。例如,map 是一個轉換操作,它將每一個數據集元素傳遞給一個函式並且返回一個新的 RDD。另一方面,reduce 是一個動作,它使用相同的函式來聚合 RDD 的所有元素,並且將最終的結果返回到驅動程式(不過也有一個並行 reduceByKey 能返回一個分散式資料集)。
在 Spark 中,所有的轉換(transformations)都是惰性(lazy)的,它們不會馬上計算它們的結果。相反的,它們僅僅記錄轉換操作是應用到哪些基礎資料集(例如一個檔案)上的。轉換僅僅在這個時候計算:當動作(action) 需要一個結果返回給驅動程式的時候。這個設計能夠讓 Spark 執行得更加高效。例如,我們可以實現:通過map 建立一個新資料集在 reduce 中使用,並且僅僅返回 reduce 的結果給 driver,而不是整個大的對映過的資料集。
預設情況下,每一個轉換過的 RDD 會在每次執行動作(action)的時候重新計算一次。然而,你也可以使用persist (或 cache)方法持久化(persist)一個 RDD 到記憶體中。在這個情況下,Spark 會在叢集上儲存相關的元素,在你下次查詢的時候會變得更快。在這裡也同樣支援持久化 RDD 到磁碟,或在多個節點間複製。
基礎 為了說明 RDD 基本知識,考慮下面的簡單程式:
  • val lines = sc.textFile("data.txt")
  • val lineLengths = lines.map(s => s.length)
  • val totalLength = lineLengths.reduce((a, b) => a + b)


第一行是定義來自於外部檔案的 RDD。這個資料集並沒有載入到記憶體或做其他的操作:lines 僅僅是一個指向檔案的指標。第二行是定義 lineLengths,它是 map 轉換(transformation)的結果。同樣,lineLengths由於懶惰模式也沒有立即計算。最後,我們執行 reduce,它是一個動作(action)。在這個地方,Spark 把計算分成多個任務(task),並且讓它們執行在多個機器上。每臺機器都執行自己的 map 部分和本地 reduce 部分。然後僅僅將結果返回給驅動程式。
如果我們想要再次使用 lineLengths,我們可以新增:
  • lineLengths.persist()

[color=rgb(51, 102, 153) !important]複製程式碼
在 reduce 之前,它會導致 lineLengths 在第一次計算完成之後儲存到記憶體中。
3.3.1 傳遞函式到 Spark Spark 的 API 很大程度上依靠在驅動程式裡傳遞函式到叢集上執行。這裡有兩種推薦的方式:
  • 匿名函式 (Anonymous function syntax),可以在比較短的程式碼中使用。
  • 全域性單例物件裡的靜態方法。例如,你可以定義 object MyFunctions 然後傳遞 MyFounctions.func1,像下面這樣:
  • object MyFunctions {
  •   def func1(s: String): String = { ... }
  • }
  • myRdd.map(MyFunctions.func1)

[color=rgb(51, 102, 153) !important]複製程式碼
注意,它可能傳遞的是一個類例項裡的一個方法引用(而不是一個單例物件),這裡必須傳送包含方法的整個物件。例如:
  • class MyClass {
  •   def func1(s: String): String = { ... }
  •   def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
  • }

[color=rgb(51, 102, 153) !important]複製程式碼
這裡,如果我們建立了一個 new MyClass 物件,並且呼叫它的 doStuff,map 裡面引用了這個 MyClass例項中的 func1 方法,所以這個物件必須傳送到叢集上。類似寫成 rdd.map(x => this.func1(x))。
以類似的方式,訪問外部物件的欄位將會引用整個物件:
  • class MyClass {
  •   val field = "Hello"
  •   def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
  • }

[color=rgb(51, 102, 153) !important]複製程式碼
相當於寫成 rdd.map(x => this.field + x),引用了整個 this 物件。為了避免這個問題,最簡單的方式是複製 field 到一個本地變數而不是從外部訪問它:
  • def doStuff(rdd: RDD[String]): RDD[String] = {
  •   val field_ = this.field
  •   rdd.map(x => field_ + x)
  • }

[color=rgb(51, 102, 153) !important]複製程式碼
3.3.2 使用鍵值對 雖然很多 Spark 操作工作在包含任意型別物件的 RDDs 上的,但是少數幾個特殊操作僅僅在鍵值(key-value)對 RDDs 上可用。最常見的是分散式 "shuffle" 操作,例如根據一個 key 對一組資料進行分組和聚合。 在 Scala 中,這些操作在包含二元組(Tuple2)(在語言的內建元組中,通過簡單的寫 (a, b) 建立) 的 RDD 上自動地變成可用的,只要在你的程式中匯入 org.apache.spark.SparkContext._ 來啟用 Spark 的隱式轉換。在 PairRDDFunctions 的類裡鍵值對操作是可以使用的,如果你匯入隱式轉換它會自動地包裝成元組 RDD。
例如,下面的程式碼在鍵值對上使用 reduceByKey 操作來統計在一個檔案裡每一行文字內容出現的次數:
  • val lines = sc.textFile("data.txt")
  • val pairs = lines.map(s => (s, 1))
  • val counts = pairs.reduceByKey((a, b) => a + b)

[color=rgb(51, 102, 153) !important]複製程式碼
我們也可以使用 counts.sortByKey(),例如,將鍵值對按照字母進行排序,最後 counts.collect() 把它們作為一個物件陣列帶回到驅動程式。 注意:當使用一個自定義物件作為 key 在使用鍵值對操作的時候,你需要確保自定義 equals() 方法和hashCode() 方法是匹配的。更加詳細的內容,檢視 Object.hashCode() 文件)中的契約概述。
3.3.3 Transformations 下面的表格列了 Sparkk 支援的一些常用 transformations。詳細內容請參閱 RDD API 文件(Scala, Java,Python) 和 PairRDDFunctions 文件(Scala, Java)。
Transformation Meaning
map(func) 返回一個新的分散式資料集,將資料來源的每一個元素傳遞給函式 func對映組成。
filter(func) 返回一個新的資料集,從資料來源中選中一些元素通過函式 func 返回 true。
flatMap(func) 類似於 map,但是每個輸入項能被對映成多個輸出項(所以 func 必須返回一個 Seq,而不是單個 item)。
mapPartitions(func) 類似於 map,但是分別執行在 RDD 的每個分割槽上,所以 func 的型別必須是 Iterator<T> => Iterator<U> 當執行在型別為 T 的 RDD 上。
mapPartitionsWithIndex(func) 類似於 mapPartitions,但是 func 需要提供一個 integer 值描述索引(index),所以 func 的型別必須是 (Int, Iterator) => Iterator 當執行在型別為 T 的 RDD 上。
sample(withReplacement, fraction, seed) 對資料進行取樣。
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks])) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
cogroup(otherDataset, [numTasks]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable, Iterable) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

3.3.4 Actions 下面的表格列了 Sparkk 支援的一些常用 actions。詳細內容請參閱 RDD API 文件(Scala, Java, Python) 和 PairRDDFunctions 文件(Scala, Java)。
Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala) Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that either implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala) Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.

3.4 RDD 持久化 Spark最重要的一個功能是它可以通過各種操作(operations)持久化(或者快取)一個集合到記憶體中。當你持久化一個RDD的時候,每一個節點都將參與計算的所有分割槽資料儲存到記憶體中,並且這些 資料可以被這個集合(以及這個集合衍生的其他集合)的動作(action)重複利用。這個能力使後續的動作速度更快(通常快10倍以上)。對應迭代演算法和快速的互動使用來說,快取是一個關鍵的工具。
你能通過persist()或者cache()方法持久化一個rdd。首先,在action中計算得到rdd;然後,將其儲存在每個節點的記憶體中。Spark的快取是一個容錯的技術-如果RDD的任何一個分割槽丟失,它 可以通過原有的轉換(transformations)操作自動的重複計算並且創建出這個分割槽。
此外,我們可以利用不同的儲存級別儲存每一個被持久化的RDD。例如,它允許我們持久化集合到磁碟上、將集合作為序列化的Java物件持久化到記憶體中、在節點間複製集合或者儲存集合到 Tachyon中。我們可以通過傳遞一個StorageLevel物件給persist()方法設定這些儲存級別。cache()方法使用了預設的儲存級別—StorageLevel.MEMORY_ONLY。完整的儲存級別介紹如下所示:
Storage Level Meaning
MEMORY_ONLY 將RDD作為非序列化的Java物件儲存在jvm中。如果RDD不適合存在記憶體中,一些分割槽將不會被快取,從而在每次需要這些分割槽時都需重新計算它們。這是系統預設的儲存級別。
MEMORY_AND_DISK 將RDD作為非序列化的Java物件儲存在jvm中。如果RDD不適合存在記憶體中,將這些不適合存在記憶體中的分割槽儲存在磁碟中,每次需要時讀出它們。
MEMORY_ONLY_SER 將RDD作為序列化的Java物件儲存(每個分割槽一個byte陣列)。這種方式比非序列化方式更節省空間,特別是用到快速的序列化工具時,但是會更耗費cpu資源—密集的讀操作。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER類似,但不是在每次需要時重複計算這些不適合儲存到記憶體中的分割槽,而是將這些分割槽儲存到磁碟中。
DISK_ONLY 僅僅將RDD分割槽儲存到磁碟中
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面的儲存級別類似,但是複製每個分割槽到叢集的兩個節點上面
OFF_HEAP (experimental) 以序列化的格式儲存RDD到Tachyon中。相對於MEMORY_ONLY_SER,OFF_HEAP減少了垃圾回收的花費,允許更小的執行者共享記憶體池。這使其在擁有大量記憶體的環境下或者多併發應用程式的環境中具有更強的吸引力。
NOTE:在python中,儲存的物件都是通過Pickle庫序列化了的,所以是否選擇序列化等級並不重要。
Spark也會自動持久化一些shuffle操作(如reduceByKey)中的中間資料,即使使用者沒有呼叫persist方法。這樣的好處是避免了在shuffle出錯情況下,需要重複計算整個輸入。如果使用者計劃重用 計算過程中產生的RDD,我們仍然推薦使用者呼叫persist方法。
如何選擇儲存級別 Spark的多個儲存級別意味著在記憶體利用率和cpu利用效率間的不同權衡。我們推薦通過下面的過程選擇一個合適的儲存級別:
  • 如果你的RDD適合預設的儲存級別(MEMORY_ONLY),就選擇預設的儲存級別。因為這是cpu利用率最高的選項,會使RDD上的操作儘可能的快。
  • 如果不適合用預設的級別,選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提高物件的空間使用率,但是仍能夠相當快的訪問。
  • 除非函式計算RDD的花費較大或者它們需要過濾大量的資料,不要將RDD儲存到磁碟上,否則,重複計算一個分割槽就會和重磁碟上讀取資料一樣慢。
  • 如果你希望更快的錯誤恢復,可以利用重複(replicated)儲存級別。所有的儲存級別都可以通過重複計算丟失的資料來支援完整的容錯,但是重複的資料能夠使你在RDD上繼續執行任務,而不需要重複計算丟失的資料。
  • 在擁有大量記憶體的環境中或者多應用程式的環境中,OFF_HEAP具有如下優勢:
    • 它執行多個執行者共享Tachyon中相同的記憶體池
    • 它顯著地減少垃圾回收的花費
    • 如果單個的執行者崩潰,快取的資料不會丟失

刪除資料 Spark自動的監控每個節點快取的使用情況,利用最近最少使用原則刪除老舊的資料。如果你想手動的刪除RDD,可以使用RDD.unpersist()方法
4. 共享變數
一般情況下,當一個傳遞給Spark操作(例如map和reduce)的函式在遠端節點上面執行時,Spark操作實際上操作的是這個函式所用變數的一個獨立副本。這些變數被複制到每臺機器上,並且這些變數在遠端機器上 的所有更新都不會傳遞迴驅動程式。通常跨任務的讀寫變數是低效的,但是,Spark還是為兩種常見的使用模式提供了兩種有限的共享變數:廣播變數(broadcast variable)和累加器(accumulator)
廣播變數
廣播變數允許程式設計師快取一個只讀的變數在每臺機器上面,而不是每個任務儲存一份拷貝。例如,利用廣播變數,我們能夠以一種更有效率的方式將一個大資料量輸入集合的副本分配給每個節點。(Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.)Spark也嘗試著利用有效的廣播演算法去分配廣播變數,以減少通訊的成本。
一個廣播變數可以通過呼叫SparkContext.broadcast(v)方法從一個初始變數v中建立。廣播變數是v的一個包裝變數,它的值可以通過value方法訪問,下面的程式碼說明了這個過程:
  • scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
  • broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
  • scala> broadcastVar.value
  • res0: Array[Int] = Array(1, 2, 3)


廣播變數建立以後,我們就能夠在叢集的任何函式中使用它來代替變數v,這樣我們就不需要再次傳遞變數v到每個節點上。另外,為了保證所有的節點得到廣播變數具有相同的值,物件v不能在廣播之後被修改。
累加器 顧名思義,累加器是一種只能通過關聯操作進行“加”操作的變數,因此它能夠高效的應用於並行操作中。它們能夠用來實現counters和sums。Spark原生支援數值型別的累加器,開發者可以自己新增支援的型別。 如果建立了一個具名的累加器,它可以在spark的UI中顯示。這對於理解執行階段(running stages)的過程有很重要的作用。(注意:這在python中還不被支援)
一個累加器可以通過呼叫SparkContext.accumulator(v)方法從一個初始變數v中建立。執行在叢集上的任務可以通過add方法或者使用+=操作來給它加值。然而,它們無法讀取這個值。只有驅動程式可以使用value方法來讀取累加器的值。 如下的程式碼,展示瞭如何利用累加器將一個數組裡面的所有元素相加:
  • scala> val accum = sc.accumulator(0, "My Accumulator")
  • accum: spark.Accumulator[Int] = 0
  • scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  • ...
  • 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
  • scala> accum.value
  • res2: Int = 10


這個例子利用了內建的整數型別累加器。開發者可以利用子類AccumulatorParam建立自己的 累加器型別。AccumulatorParam介面有兩個方法:zero方法為你的資料型別提供一個“0 值”(zero value);addInPlace方法計算兩個值的和。例如,假設我們有一個Vector類代表數學上的向量,我們能夠 如下定義累加器:
  • object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  •   def zero(initialValue: Vector): Vector = {
  •     Vector.zeros(initialValue.size)
  •   }
  •   def addInPlace(v1: Vector, v2: Vector): Vector = {
  •     v1 += v2
  •   }
  • }
  • // Then, create an Accumulator of this type:
  • val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)


在scala中,Spark支援用更一般的Accumulable介面來累積資料-結果型別和用於累加的元素型別 不一樣(例如通過收集的元素建立一個列表)。Spark也支援用SparkContext.accumulableCollection方法累加一般的scala集合型別。
5. 從這裡開始
你能夠從spark官方網站檢視一些spark執行例子。另外,Spark的example目錄包含幾個Spark例子,你能夠通過如下方式執行Java或者scala例子:
  • ./bin/run-example SparkPi


為了優化你的專案, configuration和tuning指南提高了最佳 實踐的資訊。保證你儲存在記憶體中的資料是有效的格式是非常重要的事情。為了給部署操作提高幫助,叢集模式概述介紹了 包含分散式操作和支援叢集管理的元件。
最後,完整的API文件可以在後面連結scala,java, python中檢視。

相關推薦

Spark中文手冊-程式設計指南

概論 在高層中,每個 Spark 應用程式都由一個驅動程式(driver programe)構成,驅動程式在叢集上執行使用者的mian 函式來執行各種各樣的並行操作(parallel operations)。Spark 的主要抽象是提供一個彈性分散式資料集(RDD),RD

轉載:Spark中文指南(入門篇)-Spark程式設計模型(一)

原文:https://www.cnblogs.com/miqi1992/p/5621268.html 前言   本章將對Spark做一個簡單的介紹,更多教程請參考: Spark教程 本章知識點概括 Apache Spark簡介 Spark的四種執行模式 Spark基於

springcloud教程,springcloud指南,springcloud中文手冊,springcloud項目實戰,springcloud源碼分享

sdn img fonts div 部署 j2ee mark 交流 klist Spring Cloud是一系列框架的有序集合。它利用Spring Boot的開發便利性巧妙地簡化了分布式系統基礎設施的開發,如服務發現註冊、配置中心、消息總線、負載均衡、斷路器、數據監控等,都

spark RDD官網RDD程式設計指南

http://spark.apache.org/docs/latest/rdd-programming-guide.html#using-the-shell Overview(概述) 在較高的層次上,每個Spark應用程式都包含一個驅動程式,該程式執行使用者的主要功能並在叢集上執行各

Spark官方文件》Spark Streaming程式設計指南

spark-1.6.1 [原文地址] Spark Streaming程式設計指南 概覽 Spark Streaming是對核心Spark API的一個擴充套件,它能夠實現對實時資料流的流式處理,並具有很好的可擴充套件性、高吞吐量和容錯性。Spark Streaming支援從多種資料來源提取資

Spark 官方文件》Spark程式設計指南

概述 總體上來說,每個Spark應用都包含一個驅動器(driver)程式,驅動器執行使用者的main函式,並在叢集上執行各種並行操作。 Spark最重要的一個抽象概念就是彈性分散式資料集(resilient distributed dataset – RDD),RDD是一個可分割槽的元素集合,其包含的元素可

Spark 官方文件》Spark SQL, DataFrames 以及 Datasets 程式設計指南

spark-1.6.0 [原文地址] Spark SQL, DataFrames 以及 Datasets 程式設計指南 概要 Spark SQL是Spark中處理結構化資料的模組。與基礎的Spark RDD API不同,Spark SQL的介面提供了更多關於資料的結構資訊和計算任務的執

Spark程式設計指南之三:RDD基本概念

RDD是什麼? RDD(Resilient Distributed Dataset),彈性分散式資料集,是Spark的核心資料結構抽象。 它是彈性的,具有容錯能力,能夠重新計算失敗結點。 它是分散式的,資料分佈在多個結點上。 它是一個數據集,可以從外部載入資料,可以是文字檔案,JSON,

Spark程式設計指南之二:向Spark運算元傳遞函式

文章目錄 向Spark運算元傳遞函式 Java的兩種方法 匿名內部類 建立類實現Function介面 Scala的兩種方法 傳遞匿名函式 定義全域性單例物件中的靜態方法

Spark程式設計指南之一:transformation和action等RDD基本操作

文章目錄 基本概念 開發環境 程式設計實戰 初始化SparkContext RDD的生成 RDD基本操作 Key-Value Pairs Transformations f

Spark程式設計指南入門之Java篇一-基本知識

1. Spark的Java開發包 Spark提供Java的開發包,當前最新版本是2.0.2版本:spark-core_2.11-2.0.2.jar,可以從下面連結下載: http://central.maven.org/maven2/org/apache/spark/spa

Spark 2.4.0程式設計指南--Spark DataSources

Spark 2.4.0程式設計指南–Spark DataSources 更多資源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 視訊 Spark 2.4.0程式設計指

Spark 2.4.0程式設計指南--Spark SQL UDF和UDAF

Spark 2.4.0程式設計指南–Spark SQL UDF和UDAF 更多資源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 視訊 Spark 2.4.0程

Spark 2.4.0程式設計指南--spark dataSet action

Spark 2.4.0程式設計指南–spark dataSet action 更多資源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 視訊 Spark 2.4.

Spark 2.4.0 程式設計指南--快速入門

Spark 2.4.0 程式設計指南–快速入門 更多資源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 視訊 Spark 2.4.0 程式設計指南–快速入門(b

免費下載獲取Odoo中文實施 應用 指南 手冊

引言 Odoo,以前叫OpenERP,是比利時Odoo S.A.公司開發的一個企業應用軟體套件,開源套件包括一個企業應用快速開發平臺,以及幾千個Odoo及第三方開發的企業應用模組。Odoo適用於各種規模的企業應用。 Odoo功能模組涵蓋了各方面的企業應用:CRM、訂單處理(銷售訂單和採購訂單)、電子商務、

Spark2.1.0文件:Spark程式設計指南-Spark Programming Guide

1 概述 從一個較高的層次來看,每一個 Spark 應用程式由兩部分組成:driver program(驅動程式)端執行的 main 函式以及在整個叢集中被執行的各種並行操作。Spark 提供的主要抽象是一個彈性分散式資料集(RDD),它是可以被並行處理且跨節點分佈的元素的

Spark程式設計指南之四:Spark分散式叢集模式的執行時系統架構

文章目錄 官方叢集模式介紹 Cluster Manager有哪些? Standalone Apache Mesos Hadoop YARN Kubernetes Standalone模

區塊鏈教程、區塊鏈指南、區塊鏈中文手冊、區塊鏈原理

區塊鏈的技術有望以一個更低的成本解決更廣泛的信任問題所以被受人關注。 維基說了那麼長一串還是沒有說清楚區塊鏈到底是什麼,又有什麼用。我在一篇36Kr的文章《未來十年,Blockchain會如何網際網路世界?》中看到了比較好的解釋。為什麼說“區塊鏈不僅只是改變了貨幣在網際

pyspark-Spark程式設計指南

參考: 1、http://spark.apache.org/docs/latest/rdd-programming-guide.html 2、https://github.com/apache/spark/tree/v2.2.0 Spark程式設計指南 連線Spark