Spark-2.0分割槽原理詳解
Spark分割槽原理分析
介紹
分割槽是指如何把RDD分佈在spark叢集的各個節點的操作。以及一個RDD能夠分多少個分割槽。
一個分割槽是大型分散式資料集的邏輯塊。
那麼思考一下:分割槽數如何對映到spark的任務數?如何驗證?分割槽和任務如何對應到本地的資料?
Spark使用分割槽來管理資料,這些分割槽有助於並行化分散式資料處理,並以最少的網路流量在executors之間傳送資料。
預設情況下,Spark嘗試從靠近它的節點讀取資料到RDD。由於Spark通常訪問分散式分割槽資料,為了優化transformation(轉換)操作,它建立分割槽來儲存資料塊。
存在在HDFS或Cassandra中的分割槽資料是一一對應的(由於相同的原因進行分割槽)。
預設情況下,每個HDFS的分割槽檔案(預設分割槽檔案塊大小是64M)都會建立一個RDD分割槽。
預設情況下,不需要程式設計師干預,RDD會自動進行分割槽。但有時候你需要為你的應用程式,調整分割槽的大小,或者使用另一種分割槽方案。
你可以通過方法def getPartitions: Array[Partition]
來獲取RDD的分割槽數量。
在spark-shell中執行以下程式碼:
val v = sc.parallelize(1 to 100)
scala> v.getNumPartitions
res2: Int = 20 //RDD的分割槽數是20?why? 原因在後面講解。
一般來說分割槽數和任務數是相等的。以上程式碼可以看到分割槽是20個,再從spark管理介面上看,有20個任務。
可以通過引數指定RDD的分割槽數:
val v = sc.parallelize(1 to 100, 2)
scala> v.getNumPartitions
res2: Int = 2 //RDD的分割槽數是2
可以看出,指定了分割槽數量以後,輸出的是指定的分割槽數。通過介面上看,只有2個任務。
分割槽大小對Spark效能的影響
分割槽塊越小,分割槽數量就會越多。分割槽資料就會分佈在更多的worker節點上。但分割槽越多意味著處理分割槽的計算任務越多,太大的分割槽數量(任務數量)可能是導致Spark任務執行效率低下的原因之一。
所以,太大或太小的分割槽都有可能導致Spark任務執行效率低下。那麼,應該如何設定RDD的分割槽?
Spark只能為RDD的每個分割槽執行1個併發任務,直到達到Spark叢集的CPU數量。
所以,如果你有一個擁有50個CPU的Spark叢集,那麼你可以讓RDD至少有50個分割槽(或者是CPU數量的2到3倍)。
一個比較好的分割槽數的值至少是executors的數量。可以通過引數設定RDD的預設分割槽數,也就是我們所說的並行度: sc.defaultParallelism
上一節中,當沒有設定分割槽時,在我的Spark叢集中預設的分割槽數是20,是因為在Spark預設配置檔案:conf/spark-defaults.conf中我設定了變數: spark.default.parallelism 20
同樣,RDD的action函式產生的輸出檔案數量,也是由分割槽的數量來決定的。
分割槽數量的上限,取決於executor的可用記憶體大小。
RDD執行的第一個transformation函式的分割槽數量,決定了在該RDD上執行的後續一系列處理過程的分割槽數量。例如從hdfs讀取資料的函式: sc.textFile(path, partition)
當使用函式 rdd = SparkContext().textFile("hdfs://…/file.txt")
時,你得到的分割槽數量可能很少,這將會和HDFS的塊的多少相等。但當你的檔案中的行比較大時,得到的分割槽可能更少。
你也可以通過textFile函式的第2個引數指定讀取的分割槽數量,但該分割槽數量: sc.textFile("hdfs://host:port/path", 200)
這樣讀取path的檔案後,會生成200個分割槽。
注意:第2個引數指定的分割槽數,必須大於等於2。
注意:以上描述只是對非壓縮檔案適用,對於壓縮檔案不能在textFile中指定分割槽數,而是要進行repartition:
rdd = sc.textFile('demo.gz')
rdd = rdd.repartition(100)
一些函式,例如:map,flatMap,filter不會保留分割槽。會把每個函式應用到每一個分割槽上。
RDD的Repartition
函式的定義定義如下:
/**
* Return a new RDD that has exactly numPartitions partitions.
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.
* If you are decreasing the number of partitions in this RDD, consider using coalesce
, which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
從程式碼上可以看到,repartition是shuffle和numPartitions分割槽的合併操作。
若分割槽策略不符合你的應用場景,你可以編寫自己的Partitioner。
coalesce 轉換
改函式的程式碼如下:
/**
* Return a new RDD that is reduced into numPartitions
partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* Note: With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalesce(1000, shuffle = true) will result in 1000 partitions with the
* data distributed using a hash partitioner.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
… …
}
coalesce轉換用於更改分割槽數。它可以根據shuffle標誌觸發RDD shuffle(預設情況下禁用shuffle,即為false)
從以上程式碼註釋可以看出:該函式是一個合併分割槽的操作,一般該函式用來進行narrow轉換。為了讓該函式並行執行,通常把shuffle的值設定成true。
coalesce使用舉例
scala> val rdd = sc.parallelize(0 to 10, 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24
scala> rdd.partitions.size
res0: Int = 8
scala> rdd.coalesce(numPartitions=8, shuffle=false) (1)
res1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at :27
scala> res1.toDebugString
res2: String =
(8) CoalescedRDD[1] at coalesce at :27 []
| ParallelCollectionRDD[0] at parallelize at :24 []
scala> rdd.coalesce(numPartitions=8, shuffle=true)
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at coalesce at :27
scala> res3.toDebugString
res4: String =
(8) MapPartitionsRDD[5] at coalesce at :27 []
| CoalescedRDD[4] at coalesce at :27 []
| ShuffledRDD[3] at coalesce at :27 []
+-(8) MapPartitionsRDD[2] at coalesce at :27 []
| ParallelCollectionRDD[0] at parallelize at :24 []
注意:
- 預設情況下coalesce是不會進行shuffle。
- 另外,分割槽數和源RDD的分割槽數保持一致。
分割槽相關引數
spark.default.parallelism
設定要用於HashPartitioner的分割槽數。它對應於排程程式後端的預設並行度。
它也和以下幾個數量對應:
- LocalSchedulerBackend是spark本地執行的排程器,此時,該引數的數量是,本地JVM的執行緒數。
本地模式的預設並行度的設定原始碼如下:
case LOCAL_N_REGEX(threads) =>
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
- Spark on Mesos的CPU數量,預設是8.
- 總CPU數:totalCoreCount,在CoarseGrainedSchedulerBackend 是2。
如何檢視RDD的分割槽
通過UI檢視使用分割槽的任務執行
啟動spark-shell執行以下命令:
scala> val someRDD = sc.parallelize(1 to 100, 4)
someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :27
scala> someRDD.map(x => x).collect
17/06/20 07:37:54 INFO spark.SparkContext: Starting job: collect at console:30
… …
再通過spark管理介面檢視任務執行情況:
通過UI檢視Partition Caching
在終端的spark-shell下執行以下命令:
scala> someRDD.setName("toy").cache
scala> someRDD.map(x => x).collect
再通過spark UI檢視cache的情況:
通過函式呼叫獲取分割槽數量
- RDD.getNumPartitions
- rdd.partitions.size
原文連結:https://blog.csdn.net/zg_hover/article/details/73476265
參考文件:https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html