1. 程式人生 > >Spark RDD初探(一)

Spark RDD初探(一)

serializa 只有一個 broadcast each函數 flat -s cover med 溢出

本文概要

本文主要從以下幾點闡述RDD,了解RDD

  1. 什麽是RDD?
  2. 兩種RDD創建方式
  3. 向給spark傳遞函數Passing Functions to Spark
  4. 兩種操作之轉換Transformations
  5. 兩種操作之行動Actions
  6. 惰性求值
  7. RDD持久化Persistence
  8. 理解閉包Understanding closures
  9. 共享變量Shared Variables
  10. 總結

Working with Key-Value Pairs、Shuffle operations、patitioning與並行度、DoubleRDDFunctions、RDD如何保障數據處理效率、RDD對容錯的支持等知識點將做下一篇文章中闡述。


技術分享圖片
與許多專有的大數據處理平臺不同,Spark建立在統一抽象的RDD之上,使得它可以以基本一致的方式應對不同的大數據處理場景,包括MapReduce,Streaming,SQL,Machine Learning以及Graph等。

要使用Spark,首先需要理解RDD。

1. 什麽是RDD?

官網解釋:

 the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. 

RDD全稱為Resilient Distributed Datasets,是一個容錯的集合,它可以並行操作。

技術分享圖片

RDD作為數據結構,本質上是一個只讀的分區記錄集合。一個RDD可以包含多個分區,每個分區就是一個dataset片段。多個分區可以由多個任務並行計算。

RDD有容錯機制,這點將做下篇文章中探討。

2. 兩種RDD創建方式

  • Parallelized Collections集合並行化:

    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
  • External Datasets從外部數據集創建:
    • any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.它支持text、SequenceFiles、任意的hadoop輸入格式。

      JavaRDD<String> distFile = sc.textFile("data.txt");

3. 向spark傳遞函數Passing Functions to Spark

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. Spark’s API嚴重依賴傳遞給他的函數,驅動程序在集群上運行函數。

在java中需要實現接口 org.apache.spark.api.java.function,有如下兩種方式:

  • 實現接口:Implement the Function interfaces in your own class, either as an anonymous inner class or a named one, and pass an instance of it to Spark.
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
  • lambda表達式:Use lambda expressions to concisely define an implementation.
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

4. 兩種操作之轉換Transformations

轉換操作返回的結果是RDD. RDD-->RDD。

轉換操作舉例:

  • map:
原始數據 函數 結果
{1,2,3,4} rdd.map(x => x+1) {2,3,4,5}
  • filter
原始數據 函數 結果
{1,2,3,4} rdd.filter(x => x!=1) {2,3,4}
  • flatmap: 1-->N
原始數據 函數 結果
{1,2,3,4} rdd.filter(x => x!=1) {2,3,4}
  • 偽集合操作
    • distinct(): 需要網絡混洗數據shuffle
    • intersection(): shuffle
    • subtract(): shuffle
    • union():
    • cartesian: 笛卡爾積。應用於求用戶相似度時。
  • 註意:transformations只有在執行action時才會執行轉換

5. 兩種操作之行動Actions

Action: return a value to the driver program after running a computation on the dataset.在數據集計算完成後返回一個結果給驅動程序。

最常見的行動操作:reduce(func):通過函數func先聚集各分區的數據集,再聚集分區之間的數據,func接收兩個參數,返回一個新值,新值再做為參數繼續傳遞給函數func,直到最後一個元素

原始數據 函數 結果
{1,2,3,4} rdd.reduce((x, y) => x + y) 10
  • 常用action

6. 惰性求值

只有發生行動操作時,轉換才會真正執行。

好處:This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset. spark運行性能更好。比如一個數據集結果map/reduce 處理後得到一個結果,而不是返回一個很大的map數據集。

  • Spark可以把一些操作合並到一起來減少計算數據的步驟,比如可以對RDD進行多次轉換。而在類似 Hadoop MapReduce 的系統中,開發者常常花費大量時間考慮如何把操作組合到一起,以減少MapReduce 的周期數。
  • 寫一些很復雜的映射,性能並不一定比簡單操作好。所以可以寫一些簡單的連續的操作,這些操作更容易管理。

7. RDD持久化Persistence

RDD支持兩種持久化方式:

  • persist()
  • cache(): Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. cache是容錯的,如果RDD的任一分區丟失了,它將自動重新計算。

在運行action時,RDD會被反復計算。為了避免多次計算同一RDD,可以對其持久化。

lineLengths.persist(StorageLevel.MEMORY_ONLY());

spark存儲級別:

技術分享圖片

加上後綴‘_2’,持久化兩份數據

存儲級別的選擇Which Storage Level to Choose?

  • MEMORY_ONLY:the most CPU-efficient option。缺省的、最快的方式
  • MEMORY_ONLY_SER : selecting a fast serialization library to make the objects much more space-efficient,but still reasonably fast to access. 通過快速序列化庫節省空間,但是訪問快遞。
  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. 不要溢出到磁盤,除非計算數據集的成本太高,或者需要過濾大量數據
  • Use the replicated storage levels if you want fast fault recovery.

Removing Data

內存管理算法:least-recently-used (LRU) fashion

手動清內存:RDD.unpersist() method.

8. 理解閉包Understanding closures

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster.
理解變量和方法的範圍和生命周期

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don‘t do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

本地模式 Vs. 集群模式

預備知識:Spark應用程序的組成

技術分享圖片

Driver: 驅動器,一個 job 只有一個,主要負責 job 的解析,與 task 的調度等。

Executor:執行器,實際運行 task 的地方,一個 job 有多個。

再回過頭看上面代碼的行為是有歧義的。

  • 本地模式: 以本地模式運行在單個JVM上,運行程序的 JVM 和運行驅動器的 JVM 是同一個。所以操作就會引用到原始的 counter,對RDD中的值進行累加,並且將它存儲到counter中。
  • 集群模式: 以集群模式運行時,上面的代碼的結果也許不會如我們預期的那樣。當執行一個作業(job)時,Spark會將RDD 操作拆分成多個任務(task),多個task並行處理--每一個任務都會由一個executor來執行。

    在執行之前,Spark會計算閉包(closure)。閉包是對executors可見的變量和方法,executors會用閉包來執行RDD上的計算(在這個例子中,閉包是foreach())。這個閉包是被序列化的,並且發送給每個executor。在本地模式中,只有一個executor,所以共享相同的閉包。然而,在集群模式中,就不是這樣了。executors會運行在各自的worker節點中,每個executor都有閉包的一個副本。

    發送給每個executor的閉包中的變量其實也是副本。每個foreach函數中引用的counter不再是driver節點上的counter。當然,在driver節點的內存中仍然存在這一個counter,但是這個counter對於executors來說是不可見的。executors只能看到自己的閉包中副本。這樣,counter最後的值仍舊是0,因為所有在counter的操作只引用了序列化閉包中的值。

簡單說,spark為了避免閉包引入的問題,僅處理閉包內的局部變量。

如何做到這一點?
- 將傳入的變量和方法,拷貝到閉包中。

那麽如何確保上面的行動Action正確執行,達到預期的效果?Shared Variables

9. 共享變量Shared Variables

兩種使用模式:broadcast variables and accumulators.

廣播變量Broadcast Variables

creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.什麽時候創建廣播變量?任務的多個步驟需要同一份數據 或者緩存數據的反序列化 很重要時。

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. 廣播變量緩存在每臺機器上。

The data broadcasted this way is cached in serialized form and deserialized before running each task.廣播數據序列化後緩存在緩存中,在運行每個任務時進行反序列化操作。

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

廣播變量只能被傳播一次,傳播之後不能修改。

Accumulators

一般用來做集算器counter或者求和sum.

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value.累加器只在action中執行,spark保證每個task只更新一次累加值,重新執行任務不會更新其值。

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));

accum.value();
// returns 10

累加器執行過程:

技術分享圖片

10. 總結

RDD的操作只有三種:

  • 創建RDD
  • 轉化RDD。transformation
  • 調用RDD操作進行求值。action

RDD是惰性求值,只有執行Action時,轉化操作才會執行。

如果多次使用轉化操作中的數據,可以將數據緩存,避免多次重復計算。

spark的難點之一,就是理解閉包。因為數據是在task中並行處理,所以不能在task中處理普通的全局變量,只能使用共享變量Shared Variables。

參考文獻

  • RDD Programming Guide
  • RDD:基於內存的集群計算容錯抽象
  • 深入理解Spark RDD抽象模型和編寫RDD函數
  • 理解Spark的核心RDD
  • Spark 3. RDD 操作一 基礎 ,放入方法,閉包,輸出元素, 使用 K-V 工作
  • 理解Spark中的閉包(closure)

Spark RDD初探(一)