RDD Programming Guide - Scala (translated from the official documentation)

Overview

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.

Linking with Spark

Spark 2.4.0 is built and distributed to work with Scala 2.11 by default. (Spark can be built to work with other versions of Scala, too.) To write applications in Scala, you will need to use a compatible Scala version (e.g. 2.11.X).

To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.4.0
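
If you build with sbt instead of Maven, the equivalent coordinates might look like the sketch below for build.sbt (the provided scope is an assumption that the Spark jars are supplied by spark-submit at runtime):

// build.sbt (sketch): %% appends the Scala binary version (2.11) automatically
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0" % "provided"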

In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS.

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

Finally, you need to import some Spark classes into your program. Add the following lines:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(Before Spark 1.3.0, you need to explicitly import org.apache.spark.SparkContext._ to enable essential implicit conversions.)

Initializing Spark

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.

Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.
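
Putting these pieces together, a minimal self-contained driver might look like the following sketch (the object name SimpleApp and the local[*] master are illustrative only; on a real cluster the master normally comes from spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    // local[*] runs Spark in-process using all local cores; suitable for tests only.
    val conf = new SparkConf().setAppName("SimpleApp").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 100)
      println("Sum: " + rdd.reduce(_ + _))
    } finally {
      sc.stop() // only one SparkContext may be active per JVM
    }
  }
}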

Using the Shell

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add JARs to the classpath by passing a comma-separated list to the --jars argument. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the --packages argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the --repositories argument. For example, to run bin/spark-shell on exactly four cores, use:

$ ./bin/spark-shell --master local[4]

Or, to also add code.jar to its classpath, use:

$ ./bin/spark-shell --master local[4] --jars code.jar

To include a dependency using Maven coordinates:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

For a complete list of options, run spark-shell --help. Behind the scenes, spark-shell invokes the more general spark-submit script.

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized Collections

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we might call distData.reduce((a, b) => a + b) to add up the elements of the array. We describe operations on distributed datasets later on.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
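
For instance, a minimal sketch that requests a specific number of partitions and checks the result:

val data = Array(1, 2, 3, 4, 5)
// Ask for 10 partitions explicitly instead of the cluster default.
val distData = sc.parallelize(data, 10)
println(distData.getNumPartitions) // => 10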

External Datasets

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc URI) and reads it as a collection of lines. Here is an example invocation:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: distFile.map(s => s.length).reduce((a, b) => a + b).

Some notes on reading files with Spark:

If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
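
For example, a small sketch that asks for more input partitions than the block-based default:

// Request at least 10 partitions when reading the file (the minPartitions argument).
val distFile = sc.textFile("data.txt", 10)
println(distFile.getNumPartitions)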

Apart from text files, Spark’s Scala API also supports several other data formats:

SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, wholeTextFiles provides an optional second argument for controlling the minimal number of partitions.

For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file. These should be subclasses of Hadoop’s Writable interface, like IntWritable and Text. In addition, Spark allows you to specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.

For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the “new” MapReduce API (org.apache.hadoop.mapreduce).

RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
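
To make the format notes above concrete, here is a small combined sketch; the paths are purely illustrative, and the SequenceFile is assumed to contain IntWritable/Text records:

// (filename, content) pairs for every small file under a directory
val pages = sc.wholeTextFiles("hdfs:///data/pages")

// SequenceFile of (IntWritable, Text), read back as native Int/String
val seq = sc.sequenceFile[Int, String]("hdfs:///data/seq")

// Round-trip an RDD through Java serialization
seq.saveAsObjectFile("hdfs:///tmp/seq-objects")
val restored = sc.objectFile[(Int, String)]("hdfs:///tmp/seq-objects")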

RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.

Basics

To illustrate RDD basics, consider the simple program below:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.

If we also wanted to use lineLengths again later, we could add:

lineLengths.persist()

before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed.

Passing Functions to Spark

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:

Anonymous function syntax, which can be used for short pieces of code.

Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:

object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

Here, if we create a new MyClass instance and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).

In a similar way, accessing fields of the outer object will reference the whole object:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

is equivalent to writing rdd.map(x => this.field + x), which references all of this. To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

Understanding closures

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

Example

Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in local mode (--master = local[n]) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

Local vs. cluster modes

The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.

The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.

In local mode, in some circumstances, the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.

To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
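
For the example above, a safe version might look like this sketch, using the built-in longAccumulator available in Spark 2.x:

val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5)) // sample data, for illustration only

// Accumulators are designed to be updated from tasks running on executors.
val counter = sc.longAccumulator("counter")
rdd.foreach(x => counter.add(x))

println("Counter value: " + counter.value) // read back on the driver => 15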

In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.

Printing elements of an RDD

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).

Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key.

In Scala, these operations are automatically available on RDDs containing Tuple2 objects (the built-in tuples in the language, created by simply writing (a, b)). The key-value pair operations are available in the PairRDDFunctions class, which automatically wraps around an RDD of tuples.

For example, the following code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

We could also use counts.sortByKey(), for example, to sort the pairs alphabetically, and finally counts.collect() to bring them back to the driver program as an array of objects.
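
A short sketch of those two calls on the counts RDD from above:

val sortedCounts = counts.sortByKey()   // sort the (line, count) pairs alphabetically by key
sortedCounts.collect().foreach(println) // bring the (assumed small) result back to the driver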

Note: when using custom objects as the key in key-value pair operations, you must be sure that a custom equals() method is accompanied with a matching hashCode() method. For full details, see the contract outlined in the Object.hashCode() documentation.
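
In Scala, one easy way to satisfy that contract is to use a case class as the key, since case classes get consistent equals() and hashCode() automatically. An illustrative sketch (Endpoint is a made-up type):

case class Endpoint(host: String, port: Int) // structural equals/hashCode for free

val requests = sc.parallelize(Seq(
  (Endpoint("a.example.com", 443), 1),
  (Endpoint("a.example.com", 443), 1),
  (Endpoint("b.example.com", 80), 1)))

val hits = requests.reduceByKey(_ + _) // keys compare by value, so grouping works as expected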

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numPartitions]) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
reduceByKey(func, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable[V], Iterable[W])) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala) Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala) Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

The Spark RDD API also exposes asynchronous versions of some actions, like foreachAsync for foreach, which immediately return a FutureAction to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.
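
A brief sketch of that asynchronous pattern (FutureAction extends scala.concurrent.Future, so the usual Future tools apply):

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val rdd = sc.parallelize(1 to 1000)
val future = rdd.countAsync()              // returns a FutureAction[Long] immediately
// ... the driver can do other work here while the job runs ...
val n = Await.result(future, Duration.Inf) // block only when the result is actually needed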

Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

Background

To understand what happens during the shuffle we can consider the example of the reduceByKey operation. The reduceByKey operation generates a new RDD where all values for a single key are combined into a tuple - the key and the result of executing a reduce function against all values associated with that key. The challenge is that not all values for a single key necessarily reside on the same partition, or even the same machine, but they must be co-located to compute the result.

In Spark, data is generally not distributed across partitions to be in the necessary place for a specific operation. During computations, a single task will operate on a single partition - thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.

Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it’s possible to use:

mapPartitions to sort each partition using, for example, .sorted

repartitionAndSortWithinPartitions to efficiently sort partitions while simultaneously repartitioning (see the sketch below)

sortBy to make a globally ordered RDD
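
For example, the second option might look like this sketch for an RDD of key-value pairs (the partition count of 8 is arbitrary):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
// Repartition and sort by key inside each partition in a single shuffle.
val arranged = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(8))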

Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Performance Impact

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.

Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory since they employ in-memory data structures to organize records before or after transferring them. Specifically, reduceByKey and aggregateByKey create these structures on the map side, and 'ByKey operations generate these on the reduce side. When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so the shuffle files don’t need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.

Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the ‘Shuffle Behavior’ section within the Spark Configuration Guide.

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:

Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
MEMORY_ONLY_SER (Java and Scala) Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java and Scala) Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.
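
For example, to request one of the serialized levels from the table above explicitly, a minimal sketch:

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("data.txt")
// Keep the data serialized in memory, spilling partitions to disk if they don't fit.
lines.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(lines.count()) // the first action materializes and caches the RDD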

Note: In Python, stored objects will always be serialized with the Pickle library, so it does not matter whether you choose a serialized level. The available storage levels in Python include MEMORY_ONLY, MEMORY_ONLY_2, MEMORY_AND_DISK, MEMORY_AND_DISK_2, DISK_ONLY, and DISK_ONLY_2.

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

Which Storage Level to Choose?

Spark’s storage levels are meant to provide different trade-offs between memory usage and CPU efficiency. We recommend going through the following process to select one:

If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

If not, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.

Removing Data

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.

Shared Variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
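
For instance, a broadcast lookup table can be referenced from inside a transformation through its value method. A sketch (the table contents are made up):

val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2)) // shipped to each node at most once
val ids = sc.parallelize(Seq("a", "b", "a", "c"))
// Tasks read lookup.value instead of capturing the Map from the driver directly.
val resolved = ids.map(id => lookup.value.getOrElse(id, -1))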

Accumulators

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.

As a user, you can create named or unnamed accumulators. As seen in the image below, a named accumulator (in this instance counter) will display in the web UI for the stage that modifies that accumulator. Spark displays the value for each accumulator modified by a task in the “Tasks” table.

(Figure: a named accumulator, in this case "counter", as displayed in the web UI for the stage that modifies it.)

Tracking accumulators in the UI can be useful for understanding the progress of running stages (NOTE: this is not yet supported in Python).
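
A short sketch of a named accumulator, using the built-in longAccumulator from Spark 2.x:

// The name makes the accumulator visible in the web UI for the stages that update it.
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value) // => 10, read on the driver after the action completes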