
Spark Official Documentation Translation: Spark Programming Guide (Part 1)

Overview

At a high level, every Spark application consists of a driver program that runs the user’s main function and executes various parallel operations on a cluster. The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to persist an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

At a high level, every Spark application consists of a driver program that runs the user's main function and executes various parallel operations on a cluster. The most important abstraction Spark provides is the resilient distributed dataset (RDD), a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs can be created from a file in the Hadoop file system (or any other Hadoop-supported file system) or from an existing Scala collection in the driver program. Users can also ask Spark to persist an RDD in memory, allowing the dataset to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.

A second abstraction in Spark is shared variables that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.

The second abstraction Spark provides is shared variables, which can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to every task. Sometimes a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only "added" to, such as counters and sums.
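For example, a minimal sketch in Scala, assuming sc is an existing SparkContext (such as the one provided by spark-shell); the names lookup and errorCount are purely illustrative:

// Broadcast a small lookup table: cached read-only in memory on every node
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2))
// Accumulator: tasks may only add to it, the driver reads the aggregated result
val errorCount = sc.longAccumulator("errorCount")

sc.parallelize(Seq("a", "b", "c")).foreach { key =>
  if (!lookup.value.contains(key)) errorCount.add(1)
}
println(errorCount.value)  // 1, since only "c" is missing from the broadcast map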

This guide shows each of these features in each of Spark’s supported languages. It is easiest to follow along with if you launch Spark’s interactive shell – either bin/spark-shell for the Scala shell or bin/pyspark for the Python one.

This guide shows each of these features in each of Spark's supported languages. It is easiest to follow along if you launch Spark's interactive shell: bin/spark-shell for the Scala shell, or bin/pyspark for the Python one.

Linking with Spark

Spark 2.2.0 is built and distributed to work with Scala 2.11 by default. (Spark can be built to work with other versions of Scala, too.) To write applications in Scala, you will need to use a compatible Scala version (e.g. 2.11.X).

Spark 2.2.0 is built and distributed to work with Scala 2.11 by default (it can also be built against other Scala versions). To write applications in Scala, you need to use a compatible Scala version (e.g. 2.11.x).

To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:
To write a Spark application, you need to add a Maven dependency on Spark, which is available from Maven Central:

groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0
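In an sbt build, for example, the same coordinates would be written roughly as follows (a sketch; the Scala patch version is illustrative, and %% appends the Scala binary version, yielding spark-core_2.11):

// build.sbt (illustrative)
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"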

In addition, if you wish to access an HDFS cluster, you need to add a dependency on hadoop-client for your version of HDFS.

In addition, if you want to access an HDFS cluster, you need to add a hadoop-client dependency matching your version of HDFS:

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

Finally, you need to import some Spark classes into your program. Add the following lines:
Finally, you need to import some Spark classes into your program, as follows:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(Before Spark 1.3.0, you need to explicitly import org.apache.spark.SparkContext._ to enable essential implicit conversions.)

(Before Spark 1.3.0, you needed to explicitly import org.apache.spark.SparkContext._ to enable essential implicit conversions.)

Initializing Spark

The first thing a Spark program must do is to create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object that contains information about your application.

The first thing a Spark program must do is create a SparkContext object, which tells Spark how to access a cluster. To create a SparkContext you first need to build a SparkConf object, which contains information about your application.

Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one.

Only one SparkContext may be active per JVM; if you want to create a new one, you must first call stop() on the active SparkContext.

val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.

The appName parameter is the name your application will show in the cluster UI. master is a Spark, Mesos or YARN cluster URL, or the special string "local" to run in local mode. In practice, when running on a cluster you will not want to hardcode master in the program; instead, launch the application with spark-submit and receive it there. For local testing and unit tests, however, you can pass "local" to run Spark in-process.
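A minimal sketch of this pattern for local testing, assuming the imports shown above; the application name "MyTestApp" and the local[2] master are illustrative choices:

val conf = new SparkConf().setAppName("MyTestApp").setMaster("local[2]")
val sc = new SparkContext(conf)
try {
  println(sc.parallelize(1 to 10).sum())  // 55.0
} finally {
  sc.stop()  // only one SparkContext may be active per JVM
}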

Using the Shell

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work. You can set which master the context connects to using the --master argument, and you can add JARs to the classpath by passing a comma-separated list to the --jars argument. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates to the --packages argument. Any additional repositories where dependencies might exist (e.g. Sonatype) can be passed to the --repositories argument. For example, to run bin/spark-shell on exactly four cores, use:

In the Spark shell, a SparkContext has already been created for you in a variable named sc, and creating your own SparkContext will not work. You can specify the master with the --master argument, add required JARs with --jars (comma-separated if there are several), and pull in Maven dependencies with --packages (also comma-separated). For example, to launch spark-shell with four cores:

$ ./bin/spark-shell --master local[4]

Or, to also add code.jar to its classpath, use:

If you need to include code.jar on the classpath:

$ ./bin/spark-shell --master local[4] --jars code.jar

To include a dependency using Maven coordinates:

To add a Maven dependency:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

For a complete list of options, run spark-shell --help. Behind the scenes, spark-shell invokes the more general spark-submit script.

You can get the complete list of options by running spark-shell --help; behind the scenes, spark-shell invokes the more general spark-submit script.

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

The resilient distributed dataset (RDD) is Spark's core concept: a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: calling parallelize in the driver program on an existing collection, or referencing a dataset in an external storage system such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized Collections

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:

A parallelized collection is created by calling SparkContext's parallelize method on an existing collection in the driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here a parallelized dataset is created from the numbers 1 to 5:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we might call distData.reduce((a, b) => a + b) to add up the elements of the array. We describe operations on distributed datasets later on.

Once created, the distributed dataset (distData) can be operated on in parallel. For example, we can call distData.reduce((a, b) => a + b) to add up the elements of the array; more operations on distributed datasets are described later.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark runs one task for each partition of the cluster, and typically you want 2-4 partitions for each CPU in your cluster. Normally Spark sets the number of partitions automatically based on your cluster, but you can also set it manually by passing a second argument to parallelize, e.g. sc.parallelize(data, 10). Note: some places in the code use the term slices (a synonym for partitions) for backward compatibility.
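A minimal sketch, assuming sc and the data array from the example above; the partition count 10 is arbitrary:

val distData10 = sc.parallelize(data, 10)  // explicitly request 10 partitions
println(distData10.getNumPartitions)       // 10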

External Datasets

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes an URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc URI) and reads it as a collection of lines. Here is an example invocation:

Text file RDDs can be created with SparkContext's textFile method. It takes a URI for the file (a local path on the machine, or an hdfs://, s3n://, etc. URI) and reads it as a collection of lines. Here is an example invocation:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

Once created, distFile can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the map and reduce operations as follows: distFile.map(s => s.length).reduce((a, b) => a + b).

Once distFile has been created, all dataset operations can be used on it. For example, we can add up the lengths of all the lines using the map and reduce operations: distFile.map(s => s.length).reduce((a, b) => a + b).

Some notes on reading files with Spark:
Some things to note when reading files with Spark:

  1. If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

  If you use a path on the local filesystem, the file must exist at the same path on all worker nodes; either copy the file to every worker or use a network-mounted shared file system.

  2. All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

  All of Spark's file-based input methods, including textFile, support directories, compressed files, and wildcards. For example, you can use textFile("/my/directory") to load a directory, textFile("/my/directory/*.txt") to load all the txt files in that directory via a wildcard, or textFile("/my/directory/*.gz") to load all the gz-compressed files in that directory.

  3. The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.

  The textFile method also accepts a second argument to control the number of partitions of the input file. By default, Spark creates one partition per block of the file (a block is 128MB by default on HDFS), but you can request more partitions by passing a larger value; note that you cannot have fewer partitions than blocks. (A short sketch of these options follows this list.)
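A minimal sketch of these options, assuming sc from the earlier examples; the paths are illustrative:

val wholeDir  = sc.textFile("/my/directory")        // every file in the directory
val textOnly  = sc.textFile("/my/directory/*.txt")  // wildcard match
val gzipped   = sc.textFile("/my/directory/*.gz")   // compressed files are read transparently
val moreParts = sc.textFile("data.txt", 8)          // request at least 8 partitions (minPartitions)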

Apart from text files, Spark’s Scala API also supports several other data formats:

Apart from text files, Spark's Scala API also supports several other data formats:

  1. SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, wholeTextFiles provides an optional second argument for controlling the minimal number of partitions.

The SparkContext.wholeTextFiles method reads a directory containing multiple small text files and returns them as (filename, content) pairs. This differs from textFile, which returns one record per line in each file. Partitioning is determined by data locality, which in some cases may result in too few partitions; for those cases, wholeTextFiles provides an optional second argument to control the minimum number of partitions. (A combined sketch of this and the formats below follows after this list.)

  2. For SequenceFiles, use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file. These should be subclasses of Hadoop’s Writable interface, like IntWritable and Text. In addition, Spark allows you to specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.

For SequenceFiles, use SparkContext's sequenceFile[K, V] method, where K and V are the types of the keys and values in the file. These should be subclasses of Hadoop's Writable interface, such as IntWritable and Text. In addition, Spark lets you specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.

  3. For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the “new” MapReduce API (org.apache.hadoop.mapreduce).

For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf along with the input format class, key class and value class; set these the same way you would for a Hadoop job with that input source. You can also use SparkContext.newAPIHadoopRDD for InputFormats based on the "new" MapReduce API (org.apache.hadoop.mapreduce).

  4. RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.

RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats such as Avro, it offers an easy way to save any RDD.
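A brief sketch of three of these APIs, assuming sc from above; the paths are illustrative, and the SequenceFile is assumed to hold IntWritable/Text pairs:

// wholeTextFiles: one (filename, content) pair per small file in the directory
val files = sc.wholeTextFiles("/my/directory")

// sequenceFile[K, V]: common Writables can be read as native types
val pairs = sc.sequenceFile[Int, String]("/my/seqfile")

// Object files: save any RDD as serialized Java objects and read it back
sc.parallelize(1 to 100).saveAsObjectFile("/tmp/ints")
val ints = sc.objectFile[Int]("/tmp/ints")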

RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which run a computation on the dataset and return a value to the driver program. For example, map is a transformation that passes each element of the dataset through a function and returns a new RDD representing the results; reduce, on the other hand, is an action that aggregates all the elements of the RDD with some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

All transformations in Spark are lazy: they do not compute their results right away. Instead, they simply remember the transformations that were applied to some base dataset (e.g. a file), and the transformations are only computed when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently; for example, Spark can realize that a dataset created through map will only be used in a reduce, and return just the result of the reduce to the driver rather than the larger mapped dataset.
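A minimal sketch of this laziness, assuming the distFile RDD created above from data.txt:

val lineLengths = distFile.map(s => s.length)          // transformation: nothing is computed yet
val totalLength = lineLengths.reduce((a, b) => a + b)  // action: the map and reduce run now,
                                                       // and only the Int result reaches the driver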

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.

By default, each transformed RDD may be recomputed every time you run an action on it. However, you can also persist an RDD in memory using the persist (or cache) method, in which case Spark keeps the elements around on the cluster for much faster access the next time you query it. Persisting RDDs on disk, or replicating them across multiple nodes, is also supported.
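A minimal sketch, continuing the lineLengths example above:

lineLengths.persist()                 // or lineLengths.cache(); mark the RDD to be kept in memory
lineLengths.reduce((a, b) => a + b)   // the first action computes and caches the partitions
lineLengths.count()                   // later actions reuse the cached data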