
Spark Programming Guide (Part 2)

Basics

To illustrate RDD basics, consider the simple program below:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

The first line defines a base RDD from an external file. This dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The second line defines lineLengths as the result of a map transformation. Again, lineLengths is not immediately computed, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program.

If we also wanted to use lineLengths again later, we could add:

lineLengths.persist()

before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed.
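For context, here is a minimal sketch of where the persist() call sits relative to the actions, assuming the same data.txt and SparkContext sc as above; the second reduce (maxLength) is only illustrative:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
lineLengths.persist()  // mark lineLengths for in-memory reuse (the default MEMORY_ONLY level)
val totalLength = lineLengths.reduce((a, b) => a + b)        // first action: computes and caches lineLengths
val maxLength = lineLengths.reduce((a, b) => math.max(a, b)) // reuses the cached data instead of re-reading the file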

Passing Functions to Spark

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:

  1. Anonymous function syntax, which can be used for short pieces of code (a short sketch follows the example below).

  2. Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:

object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)
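For comparison, option 1 would inline the same kind of logic with an anonymous function at the call site. A minimal sketch, assuming an RDD[String] named myRdd as in the snippet above:

myRdd.map((s: String) => s.toUpperCase)  // full anonymous function syntax
myRdd.map(s => s.toUpperCase)            // shorter form, with the parameter type inferred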

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method. For example, consider:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

Here, if we create a new MyClass instance and call doStuff on it, the map inside there references the func1 method of that MyClass instance, so the whole object needs to be sent to the cluster. It is similar to writing rdd.map(x => this.func1(x)).

In a similar way, accessing fields of the outer object will reference the whole object:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

is equivalent to writing rdd.map(x => this.field + x), which references all of this. To avoid this issue, the simplest way is to copy field into a local variable instead of accessing it externally:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

Understanding closures

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

Example

Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in local mode (--master = local[n]) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

Local vs. cluster modes

The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task’s closure. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case foreach()). This closure is serialized and sent to each executor.

The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.

In local mode, in some circumstances the foreach function will actually execute within the same JVM as the driver and will reference the same original counter, and may actually update it.

To ensure well-defined behavior in these sorts of scenarios one should use an Accumulator. Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
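As a hedged sketch of that fix for the counter example above, assuming Spark 2.x’s longAccumulator API and the same sc and data:

val accum = sc.longAccumulator("counter")  // accumulator registered with the driver
val rdd = sc.parallelize(data)

rdd.foreach(x => accum.add(x))             // each task adds to its own copy; Spark merges them back on the driver

println("Counter value: " + accum.value)   // read the merged total on the driver only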

In general, closures (constructs like loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident, and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.

Printing elements of an RDD

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use take(): rdd.take(100).foreach(println).
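The two approaches from this paragraph side by side, as a minimal sketch (the 100 is just an illustrative cap, and rdd stands for whatever RDD you want to inspect):

// Pulls the entire RDD back to the driver first; can exhaust driver memory for large datasets
rdd.collect().foreach(println)

// Safer: fetches at most the first 100 elements to the driver
rdd.take(100).foreach(println)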