Spark Official Documentation Translation: Spark Programming Guide (Part 1)


At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.


A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
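A minimal sketch of both kinds of shared variables follows. It assumes Spark 2.x (where `sc.longAccumulator` is available) and a local master; the object name, the country-code lookup table and the accumulator name are purely illustrative. Tasks read the broadcast map through `.value`, and they only ever add to the accumulator, whose total the driver then reads.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SharedVariablesDemo {
  // Reuse an active SparkContext (e.g. `sc` in spark-shell) or start a local one.
  def context(): SparkContext =
    SparkContext.getOrCreate(
      new SparkConf().setAppName("SharedVariablesDemo").setMaster("local[2]"))

  /** Returns the resolved names and how many codes were missing from the lookup table. */
  def run(): (Seq[String], Long) = {
    val sc = context()
    // Broadcast variable: a read-only lookup table cached once per node
    // instead of being shipped with every task.
    val countryNames = sc.broadcast(Map("DE" -> "Germany", "FR" -> "France"))
    // Accumulator: tasks can only add to it; the driver reads the total.
    val unknownCodes = sc.longAccumulator("unknownCodes")

    val names = sc.parallelize(Seq("DE", "FR", "XX"), 2).map { code =>
      countryNames.value.get(code) match {
        case Some(name) => name
        case None =>
          unknownCodes.add(1)
          "unknown"
      }
    }.collect().toSeq // collect() is the action that actually runs the tasks

    (names, unknownCodes.sum)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because the accumulator here is updated inside `map`, a transformation, Spark could apply an update more than once if a task were re-executed; for exact counts, accumulators are best updated inside actions such as `foreach`.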


This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow along if you launch Spark’s interactive shell: either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.

Linking with Spark

Spark 2.2.0 is built and distributed to work with Scala 2.11 by default. (Spark can be built to work with other versions of Scala, too.) To write applications in Scala, you will need to use a compatible Scala version (e.g. 2.11.X).


To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0

In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS.


groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
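For projects built with sbt rather than Maven, the same coordinates can be declared roughly as follows. This is a sketch that assumes Scala 2.11.8 (any 2.11.x release should match the `_2.11` artifact), and `<your-hdfs-version>` is kept as the placeholder from above.

```scala
// build.sbt (sketch): sbt equivalent of the Maven coordinates above.
scalaVersion := "2.11.8" // Spark 2.2.0 is built for Scala 2.11; any 2.11.x should do

// %% appends the Scala binary version, so this resolves to spark-core_2.11
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"

// Only needed to access HDFS; replace the placeholder with your cluster's HDFS version.
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "<your-hdfs-version>"
```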

Finally, you need to import some Spark classes into your program. Add the following lines:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(Before Spark 1.3.0, you needed to explicitly import org.apache.spark.SparkContext._ to enable essential implicit conversions.)

Initializing Spark

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.


Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.


// appName and master are placeholders for your application name and master URL
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.
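A minimal, self-contained application along these lines might look like the following sketch. The object name and the even-number job are hypothetical; the master defaults to `local[2]` for local testing, and the context is stopped in a `finally` block because only one SparkContext may be active per JVM.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MinimalApp {
  // Builds a SparkContext, runs one small job and stops the context again.
  // "local[2]" runs Spark in-process with two worker threads.
  def run(master: String = "local[2]"): Long = {
    val conf = new SparkConf().setAppName("MinimalApp").setMaster(master)
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 1000).filter(_ % 2 == 0).count()
    } finally {
      sc.stop() // only one SparkContext may be active per JVM
    }
  }

  def main(args: Array[String]): Unit = println(s"even numbers: ${run()}")
}
```

On a real cluster you would typically not call `setMaster` at all and pass `--master` to spark-submit instead, since values set directly on a SparkConf take precedence over spark-submit flags.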


Using the Shell

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add JARs to the classpath by passing a comma-separated list to the --jars argument. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the --packages argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the --repositories argument. For example, to run bin/spark-shell on exactly four cores, use:

$ ./bin/spark-shell --master local[4]

Or, to also add code.jar to its classpath, use:


$ ./bin/spark-shell --master local[4] --jars code.jar

To include a dependency using Maven coordinates:


$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

For a complete list of options, run spark-shell --help. Behind the scenes, spark-shell invokes the more general spark-submit script.

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.


Parallelized Collections

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:


val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we might call distData.reduce((a, b) => a + b) to add up the elements of the array. We describe operations on distributed datasets later on.
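The snippet above, made self-contained, could look like this sketch. It reuses an active SparkContext (such as the shell's `sc`) via `SparkContext.getOrCreate`, or starts a local one; the object name is hypothetical. The function passed to `reduce` should be commutative and associative, because Spark combines the partial results of different partitions in no fixed order.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeDemo {
  // Reuse an active SparkContext (e.g. `sc` in spark-shell) or start a local one.
  def context(): SparkContext =
    SparkContext.getOrCreate(
      new SparkConf().setAppName("ParallelizeDemo").setMaster("local[2]"))

  /** Returns (sum, maximum) of the parallelized array. */
  def run(): (Int, Int) = {
    val sc = context()
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)          // elements are copied into an RDD[Int]
    val sum = distData.reduce((a, b) => a + b)   // action: the result is returned to the driver
    val max = distData.reduce((a, b) => math.max(a, b))
    (sum, max)
  }
}
```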

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark runs one task for each partition. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
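To see the partitioning at work, the following sketch asks for 10 partitions explicitly and uses `glom()`, which turns each partition into an array, to inspect how the elements were distributed. The data (1 to 100) and the object name are illustrative assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionsDemo {
  // Reuse an active SparkContext (e.g. `sc` in spark-shell) or start a local one.
  def context(): SparkContext =
    SparkContext.getOrCreate(
      new SparkConf().setAppName("PartitionsDemo").setMaster("local[2]"))

  /** Returns (number of partitions, number of elements in each partition). */
  def run(): (Int, Seq[Int]) = {
    val sc = context()
    val distData = sc.parallelize(1 to 100, 10)  // second argument: number of partitions
    // glom() exposes each partition as an Array, so we can count its elements
    val sizes = distData.glom().map(_.length).collect().toSeq
    (distData.getNumPartitions, sizes)
  }
}
```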

External Datasets

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3n://, etc. URI) and reads it as a collection of lines. Here is an example invocation:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: distFile.map(s => s.length).reduce((a, b) => a + b).
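A self-contained version of the `textFile` example, as a sketch: since `data.txt` is only a placeholder in the text, it first writes a small temporary file and passes its `file://` URI to `textFile`. The file contents and the object name are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object TextFileDemo {
  // Reuse an active SparkContext (e.g. `sc` in spark-shell) or start a local one.
  def context(): SparkContext =
    SparkContext.getOrCreate(
      new SparkConf().setAppName("TextFileDemo").setMaster("local[2]"))

  /** Returns (number of lines, total characters over all lines). */
  def run(): (Long, Int) = {
    val sc = context()
    // Hypothetical input: two lines written to a temporary local file.
    val path = Files.createTempFile("data", ".txt")
    Files.write(path, "hello\nspark rdd\n".getBytes(StandardCharsets.UTF_8))

    val distFile = sc.textFile(path.toUri.toString) // RDD[String], one element per line
    val lines = distFile.count()
    // Line terminators are not part of the elements, so they are not counted here.
    val totalLength = distFile.map(s => s.length).reduce((a, b) => a + b)
    (lines, totalLength)
  }
}
```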

Some notes on reading files with Spark:

  1. If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

  2. All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

  3. The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
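These notes can be checked with a small local sketch. It creates a hypothetical temporary directory holding two plain-text files and one gzip-compressed file, then reads it as a whole directory, through a `*.txt` wildcard, through a `*.gz` wildcard, and with a requested minimum of 4 partitions. File names, contents and the object name are assumptions made for illustration.

```scala
import java.io.FileOutputStream
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.zip.GZIPOutputStream
import org.apache.spark.{SparkConf, SparkContext}

object FileInputsDemo {
  // Reuse an active SparkContext (e.g. `sc` in spark-shell) or start a local one.
  def context(): SparkContext =
    SparkContext.getOrCreate(
      new SparkConf().setAppName("FileInputsDemo").setMaster("local[2]"))

  private def writeText(file: Path, text: String): Unit = {
    Files.write(file, text.getBytes(StandardCharsets.UTF_8))
  }

  /** Returns the number of lines read for each way of addressing the input. */
  def run(): Map[String, Long] = {
    val sc = context()
    val dir = Files.createTempDirectory("my-directory")
    writeText(dir.resolve("a.txt"), "a1\na2\n")
    writeText(dir.resolve("b.txt"), "b1\n")
    // A gzip-compressed file; Hadoop picks the decompression codec from the .gz extension.
    val gz = new GZIPOutputStream(new FileOutputStream(dir.resolve("c.gz").toFile))
    try gz.write("c1\nc2\nc3\n".getBytes(StandardCharsets.UTF_8)) finally gz.close()

    val base = dir.toUri.toString.stripSuffix("/")
    Map(
      "directory" -> sc.textFile(base).count(),           // all three files: 6 lines
      "wildcard" -> sc.textFile(s"$base/*.txt").count(),  // a.txt and b.txt: 3 lines
      "gzip" -> sc.textFile(s"$base/*.gz").count(),       // decompressed c.gz: 3 lines
      // The second argument requests a minimum number of partitions;
      // it changes how the input is split, not which lines are read.
      "minPartitions" -> sc.textFile(base, 4).count()
    )
  }
}
```

In local mode the local-path caveat above does not arise; on a cluster, the same directory would have to be visible at the same path on every worker.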

Apart from text files, Spark’s Scala API also supports several other data formats:

  1. SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, wholeTextFiles provides an optional second argument for controlling the minimal number of partitions.

  2. For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file. These should be subclasses of Hadoop’s Writable interface, like IntWritable and Text. In addition, Spark allows you to specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.

  3. For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the “new” MapReduce API (org.apache.hadoop.mapreduce).

  4. RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
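The four options in this list can be exercised together in the following sketch, which writes small temporary inputs so that it is self-contained. `TextInputFormat` merely stands in for an arbitrary Hadoop InputFormat, and the object name, the `Reading` case class and all file contents are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object OtherFormatsDemo {
  // Any Java-serializable type works with saveAsObjectFile; case classes are serializable.
  case class Reading(sensor: String, value: Double)

  // Reuse an active SparkContext (e.g. `sc` in spark-shell) or start a local one.
  def context(): SparkContext =
    SparkContext.getOrCreate(
      new SparkConf().setAppName("OtherFormatsDemo").setMaster("local[2]"))

  // Fresh output location; the saveAs* methods require that it does not exist yet.
  private def scratch(name: String): String =
    Files.createTempDirectory("spark-formats").resolve(name).toUri.toString

  /** wholeTextFiles: one (path, whole content) pair per file, keyed here by file name. */
  def wholeFiles(): Map[String, String] = {
    val dir = Files.createTempDirectory("small-files")
    Files.write(dir.resolve("a.txt"), "one\ntwo\n".getBytes(StandardCharsets.UTF_8))
    Files.write(dir.resolve("b.txt"), "three\n".getBytes(StandardCharsets.UTF_8))
    context().wholeTextFiles(dir.toUri.toString, 2) // 2 = minimum number of partitions
      .map { case (path, content) => (path.substring(path.lastIndexOf('/') + 1), content) }
      .collect().toMap
  }

  /** sequenceFile[Int, String]: IntWritable and Text are converted to native types. */
  def sequenceFiles(): Seq[(Int, String)] = {
    val sc = context()
    val out = scratch("numbers")
    sc.parallelize(Seq(1 -> "one", 2 -> "two", 3 -> "three"), 2).saveAsSequenceFile(out)
    sc.sequenceFile[Int, String](out).collect().sortBy(_._1).toSeq
  }

  /** hadoopRDD (old mapred API) and newAPIHadoopRDD (new mapreduce API). */
  def hadoopFormats(): (Seq[String], Seq[String]) = {
    val sc = context()
    val file = Files.createTempFile("input", ".txt")
    Files.write(file, "alpha\nbeta\n".getBytes(StandardCharsets.UTF_8))
    val input = new Path(file.toUri.toString)

    // Old API: configure a JobConf the same way as for a Hadoop job.
    val jobConf = new JobConf(sc.hadoopConfiguration)
    org.apache.hadoop.mapred.FileInputFormat.setInputPaths(jobConf, input)
    val oldApi = sc.hadoopRDD(jobConf, classOf[org.apache.hadoop.mapred.TextInputFormat],
      classOf[LongWritable], classOf[Text])

    // New API: configure a Job and pass its Configuration.
    val job = Job.getInstance(sc.hadoopConfiguration)
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, input)
    val newApi = sc.newAPIHadoopRDD(job.getConfiguration,
      classOf[org.apache.hadoop.mapreduce.lib.input.TextInputFormat],
      classOf[LongWritable], classOf[Text])

    // Hadoop reuses Writable instances, so copy each value into a String before collecting.
    (oldApi.map(_._2.toString).collect().toSeq, newApi.map(_._2.toString).collect().toSeq)
  }

  /** saveAsObjectFile / objectFile: elements stored with plain Java serialization. */
  def objectFiles(): Seq[Reading] = {
    val sc = context()
    val out = scratch("readings")
    sc.parallelize(Seq(Reading("a", 1.5), Reading("b", 2.0))).saveAsObjectFile(out)
    sc.objectFile[Reading](out).collect().sortBy(_.sensor).toSeq
  }
}
```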


RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
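A sketch contrasting the two kinds of operations, with made-up input words and a hypothetical object name: `map` and `reduceByKey` return new RDDs, while `reduce` and `collect` are actions that hand plain values back to the driver.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object OperationsDemo {
  // Reuse an active SparkContext (e.g. `sc` in spark-shell) or start a local one.
  def context(): SparkContext =
    SparkContext.getOrCreate(
      new SparkConf().setAppName("OperationsDemo").setMaster("local[2]"))

  /** Returns (total word length, word counts sorted by word). */
  def run(): (Int, Seq[(String, Int)]) = {
    val sc = context()
    val words: RDD[String] = sc.parallelize(Seq("spark", "rdd", "spark", "driver"))

    // Transformation: describes a new RDD; nothing is computed yet.
    val lengths: RDD[Int] = words.map(w => w.length)
    // Action: aggregates on the cluster and returns a plain Int to the driver.
    val totalLength: Int = lengths.reduce((a, b) => a + b)

    // reduceByKey works on key-value RDDs and returns another (distributed) RDD;
    // collect() is the action that brings the result back.
    val counts: RDD[(String, Int)] = words.map(w => (w, 1)).reduceByKey((a, b) => a + b)
    (totalLength, counts.collect().sortBy(_._1).toSeq)
  }
}
```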


All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
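Laziness can be observed with an accumulator that counts how often the `map` function runs, as in this sketch (the names are illustrative). Reading the accumulator before any action gives 0; only the `reduce` action triggers the computation.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazinessDemo {
  // Reuse an active SparkContext (e.g. `sc` in spark-shell) or start a local one.
  def context(): SparkContext =
    SparkContext.getOrCreate(
      new SparkConf().setAppName("LazinessDemo").setMaster("local[2]"))

  /** Returns (map calls before the action, map calls after it, reduced result). */
  def run(): (Long, Long, Int) = {
    val sc = context()
    val mapCalls = sc.longAccumulator("mapCalls")
    // Defining the transformation only records it in the RDD's lineage.
    val doubled = sc.parallelize(1 to 5).map { x => mapCalls.add(1); x * 2 }
    val before = mapCalls.sum          // 0: map has not run yet
    val total = doubled.reduce(_ + _)  // the action triggers the computation
    val after = mapCalls.sum           // one map call per element
    (before, after, total)
  }
}
```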


By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
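The effect of persisting can be measured the same way. This sketch (hypothetical names, local master) runs two actions over one mapped RDD and counts how many times the map function executed, with and without persisting the RDD in memory. With `StorageLevel.MEMORY_ONLY`, which is what `cache()` uses, the second action should read the cached partitions instead of recomputing them.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object PersistDemo {
  // Reuse an active SparkContext (e.g. `sc` in spark-shell) or start a local one.
  def context(): SparkContext =
    SparkContext.getOrCreate(
      new SparkConf().setAppName("PersistDemo").setMaster("local[2]"))

  /** Runs two actions on the same mapped RDD and returns how often `map` was evaluated. */
  def mapCalls(cached: Boolean): Long = {
    val sc = context()
    val calls = sc.longAccumulator("mapCalls")
    val squares = sc.parallelize(1 to 100, 4).map { x => calls.add(1); x * x }
    if (cached) squares.persist(StorageLevel.MEMORY_ONLY) // equivalent to squares.cache()
    squares.count() // first action: computes (and, if persisted, stores) the partitions
    squares.sum()   // second action: recomputes unless the partitions were cached
    squares.unpersist()
    calls.sum
  }
}
```

Other storage levels, such as `StorageLevel.MEMORY_AND_DISK` or the replicated `StorageLevel.MEMORY_ONLY_2`, correspond to the disk and replication options mentioned above.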
