1. 程式人生 > >Spark-2.0分割槽原理詳解

Spark-2.0分割槽原理詳解

Spark分割槽原理分析

介紹

分割槽是指如何把RDD分佈在spark叢集的各個節點的操作。以及一個RDD能夠分多少個分割槽。

一個分割槽是大型分散式資料集的邏輯塊。

那麼思考一下:分割槽數如何對映到spark的任務數?如何驗證?分割槽和任務如何對應到本地的資料?

Spark使用分割槽來管理資料,這些分割槽有助於並行化分散式資料處理,並以最少的網路流量在executors之間傳送資料。

預設情況下,Spark嘗試從靠近它的節點讀取資料到RDD。由於Spark通常訪問分散式分割槽資料,為了優化transformation(轉換)操作,它建立分割槽來儲存資料塊。

存在在HDFS或Cassandra中的分割槽資料是一一對應的(由於相同的原因進行分割槽)。

預設情況下,每個HDFS的分割槽檔案(預設分割槽檔案塊大小是64M)都會建立一個RDD分割槽。

預設情況下,不需要程式設計師干預,RDD會自動進行分割槽。但有時候你需要為你的應用程式,調整分割槽的大小,或者使用另一種分割槽方案。

你可以通過方法def getPartitions: Array[Partition]來獲取RDD的分割槽數量。

在spark-shell中執行以下程式碼: 

val v = sc.parallelize(1 to 100) 
scala> v.getNumPartitions 
res2: Int = 20 //RDD的分割槽數是20?why? 原因在後面講解。 

一般來說分割槽數和任務數是相等的。以上程式碼可以看到分割槽是20個,再從spark管理介面上看,有20個任務。

可以通過引數指定RDD的分割槽數: 

val v = sc.parallelize(1 to 100, 2) 
scala> v.getNumPartitions 
res2: Int = 2 //RDD的分割槽數是2 

可以看出,指定了分割槽數量以後,輸出的是指定的分割槽數。通過介面上看,只有2個任務。

分割槽大小對Spark效能的影響

分割槽塊越小,分割槽數量就會越多。分割槽資料就會分佈在更多的worker節點上。但分割槽越多意味著處理分割槽的計算任務越多,太大的分割槽數量(任務數量)可能是導致Spark任務執行效率低下的原因之一。

所以,太大或太小的分割槽都有可能導致Spark任務執行效率低下。那麼,應該如何設定RDD的分割槽?

Spark只能為RDD的每個分割槽執行1個併發任務,直到達到Spark叢集的CPU數量。 
所以,如果你有一個擁有50個CPU的Spark叢集,那麼你可以讓RDD至少有50個分割槽(或者是CPU數量的2到3倍)。

一個比較好的分割槽數的值至少是executors的數量。可以通過引數設定RDD的預設分割槽數,也就是我們所說的並行度: 
sc.defaultParallelism 
上一節中,當沒有設定分割槽時,在我的Spark叢集中預設的分割槽數是20,是因為在Spark預設配置檔案:conf/spark-defaults.conf中我設定了變數: 
spark.default.parallelism 20

同樣,RDD的action函式產生的輸出檔案數量,也是由分割槽的數量來決定的。

分割槽數量的上限,取決於executor的可用記憶體大小。

RDD執行的第一個transformation函式的分割槽數量,決定了在該RDD上執行的後續一系列處理過程的分割槽數量。例如從hdfs讀取資料的函式: 
sc.textFile(path, partition)

當使用函式 rdd = SparkContext().textFile("hdfs://…​/file.txt")時,你得到的分割槽數量可能很少,這將會和HDFS的塊的多少相等。但當你的檔案中的行比較大時,得到的分割槽可能更少。

你也可以通過textFile函式的第2個引數指定讀取的分割槽數量,但該分割槽數量: 
sc.textFile("hdfs://host:port/path", 200) 
這樣讀取path的檔案後,會生成200個分割槽。 
注意:第2個引數指定的分割槽數,必須大於等於2。

注意:以上描述只是對非壓縮檔案適用,對於壓縮檔案不能在textFile中指定分割槽數,而是要進行repartition: 

rdd = sc.textFile('demo.gz') 
rdd = rdd.repartition(100) 

一些函式,例如:map,flatMap,filter不會保留分割槽。會把每個函式應用到每一個分割槽上。

RDD的Repartition

函式的定義定義如下: 

/** 
* Return a new RDD that has exactly numPartitions partitions. 
* Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. 
* If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle. 
*/ 
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { 
coalesce(numPartitions, shuffle = true) 


從程式碼上可以看到,repartition是shuffle和numPartitions分割槽的合併操作。

若分割槽策略不符合你的應用場景,你可以編寫自己的Partitioner。

coalesce 轉換

改函式的程式碼如下: 

/** 
* Return a new RDD that is reduced into numPartitions partitions. 

* This results in a narrow dependency, e.g. if you go from 1000 partitions 
* to 100 partitions, there will not be a shuffle, instead each of the 100 
* new partitions will claim 10 of the current partitions. 

* However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, 
* this may result in your computation taking place on fewer nodes than 
* you like (e.g. one node in the case of numPartitions = 1). To avoid this, 
* you can pass shuffle = true. This will add a shuffle step, but means the 
* current upstream partitions will be executed in parallel (per whatever 
* the current partitioning is). 

* Note: With shuffle = true, you can actually coalesce to a larger number 
* of partitions. This is useful if you have a small number of partitions, 
* say 100, potentially with a few partitions being abnormally large. Calling 
* coalesce(1000, shuffle = true) will result in 1000 partitions with the 
* data distributed using a hash partitioner. 
*/ 
def coalesce(numPartitions: Int, shuffle: Boolean = false, 
partitionCoalescer: Option[PartitionCoalescer] = Option.empty) 
(implicit ord: Ordering[T] = null) 
: RDD[T] = withScope { 
… … 

coalesce轉換用於更改分割槽數。它可以根據shuffle標誌觸發RDD shuffle(預設情況下禁用shuffle,即為false)

從以上程式碼註釋可以看出:該函式是一個合併分割槽的操作,一般該函式用來進行narrow轉換。為了讓該函式並行執行,通常把shuffle的值設定成true。

coalesce使用舉例


scala> val rdd = sc.parallelize(0 to 10, 8) 
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24

scala> rdd.partitions.size 
res0: Int = 8

scala> rdd.coalesce(numPartitions=8, shuffle=false) (1) 
res1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[1] at coalesce at :27

scala> res1.toDebugString 
res2: String = 
(8) CoalescedRDD[1] at coalesce at :27 [] 
| ParallelCollectionRDD[0] at parallelize at :24 []

scala> rdd.coalesce(numPartitions=8, shuffle=true) 
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at coalesce at :27

scala> res3.toDebugString 
res4: String = 
(8) MapPartitionsRDD[5] at coalesce at :27 [] 
| CoalescedRDD[4] at coalesce at :27 [] 
| ShuffledRDD[3] at coalesce at :27 [] 
+-(8) MapPartitionsRDD[2] at coalesce at :27 [] 
| ParallelCollectionRDD[0] at parallelize at :24 [] 

注意:

  • 預設情況下coalesce是不會進行shuffle。
  • 另外,分割槽數和源RDD的分割槽數保持一致。

分割槽相關引數

spark.default.parallelism

設定要用於HashPartitioner的分割槽數。它對應於排程程式後端的預設並行度。 
它也和以下幾個數量對應:

  • LocalSchedulerBackend是spark本地執行的排程器,此時,該引數的數量是,本地JVM的執行緒數。

本地模式的預設並行度的設定原始碼如下: 

case LOCAL_N_REGEX(threads) => 
def localCpuCount: Int = Runtime.getRuntime.availableProcessors() 
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads. 
val threadCount = if (threads == "*") localCpuCount else threads.toInt 

  • Spark on Mesos的CPU數量,預設是8.
  • 總CPU數:totalCoreCount,在CoarseGrainedSchedulerBackend 是2。

如何檢視RDD的分割槽

通過UI檢視使用分割槽的任務執行

啟動spark-shell執行以下命令: 

scala> val someRDD = sc.parallelize(1 to 100, 4) 
someRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :27

scala> someRDD.map(x => x).collect 
17/06/20 07:37:54 INFO spark.SparkContext: Starting job: collect at console:30 
… … 

再通過spark管理介面檢視任務執行情況: 


通過UI檢視Partition Caching

在終端的spark-shell下執行以下命令: 

scala> someRDD.setName("toy").cache 
scala> someRDD.map(x => x).collect 

再通過spark UI檢視cache的情況: 

通過函式呼叫獲取分割槽數量

  • RDD.getNumPartitions
  • rdd.partitions.size

原文連結:https://blog.csdn.net/zg_hover/article/details/73476265

參考文件:https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html