1. 程式人生 > >比較全的Spark中的函式使用及程式設計模型

比較全的Spark中的函式使用及程式設計模型

1. Spark中的基本概念

在Spark中,有下面的基本概念。
Application:基於Spark的使用者程式,包含了一個driver program和叢集中多個executor
Driver Program:執行Application的main()函式並建立SparkContext。通常SparkContext代表driver program
Executor:為某Application執行在worker node上的餓一個程序。該程序負責執行Task,並負責將資料存在記憶體或者磁碟上。每個Application都有自己獨立的executors
Cluster Manager: 在叢集上獲得資源的外部服務(例如 Spark Standalon,Mesos、Yarn)
Worker Node: 叢集中任何可執行Application程式碼的節點
Task:被送到executor上執行的工作單元。
Job:可以被拆分成Task平行計算的工作單元,一般由Spark Action觸發的一次執行作業。
Stage:每個Job會被拆分成很多組Task,每組任務被稱為stage,也可稱TaskSet。該術語可以經常在日誌中看打。
RDD:Spark的基本計算單元,通過Scala集合轉化、讀取資料集生成或者由其他RDD經過運算元操作得到。

2. Spark應用框架

1
客戶Spark程式(Driver Program)來操作Spark叢集是通過SparkContext物件來進行,SparkContext作為一個操作和排程的總入口,在初始化過程中叢集管理器會建立DAGScheduler作業排程和TaskScheduler任務排程。
DAGScheduler作業排程模組是基於Stage的高層排程模組(參考:Spark分析之DAGScheduler),DAG全稱 Directed Acyclic Graph,有向無環圖。簡單的來說,就是一個由頂點和有方向性的邊構成的圖中,從任意一個頂點出發,沒有任何一條路徑會將其帶回到出發的頂點。它為每個Spark Job計算具有依賴關係的多個Stage任務階段(通常根據Shuffle來劃分Stage,如groupByKey, reduceByKey等涉及到shuffle的transformation就會產生新的stage),然後將每個Stage劃分為具體的一組任務,以TaskSets的形式提交給底層的任務排程模組來具體執行。其中,不同stage之前的RDD為寬依賴關係。 TaskScheduler任務排程模組負責具體啟動任務,監控和彙報任務執行情況。
建立SparkContext一般要經過下面幾個步驟:
a). 匯入Spark的類和隱式轉換

1 2 importorg.apache.spark.{SparkContext,SparkConf} importorg.apache.spark.SparkContext._


b). 構建Spark應用程式的應用資訊物件SparkConf

1 valconf=newSparkConf().setAppName(appName).setMaster(master_url)

c). 利用SparkConf物件來初始化SparkContext
val sc = new SparkContext(conf)
d). 建立RDD、並執行相應的Transformation和action並得到最終結果。
e). 關閉Context

在完成應用的設計和編寫後,使用spark-submit來提交應用的jar包。spark-submit的命令列參考如下:
Submitting Applications

1 2 3 4 5 6 7 ./bin/spark-submit --class<main-class> --master<master-url> --deploy-mode<deploy-mode> ...# other options <application-jar> [application-arguments]

Spark的執行模式取決於傳遞給SparkContext的MASTER環境變數的值。master URL可以是以下任一種形式:
Master URL 含義
local 使用一個Worker執行緒本地化執行SPARK(完全不併行)
local[*] 使用邏輯CPU個數數量的執行緒來本地化執行Spark
local[K] 使用K個Worker執行緒本地化執行Spark(理想情況下,K應該根據執行機器的CPU核數設定)
spark://HOST:PORT 連線到指定的Spark standalone master。預設埠是7077.
yarn-client 以客戶端模式連線YARN叢集。叢集的位置可以在HADOOP_CONF_DIR 環境變數中找到。
yarn-cluster 以叢集模式連線YARN叢集。叢集的位置可以在HADOOP_CONF_DIR 環境變數中找到。
mesos://HOST:PORT 連線到指定的Mesos叢集。預設介面是5050.
而spark-shell會在啟動的時候自動構建SparkContext,名稱為sc。

3. RDD的創造

Spark所有的操作都圍繞彈性分散式資料集(RDD)進行,這是一個有容錯機制並可以被並行操作的元素集合,具有隻讀、分割槽、容錯、高效、無需物化、可以快取、RDD依賴等特徵。目前有兩種型別的基礎RDD:並行集合(Parallelized Collections):接收一個已經存在的Scala集合,然後進行各種平行計算。 Hadoop資料集(Hadoop Datasets):在一個檔案的每條記錄上執行函式。只要檔案系統是HDFS,或者hadoop支援的任意儲存系統即可。 這兩種型別的RDD都可以通過相同的方式進行操作,從而獲得子RDD等一系列拓展,形成lineage血統關係圖。
(1). 並行化集合
並行化集合是通過呼叫SparkContext的parallelize方法,在一個已經存在的Scala集合上建立的(一個Seq物件)。集合的物件將會被拷貝,創建出一個可以被並行操作的分散式資料集。例如,下面的直譯器輸出,演示瞭如何從一個數組建立一個並行集合。
例如:val rdd = sc.parallelize(Array(1 to 10)) 根據能啟動的executor的數量來進行切分多個slice,每一個slice啟動一個Task來進行處理。
val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的數量
(2). Hadoop資料集
Spark可以將任何Hadoop所支援的儲存資源轉化成RDD,如本地檔案(需要網路檔案系統,所有的節點都必須能訪問到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支援文字檔案、SequenceFiles和任何Hadoop InputFormat格式。
a). 使用textFile()方法可以將本地檔案或HDFS檔案轉換成RDD
支援整個檔案目錄讀取,檔案可以是文字或者壓縮檔案(如gzip等,自動執行解壓縮並載入資料)。如textFile(”file:///dfs/data”)
支援萬用字元讀取,例如:

1 2 3 4 5 6 valrdd1=sc.textFile("file:///root/access_log/access_log*.filter"); valrdd2=rdd1.map(_.split("t")).filter(_.length==6) rdd2.count() ...... 14/08/2014:44:48INFO HadoopRDD:Input split:file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903 ......

textFile()可選第二個引數slice,預設情況下為每一個block分配一個slice。使用者也可以通過slice指定更多的分片,但不能使用少於HDFS block的分片數。
b). 使用wholeTextFiles()讀取目錄裡面的小檔案,返回(使用者名稱、內容)對
c). 使用sequenceFile[K,V]()方法可以將SequenceFile轉換成RDD。SequenceFile檔案是Hadoop用來儲存二進位制形式的key-value對而設計的一種平面檔案(Flat File)。
d). 使用SparkContext.hadoopRDD方法可以將其他任何Hadoop輸入型別轉化成RDD使用方法。一般來說,HadoopRDD中每一個HDFS block都成為一個RDD分割槽。
此外,通過Transformation可以將HadoopRDD等轉換成FilterRDD(依賴一個父RDD產生)和JoinedRDD(依賴所有父RDD)等。

4. RDD操作

2
RDD支援兩類操作:
轉換(transformation)現有的RDD通關轉換生成一個新的RDD,轉換是延時執行(lazy)的。
動作(actions)在RDD上執行計算後,返回結果給驅動程式或寫入檔案系統。
例如,map就是一種transformation,它將資料集每一個元素都傳遞給函式,並返回一個新的分佈資料集表示結果。reduce則是一種action,通過一些函式將所有的元素疊加起來,並將最終結果返回給Driver程式。

— Transformations —

(1). map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.
返回一個新分散式資料集,由每一個輸入元素經過func函式轉換後組成

(2). filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.
返回一個新資料集,由經過func函式計算後返回值為true的輸入元素組成
Test:

1 2 3 4 5 6 7 8 9 10 11 valnum=sc.parallelize(1to100) valnum2=num.map(_*2) valnum3=num2.filter(_%3==0) ...... num3.collect //res: Array[Int] = Array(6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144, 150, 156, 162, 168, 174, 180, 186, 192, 198) num3.toDebugString //res5: String = //FilteredRDD[20] at filter at <console>:16 (48 partitions) //  MappedRDD[19] at map at <console>:14 (48 partitions) //    ParallelCollectionRDD[18] at parallelize at <console>:12 (48 partitions)

(3). flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)
Test:

1 2 3 4 5 6 valkv=sc.parallelize(List(List(1,2),List(3,4),List(3,6,8))) kv.flatMap(x=>x.map(_+1)).collect //res0: Array[Int] = Array(2, 3, 4, 5, 4, 7, 9) //Word Count sc.textFile('hdfs://hdp01:9000/home/debugo/*.txt').flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)

(4). mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
類似於map,但獨立地在RDD的每一個分塊上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U]。mapPartitions將會被每一個數據集分割槽呼叫一次。各個資料集分割槽的全部內容將作為順序的資料流傳入函式func的引數中,func必須返回另一個Iterator[T]。被合併的結果自動轉換成為新的RDD。下面的測試中,元組(3,4)和(6,7)將由於我們選擇的分割槽策略和方法而消失。
The combined result iterators are automatically converted into a new RDD. Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose
Test:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 valnums=sc.parallelize(1to9,3) defmyfunc[T](iter:Iterator[T]):Iterator[(T,T)]={ varres=List[(T,T)]() varpre=iter.next while(iter.hasNext) { valcur=iter.next; res.::=(pre,cur) pre=cur; } res.iterator } //myfunc: [T](iter: Iterator[T])Iterator[(T, T)] nums.mapPartitions(myfunc).collect //res12: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

(5). mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T=>) ==> Iterator<U=> when running on an RDD of type T.
類似於mapPartitions, 其函式原型是:
def mapPartitionsWithIndex [ U : ClassTag ]( f : ( Int , Iterator [ T ]) => Iterator [ U ] , preservesPartitioning : Boolean = false ) : RDD [ U ],
mapPartitionsWithIndex的func接受兩個引數,第一個引數是分割槽的索引,第二個是一個數據集分割槽的迭代器。而輸出的是一個包含經過該函式轉換的迭代器。下面測試中,將分割槽索引和分割槽資料一起輸出。
Test:

1 2 3 4 5 6 7 valx=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10),3) defmyfunc(index:Int,iter:Iterator[Int]):Iterator[String]={ iter.toList.map(x=>index+"-"+x).iterator } //myfunc: (index: Int, iter: Iterator[Int])Iterator[String] x.mapPartitionsWithIndex(myfunc).collect() res:Array[String]=Array(0-1,0-2,0-3,1-4,1-5,1-6,2-7,2-8,2-9,2-10)

(6). sample(withReplacement,fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
根據fraction指定的比例,對資料進行取樣,可以選擇是否用隨機數進行替換,seed用於指定隨機數生成器種子。

1 2 3 4 5 vala=sc.parallelize(1to10000,3) a.sample(false,0.1,0).count res0:Long=960 a.sample(true,0.7,scala.util.Random.nextInt(10000)).count res1:Long=7073

(7). union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.
返回一個新的資料集,新資料集是由源資料集和引數資料集聯合而成。

(8). intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

(9). distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.
返回一個包含源資料集中所有不重複元素的新資料集
Test:

1 2 3 4 5 6 7 8 9 10 valkv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5))) valkv2=sc.parallelize(List(("A",4),("A",2),("C",3),("A",4),("B",5))) kv2.distinct.collect res0:Array[(String,Int)]=Array((A,4),(C,3),(B,5),(A,2)) kv1.union(kv2).collect res1:Array[(String,Int)]=Array((A,1),(B,2),(C,3),(A,4),(B,5),(A,4),(A,2),(C,3),(A,4),(B,5)) kv1.union(kv2).collect.distinct res2:Array[(String,Int)]=Array((A,1),(B,2),(C,3),(A,4),(B,5),(A,2)) kv1.intersection(kv2).collect res43:Array[(String,Int)]=Array((A,4),(C,3),(B,5))

(10.)groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
在一個(K,V)對的資料集上呼叫,返回一個(K,Seq[V])對的資料集
注意:預設情況下,只有8個並行任務來做操作,但是你可以傳入一個可選的numTasks引數來改變它。如果分組是用來計算聚合操作(如sum或average),那麼應該使用reduceByKey 或combineByKey 來提供更好的效能。
groupByKey, reduceByKey等transformation操作涉及到了shuffle操作,所以這裡引出兩個概念寬依賴和窄依賴。
3
窄依賴(narrow dependencies)
子RDD的每個分割槽依賴於常數個父分割槽(與資料規模無關)
輸入輸出一對一的運算元,且結果RDD的分割槽結構不變。主要是map/flatmap
輸入輸出一對一的運算元,但結果RDD的分割槽結構發生了變化,如union/coalesce
從輸入中選擇部分元素的運算元,如filter、distinct、substract、sample
寬依賴(wide dependencies)
子RDD的每個分割槽依賴於所有的父RDD分割槽
對單個RDD基於key進行重組和reduce,如groupByKey,reduceByKey
對兩個RDD基於key進行join和重組,如join
經過大量shuffle生成的RDD,建議進行快取。這樣避免失敗後重新計算帶來的開銷。
注意:reduce是一個action,和reduceByKey完全不同。

(11).reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like ingroupByKey, the number of reduce tasks is configurable through an optional second argument.
在一個(K,V)對的資料集上呼叫時,返回一個(K,V)對的資料集,使用指定的reduce函式,將相同key的值聚合到一起。類似groupByKey,reduce任務個數是可以通過第二個可選引數來配置的

(12).sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
在一個(K,V)對的資料集上呼叫,K必須實現Ordered介面,返回一個按照Key進行排序的(K,V)對資料集。升序或降序由ascending布林引數決定
Test:

1 2 3 4 5 6 7 8 valkv1=sc.parallelize(List(("A",1),("B",2),(