1. 程式人生 > >Spark中的程式設計模型

Spark中的程式設計模型

1. Spark中的基本概念

在Spark中,有下面的基本概念。
Application:基於Spark的使用者程式,包含了一個driver program和叢集中多個executor
Driver Program:執行Application的main()函式並建立SparkContext。通常SparkContext代表driver program
Executor:為某Application執行在worker node上的一個程序。該程序負責執行Task,並負責將資料存在記憶體或者磁碟上。每個Application都有自己獨立的executors
Cluster Manager: 在叢集上獲得資源的外部服務(例如 Spark Standalon,Mesos、Yarn)


Worker Node: 叢集中任何可執行Application程式碼的節點
Task:被送到executor上執行的工作單元。
Job:可以被拆分成Task平行計算的工作單元,一般由Spark Action觸發的一次執行作業。
Stage:每個Job會被拆分成很多組Task,每組任務被稱為stage,也可稱TaskSet。該術語可以經常在日誌中看打。
RDD :Spark的基本計算單元,通過Scala集合轉化、讀取資料集生成或者由其他RDD經過運算元操作得到。

2. Spark應用框架


客戶Spark程式(Driver Program)來操作Spark叢集是通過SparkContext物件來進行,SparkContext作為一個操作和排程的總入口,在初始化過程中叢集管理器會建立DAGScheduler作業排程和TaskScheduler任務排程。

DAGScheduler作業排程模組是基於Stage的高層排程模組(參考:Spark分析之DAGScheduler),DAG全稱 Directed Acyclic Graph,有向無環圖。簡單的來說,就是一個由頂點和有方向性的邊構成的圖中,從任意一個頂點出發,沒有任何一條路徑會將其帶回到出發的頂點。它為每個Spark Job計算具有依賴關係的多個Stage任務階段(通常根據Shuffle來劃分Stage,如groupByKey, reduceByKey等涉及到shuffle的transformation就會產生新的stage),然後將每個Stage劃分為具體的一組任務,以TaskSets的形式提交給底層的任務排程模組來具體執行。其中,不同stage之前的RDD為寬依賴關係。 TaskScheduler任務排程模組負責具體啟動任務,監控和彙報任務執行情況。

建立SparkContext一般要經過下面幾個步驟:

a). 匯入Spark的類和隱式轉換

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
b). 構建Spark應用程式的應用資訊物件SparkConf
val conf = new SparkConf().setAppName(appName).setMaster(master_url)
c). 利用SparkConf物件來初始化SparkContext
val sc = new SparkContext(conf)
d). 建立RDD、並執行相應的Transformation和action並得到最終結果。
e). 關閉Context

在完成應用的設計和編寫後,使用spark-submit來提交應用的jar包。spark-submit的命令列參考如下:

./bin/spark-submit 
  --class <main-class>
  --master <master-url> 
  --deploy-mode <deploy-mode> 
  ... # other options
  <application-jar> 
  [application-arguments]

Spark的執行模式取決於傳遞給SparkContext的MASTER環境變數的值。master URL可以是以下任一種形式:
Master URL 含義
local 使用一個Worker執行緒本地化執行SPARK(完全不併行)
local[*]使用邏輯CPU個數數量的執行緒來本地化執行Spark
local[K]使用K個Worker執行緒本地化執行Spark(理想情況下,K應該根據執行機器的CPU核數設定)
spark://HOST:PORT連線到指定的Spark standalone master。預設埠是7077.
yarn-client以客戶端模式連線YARN叢集。叢集的位置可以在HADOOP_CONF_DIR 環境變數中找到。
yarn-cluster 以叢集模式連線YARN叢集。叢集的位置可以在HADOOP_CONF_DIR 環境變數中找到。
mesos://HOST:PORT 連線到指定的Mesos叢集。預設介面是5050.

而spark-shell會在啟動的時候自動構建SparkContext,名稱為sc。

3. RDD的創造

Spark所有的操作都圍繞彈性分散式資料集(RDD)進行,這是一個有容錯機制並可以被並行操作的元素集合,具有隻讀、分割槽、容錯、高效、無需物化、可以快取、RDD依賴等特徵。

目前有兩種型別的基礎RDD:

並行集合(Parallelized Collections):接收一個已經存在的Scala集合,然後進行各種平行計算。

Hadoop資料集(Hadoop Datasets) :在一個檔案的每條記錄上執行函式。只要檔案系統是HDFS,或者hadoop支援的任意儲存系統即可。 

這兩種型別的RDD都可以通過相同的方式進行操作,從而獲得子RDD等一系列拓展,形成lineage血統關係圖。

(1). 並行化集合
並行化集合是通過呼叫SparkContext的parallelize方法,在一個已經存在的Scala集合上建立的(一個Seq物件)。集合的物件將會被拷貝,創建出一個可以被並行操作的分散式資料集。例如,下面的直譯器輸出,演示瞭如何從一個數組建立一個並行集合。
例如:

val rdd = sc.parallelize(Array(1 to 10)) 根據能啟動的executor的數量來進行切分多個slice,每一個slice啟動一個Task來進行處理。

val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的數量

(2). Hadoop資料集

Spark可以將任何Hadoop所支援的儲存資源轉化成RDD,如本地檔案(需要網路檔案系統,所有的節點都必須能訪問到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支援文字檔案、SequenceFiles和任何Hadoop InputFormat格式。
a). 使用textFile()方法可以將本地檔案或HDFS檔案轉換成RDD
支援整個檔案目錄讀取,檔案可以是文字或者壓縮檔案(如gzip等,自動執行解壓縮並載入資料)。如textFile(”file:///dfs/data”)
支援萬用字元讀取,例如:

val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter");
val rdd2=rdd1.map(_.split("t")).filter(_.length==6)
rdd2.count()
......
14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903
......

textFile()可選第二個引數slice,預設情況下為每一個block分配一個slice。使用者也可以通過slice指定更多的分片,但不能使用少於HDFS block的分片數。

b). 使用wholeTextFiles()讀取目錄裡面的小檔案,返回(使用者名稱、內容)對
c). 使用sequenceFile[K,V]()方法可以將SequenceFile轉換成RDD。SequenceFile檔案是Hadoop用來儲存二進位制形式的key-value對而設計的一種平面檔案(Flat File)。
d). 使用SparkContext.hadoopRDD方法可以將其他任何Hadoop輸入型別轉化成RDD使用方法。一般來說,HadoopRDD中每一個HDFS block都成為一個RDD分割槽。
此外,通過Transformation可以將HadoopRDD等轉換成FilterRDD(依賴一個父RDD產生)和JoinedRDD(依賴所有父RDD)等。

4. RDD操作


RDD支援兩類操作:
轉換(transformation)現有的RDD通關轉換生成一個新的RDD,轉換是延時執行(lazy)的。
動作(actions)在RDD上執行計算後,返回結果給驅動程式或寫入檔案系統。

例如,map就是一種transformation,它將資料集每一個元素都傳遞給函式,並返回一個新的分佈資料集表示結果。

reduce則是一種action,通過一些函式將所有的元素疊加起來,並將最終結果返回給Driver程式。

Transformations

(1). map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.
返回一個新分散式資料集,由每一個輸入元素經過func函式轉換後組成

2). filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.
返回一個新資料集,由經過func函式計算後返回值為true的輸入元素組成

val num=sc.parallelize(1 to 100)
val num2 = num.map(_*2)
val num3 = num2.filter(_ % 3 == 0)
......
num3.collect
//res: Array[Int] = Array(6, 12, 18, 24, 30, 36, 42, 48, 54, 60, 66, 72, 78, 84, 90, 96, 102, 108, 114, 120, 126, 132, 138, 144, 150, 156, 162, 168, 174, 180, 186, 192, 198)
num3.toDebugString
//res5: String =
//FilteredRDD[20] at filter at <console>:16 (48 partitions)
//  MappedRDD[19] at map at <console>:14 (48 partitions)
//    ParallelCollectionRDD[18] at parallelize at <console>:12 (48 partitions)

(3). flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
類似於map,但是每一個輸入元素可以被對映為0或多個輸出元素(因此func應該返回一個序列,而不是單一元素

val kv=sc.parallelize(List(List(1,2),List(3,4),List(3,6,8)))
kv.flatMap(x=>x.map(_+1)).collect
//res0: Array[Int] = Array(2, 3, 4, 5, 4, 7, 9)

//Word Count
sc.textFile('hdfs://hdp01:9000/home/debugo/*.txt').flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)

(4). mapPartitions(func)

mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
類似於map,但獨立地在RDD的每一個分塊上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U]。mapPartitions將會被每一個數據集分割槽呼叫一次。各個資料集分割槽的全部內容將作為順序的資料流傳入函式func的引數中,func必須返回另一個Iterator[T]。被合併的結果自動轉換成為新的RDD。下面的測試中,元組(3,4)和(6,7)將由於我們選擇的分割槽策略和方法而消失。
The combined result iterators are automatically converted into a new RDD. Please note, that the tuples (3,4) and (6,7) are missing from the following result due to the partitioning we chose
val nums = sc . parallelize (1 to 9 , 3)
def myfunc[T] ( iter : Iterator [T] ) : Iterator [( T , T ) ] = {
    var res = List [(T , T) ]()
    var pre = iter.next
    while ( iter.hasNext )
    {
        val cur = iter . next ;
        res .::= ( pre , cur )
        pre = cur ;
    }
    res . iterator
}
//myfunc: [T](iter: Iterator[T])Iterator[(T, T)]
nums.mapPartitions(myfunc).collect
//res12: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))


(5). mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T=>) ==> Iterator<U=> when running on an RDD of type T.
類似於mapPartitions, 其函式原型是:
def mapPartitionsWithIndex [ U : ClassTag ]( f : ( Int , Iterator [ T ]) => Iterator [ U ] , preservesPartitioning : Boolean = false ) : RDD [ U ],
mapPartitionsWithIndex的func接受兩個引數,第一個引數是分割槽的索引,第二個是一個數據集分割槽的迭代器。而輸出的是一個包含經過該函式轉換的迭代器。下面測試中,將分割槽索引和分割槽資料一起輸出。

val x = sc . parallelize ( List (1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10) , 3)
def myfunc ( index : Int , iter : Iterator [ Int ]) : Iterator [ String ] = {
iter . toList . map ( x => index + "-" + x ) . iterator
}
//myfunc: (index: Int, iter: Iterator[Int])Iterator[String]
x . mapPartitionsWithIndex ( myfunc ) . collect()
res: Array[String] = Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)

(6). sample(withReplacement,fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
根據fraction指定的比例,對資料進行取樣,可以選擇是否用隨機數進行替換,seed用於指定隨機數生成器種子。

val a = sc . parallelize (1 to 10000 , 3)
a . sample ( false , 0.1 , 0) . count
res0 : Long = 960
a . sample ( true , 0.7 , scala.util.Random.nextInt(10000)) . count
res1: Long = 7073

(7). union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.
返回一個新的資料集,新資料集是由源資料集和引數資料集聯合而成。

(8). intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

(9). distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.
返回一個包含源資料集中所有不重複元素的新資料集

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
val kv2=sc.parallelize(List(("A",4),("A", 2),("C",3),("A",4),("B",5)))
kv2.distinct.collect
res0: Array[(String, Int)] = Array((A,4), (C,3), (B,5), (A,2))
kv1.union(kv2).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,4), (A,2), (C,3), (A,4), (B,5))
kv1.union(kv2).collect.distinct
res2: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,2))
kv1.intersection(kv2).collect
res43: Array[(String, Int)] = Array((A,4), (C,3), (B,5))

(10.)groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

在一個(K,V)對的資料集上呼叫,返回一個(K,Seq[V])對的資料集
注意:預設情況下,只有8個並行任務來做操作,但是你可以傳入一個可選的numTasks引數來改變它。如果分組是用來計算聚合操作(如sum或average),那麼應該使用reduceByKey 或combineByKey 來提供更好的效能。
groupByKey, reduceByKey等transformation操作涉及到了shuffle操作,所以這裡引出兩個概念寬依賴和窄依賴。


窄依賴(narrow dependencies)
子RDD的每個分割槽依賴於常數個父分割槽(與資料規模無關)
輸入輸出一對一的運算元,且結果RDD的分割槽結構不變。主要是map/flatmap
輸入輸出一對一的運算元,但結果RDD的分割槽結構發生了變化,如union/coalesce
從輸入中選擇部分元素的運算元,如filter、distinct、substract、sample


寬依賴(wide dependencies)
子RDD的每個分割槽依賴於所有的父RDD分割槽
對單個RDD基於key進行重組和reduce,如groupByKey,reduceByKey
對兩個RDD基於key進行join和重組,如join
經過大量shuffle生成的RDD,建議進行快取。這樣避免失敗後重新計算帶來的開銷。
注意:reduce是一個action,和reduceByKey完全不同。

(11).reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like ingroupByKey, the number of reduce tasks is configurable through an optional second argument.
在一個(K,V)對的資料集上呼叫時,返回一個(K,V)對的資料集,使用指定的reduce函式,將相同key的值聚合到一起。類似groupByKey,reduce任務個數是可以通過第二個可選引數來配置的


(12).sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
在一個(K,V)對的資料集上呼叫,K必須實現Ordered介面,返回一個按照Key進行排序的(K,V)對資料集。升序或降序由ascending布林引數決定

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
res0: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))
kv1.sortByKey().collect //注意sortByKey的小括號不能省
res1: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5), (C,3))
kv1.groupByKey().collect
res1: Array[(String, Iterable[Int])] = Array((A,ArrayBuffer(1, 4)), (B,ArrayBuffer(2, 5)), (C,ArrayBuffer(3)))
kv1.reduceByKey(_+_).collect
res2: Array[(String, Int)] = Array((A,5), (B,7), (C,3))

(13). join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
在型別為(K,V)和(K,W)型別的資料集上呼叫時,返回一個相同key對應的所有元素對在一起的(K, (V, W))資料集

(14).cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable, Iterable) tuples. This operation is also called groupWith.
在型別為(K,V)和(K,W)的資料集上呼叫,返回一個 (K, Seq[V], Seq[W])元組的資料集。這個操作也可以稱之為groupwith

val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))
kv1.join(kv3).collect
res16: Array[(String, (Int, Int))] = Array((A,(1,10)), (A,(4,10)), (B,(2,20)), (B,(5,20)))
kv1.cogroup(kv3).collect
res0: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((A,(ArrayBuffer(1, 4),ArrayBuffer(10))), (B,(ArrayBuffer(2, 5),ArrayBuffer(20))), (C,(ArrayBuffer(3),ArrayBuffer())), (D,(ArrayBuffer(),ArrayBuffer(30))))

(15).cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
笛卡爾積,在型別為 T 和 U 型別的資料集上呼叫時,返回一個 (T, U)對資料集(兩兩的元素對)

(16). pipe(command, [envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
通過POSIX 管道來將每個RDD分割槽的資料傳入一個shell命令(例如Perl或bash指令碼)。RDD元素會寫入到程序的標準輸入,其標準輸出會作為RDD字串返回。

(17).coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
將RDD分割槽的數量降低為numPartitions,對於經過過濾後的大資料集的線上處理更加有效。

(18).repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
隨機重新shuffle RDD中的資料,並建立numPartitions個分割槽。這個操作總會通過網路來shuffle全部資料。


Actions

(19). reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
通過函式func(接受兩個引數,返回一個引數)聚集資料集中的所有元素。這個功能必須可交換且可關聯的,從而可以正確的被並行執行。

(20). collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
在驅動程式中,以陣列的形式,返回資料集的所有元素。這通常會在使用filter或者其它操作並返回一個足夠小的資料子集後再使用會比較有用。

(21). count()

Return the number of elements in the dataset.
返回資料集的元素的個數。

(22). first()

Return the first element of the dataset (similar to take(1)).
返回資料集的第一個元素(類似於take(1))

(23). take(n)

Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
返回一個由資料集的前n個元素組成的陣列。注意,這個操作目前並非並行執行,而是由驅動程式計算所有的元素

(24). countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
對(K,V)型別的RDD有效,返回一個(K,Int)對的Map,表示每一個key對應的元素個數

(25). foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
在資料集的每一個元素上,執行函式func進行更新。這通常用於邊緣效果,例如更新一個累加器,或者和外部儲存系統進行互動,例如HBase.

val num=sc.parallelize(1 to 10)
num.reduce (_ + _)
res1: Int = 55
num.take(5)
res2: Array[Int] = Array(1, 2, 3, 4, 5)
num.first
res3: Int = 1
num.count
res4: Long = 10
num.take(5).foreach(println)
1
2
3
4
5
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5),("A",7),("B",7)))
val kv1_count=kv1.countByKey()
kv1_count: scala.collection.Map[String,Long] = Map(A -> 3, C -> 1, B -> 3)


(26). takeSample(withReplacement,num, seed)

Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed.
返回一個數組,在資料集中隨機取樣num個元素組成,可以選擇是否用隨機數替換不足的部分,Seed用於指定的隨機數生成器種子

(27). takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.
返回一個由資料集的前n個元素組成的有序陣列,使用自然序或自定義的比較器。

(28). saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
將資料集的元素,以textfile的形式,儲存到本地檔案系統,HDFS或者任何其它hadoop支援的檔案系統。對於每個元素,Spark將會呼叫toString方法,將它轉換為檔案中的文字行

(29). saveAsSequenceFile(path)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that either implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
將資料集的元素,以Hadoop sequencefile的格式,儲存到指定的目錄下,本地系統,HDFS或者任何其它hadoop支援的檔案系統。這個只限於由key-value對組成,並實現了Hadoop的Writable介面,或者隱式的可以轉換為Writable的RDD。(Spark包括了基本型別的轉換,例如Int,Double,String,等等)

(30). saveAsObjectFile(path)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
將資料集元素寫入Java序列化的可以被SparkContext.objectFile()載入的簡單格式中。
當然,transformation和action的操作遠遠不止這些。其他請參考API文件:
RDD API


5. RDD快取

Spark可以使用 persist 和 cache 方法將任意 RDD 快取到記憶體、磁碟檔案系統中。快取是容錯的,如果一個 RDD 分片丟失,可以通過構建它的 transformation自動重構。被快取的 RDD 被使用的時,存取速度會被大大加速。一般的executor記憶體60%做 cache, 剩下的40%做task。
Spark中,RDD類可以使用cache() 和 persist() 方法來快取。cache()是persist()的特例,將該RDD快取到記憶體中。而persist可以指定一個StorageLevel。StorageLevel的列表可以在StorageLevel 伴生單例物件中找到:
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon
}
class StorageLevel private(  
     private var useDisk_      :    Boolean,  
     private var useMemory_   :  Boolean,  
     private var useOffHeap_   : Boolean,  
     private var deserialized_ : Boolean,  
     private var replication_  : Int = 1
)


Spark的不同StorageLevel ,目的滿足記憶體使用和CPU效率權衡上的不同需求。我們建議通過以下的步驟來進行選擇:

相關推薦

Spark程式設計模型

1. Spark中的基本概念 在Spark中,有下面的基本概念。Application:基於Spark的使用者程式,包含了一個driver program和叢集中多個executorDriver Program:執行Application的main()函式並建立Spar

比較全的Spark的函式使用及程式設計模型

1. Spark中的基本概念 在Spark中,有下面的基本概念。 Application:基於Spark的使用者程式,包含了一個driver program和叢集中多個executor Driver Program:執行Application的main()函式並建立Spar

Spark 編程模型()

tool irf split exe too rdd count pil 取數 先在IDEA新建一個maven項目 我這裏用的是jdk1.8,選擇相應的骨架 這裏選擇本地在window下安裝的maven 新的項目創建成功 我的開始pom.xml

轉載:Spark中文指南(入門篇)-Spark程式設計模型(一)

原文:https://www.cnblogs.com/miqi1992/p/5621268.html 前言   本章將對Spark做一個簡單的介紹,更多教程請參考: Spark教程 本章知識點概括 Apache Spark簡介 Spark的四種執行模式 Spark基於

Javascript常見的非同步程式設計模型

在Javascript非同步程式設計專題的前一篇文章淺談Javascript中的非同步中,我簡明的闡述了“Javascript中的非同步原理”、“Javascript如何在單執行緒上實現非同步呼叫”以及“Javascript中的定時器”等相關問題。 本篇文章我將

SparkSpark 程式設計模型及快速入門

Spark程式設計模型SparkContext類和SparkConf類程式碼中初始化我們可通過如下方式呼叫 SparkContext 的簡單建構函式,以預設的引數值來建立相應的物件。val sc = new SparkContext("local[4]", "Test Spa

Spark修煉之道(進階篇)——Spark入門到精通:第六節 Spark程式設計模型(三)

作者:周志湖 網名:搖擺少年夢 微訊號:zhouzhihubeyond 本節主要內容 RDD transformation(續) RDD actions 1. RDD transformation(續) (1)repartitionAnd

谷歌DataFlow程式設計模型以及Spark/Flink/StreamCQL的相關實現

流式計算框架程式設計介面的標準化,傻瓜化,SQL化,自打穀歌發表Dataflow程式設計模型的Paper起,就有走上臺面的趨勢。各家計算框架都開始認真考慮相關的問題,儼然成為大家競爭的熱點方向。在過去一年多的時間裡,Beam/Flink/Spark在這方面的努力和相關工作也逐

谷歌Dataflow程式設計模型spark 2.0 structured streaming

作者:劉旭暉 Raymond 轉載請註明出處主要介紹一下Dataflow程式設計模型的基本思想,最後面再簡單比較一下spark 2.0 structured streaming的程式設計模型== 是什麼 ==谷歌Dataflow首先是一個服務https://cloud.google.com/dataflow,

spark-sql的概述以及程式設計模型的介紹

1、spark sql的概述 (1)spark sql的介紹:   Spark SQL 是 Spark 用來處理結構化資料(結構化資料可以來自外部結構化資料來源也可以通 過 RDD 獲取)的一個模組,它提供了一個程式設計抽象叫做 DataFrame 並且作為分散式 SQL 查 詢引擎的作用。  外部的結構

Spark入門實戰系列--3.Spark程式設計模型(上)--程式設計模型及SparkShell實戰

rdd4的生成比較複雜,我們分步驟進行解析,軸線map(x=>(x(1),1))是獲取每行的第二個欄位(使用者Session)計數為1,然後reduceByKey(_+_)是安排Key進行累和,即按照使用者Session號進行計數求查詢次數,其次map(x=>(x._2,x._1))是把Key和V

Spark入門實戰系列--3.Spark程式設計模型(下)--IDEA搭建及實戰

1 package class3 2 3 import org.apache.spark.SparkContext._ 4 import org.apache.spark.{SparkConf, SparkContext} 5 6 object Join{ 7 def

Spring Boot 實踐折騰記(17):Spring WebFlux的函數語言程式設計模型

楊絳先生說:大部分人的問題是,做得不多而想得太多。 今天要講的函數語言程式設計可能和Spring Boot本身的關係不太大,但是它很重要!不僅是因為從Java 7升級到Java 8多了一種新程式設計語法的支援,更因為這是一種不同的思維模式。同時,今天的內容可能

網路程式設計select模型和poll模型學習(linux)

一、概述 併發的網路程式設計中不管是阻塞式IO還是非阻塞式IO,都不能很好的解決同時處理多個socket的問題。作業系統提供了複用IO模型:select和poll,幫助我們解決了這個問題。這兩個函式都能夠允許程序指示核心等待多個事件中的任何一個發生,並只在有一個或多個事件發

scala實戰之spark讀取mysql資料表並存放到mysql庫程式設計例項

今天簡單講解一下應用spark1.5.2相關讀取mysql資料到DataFrame的介面以及將DF資料存放到mysql中介面實現例項 同樣我們的程式設計開發環境是不需要安裝spark的,但是需要一臺安裝了mysql的伺服器,我這裡直接在本機安裝了一個mysql,還有就是sc

Spark程式設計模型(之莎士比亞文集詞頻統計實現)

Spark程式設計模型之莎士比亞文集詞頻統計         前段時間因為學校的雲端計算比賽我無意間接觸到了Spark雲端計算框架,從此對其一發不可收拾,無論從其執行效率還有他的其他方面的架構都感覺到無比強大,作為一個雲端計算的解決方案他有著比hadoop更好的優越性。因為Spark我又接觸到程式語言屆的

Spark程式設計指引(四)---------------------------DStreams基本模型,輸入DStreams和接收者

離散流(DStreams) 離散流或者稱為DStreams是Spark流程式設計提供的基本抽象。它代表了持續的資料流,從一個數據源接收到的資料流或者是在一個輸入流上應用轉變操作處理後的資料流。 在內部實現上,DStream代表了一系列連續的RDDs.RDDs是Spark對不

sparkstream程式設計指導(一)

概述 spark stream是對spark核心api的擴充套件,其有著很好的擴充套件性,很高的吞吐量以及容錯性的動態資料的流式處理過程。資料可以來自不同的資料來源,例如Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP socke

Spark流式程式設計介紹 - 程式設計模型

來源Spark官方文件 http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model 程式設計模型 結構化流中的核心概念就是將活動資料流當作一個會不斷增長的表。這是一個新的

Spark 關於Parquet的應用與性能初步測試

spark 大數據 hadoop hive parquetSpark 中關於Parquet的應用Parquet簡介 Parquet是面向分析型業務的列式存儲格式,由Twitter和Cloudera合作開發,2015年5月從Apache的孵化器裏畢業成為Apache頂級項目http://parquet.apa