1. 程式人生 > >【spark】儲存資料到hdfs,自動判斷合理分塊數量(repartition和coalesce)(一)

【spark】儲存資料到hdfs,自動判斷合理分塊數量(repartition和coalesce)(一)

本人菜鳥一隻,也處於學習階段,如果有什麼說錯的地方還請大家批評指出!

首先我想說明下該文章是幹嘛的,該文章粗略介紹了hdfs儲存資料檔案塊策略和spark的repartition、coalesce兩個運算元的區別,是為了下一篇文章的自動判斷合理分塊數做知識的鋪墊,如果對於這部分知識已經瞭解,甚至精通的同學,可以直接跳到該系列的第二篇文章!

背景:

spark讀取Hive表或者HDFS甚至各種框架的資料,生成了Rdd或者Dataset(或者DataFrame,但是在spark2以後的版本DataFrame這個物件已經被取消了,全部用Dataset替代),經過一大堆邏輯處理之後,將資料又存回了HDFS。但是這時候就會遇到一個問題,就是這份資料在HDFS上會被分成幾份,個數是否合理?

原因:

為什麼要考慮這個問題?是因為HDFS不適合存太多小檔案,因為每個檔案塊都會有一段地址存在namenode的記憶體中,如果太多小檔案或者目錄太深,會大大降低HDFS儲存的效能,而且在某些極端的情況下,spark甚至會儲存很多空檔案,這些空檔案還會影響之後的計算(因為需要啟動一個map來讀取這個空檔案,消耗了資源和時間,卻在做無用功)

例如:

如圖,我們想要的是,每個檔案塊的大小都在幾十M甚至200M,300M之間(不需要十分的精確)。

但是每個檔案也不能太大,為什麼?

壓縮格式     工具    
演算法     副檔名     是否可切分
DEFLATE     無     DEFLATE     .deflate    
Gzip     gzip     DEFLATE    
.gz    
bzip2     bzip2     bzip2     .bz2    
LZO     lzop     LZO     .lzo    
LZ4     無     LZ4     .lz4    
Snappy     無     Snappy     .snappy    

該表格來自於:(作者:瓜牛呱呱)https://blog.csdn.net/lin_wj1995/article/details/78967486

解釋下:hadoop的MR(MapReduce)和spark的引擎在讀取資料的時候,一個block可能會使用多個執行緒一起讀。

例如:

執行緒1讀取偏移量:0+10000的資料

執行緒2讀取偏移量:10001+20000的資料

.....

執行緒7讀取偏移量:60001+62043的資料

如圖(原諒我資料不夠大,只有一個執行緒在讀,spark預設是128M啟動一個執行緒讀,也就是說如果檔案有300M,那麼spark會啟動3個核心來同時讀取這個資料):

以此類推,來加快資料讀取的速度,但是問題來了,如果HDFS上的資料做了壓縮,那就只有bzip2的壓縮格式才可以多個執行緒加速讀的,所以如果一個檔案塊非常大的話,只能用一個執行緒來讀這個非常大的檔案,讀取資料的時間就會非常的長,延長任務執行時間。因此,相對合理的方式就是每個檔案塊大小在128M之間,這樣不管是否壓縮,讀取資料的時候都不會受到太大的影響。

前言:

來看2個API(repartition和coalesce)和兩個問題

1、repartition和coalesce的區別

-1.先說shuffle:

spark有個階段叫shuffle,他的shuffle和MR的shuffle不一樣,但是都會經歷重分割槽的過程,如何判斷什麼時候有shuffle階段呢?看看程式碼中是否有這些過程就好了,例如:去重,join,group by ,各種聚合函式(reduceByKey等),排序。沒有shuffle的spark程式碼,就相當於只對資料做簡單的過濾,對應到MR上,就是隻有map邏輯,沒有reduce邏輯,所以程式碼中shuffle越多,消耗的資源也越多,速度也會越慢,因為spark的shuffle過程和MR的reduce之前的shuffle過程一樣,資料需要在不同的節點之間傳遞。

-2.repartition和coalesce的區別:

repartition會觸發shuffle,coalesce不會,所以repartition效能比coalesce差!

看spark原始碼:

  def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = true, logicalPlan)
  }


  def coalesce(numPartitions: Int): Dataset[T] = withTypedPlan {
    Repartition(numPartitions, shuffle = false, logicalPlan)
  }

-3.原理:repartition將所有資料重新分割槽,coalesce是單純將不同分割槽的資料直接合併到一個分割槽裡。直觀來看,repartition之後,每個檔案塊大小會基本一樣,coalesce之後,每個檔案塊大小一般都是不一樣的,甚至會差很多。

-4.總結:repartition一般是用來增加分割槽數(當然也可以減少),coalesce只能用來減少分割槽數。所以如果不介意儲存的檔案塊大小不一樣,可以使用coalesce來減少分割槽數,儲存的時候一個分割槽就會生成一個檔案塊。

 

2、spark檔案塊什麼時候增加的,增加有什麼用?

接下來的文字描述,是針對於sparksql(也就是把資料載入成Dataset之後再處理)來說的。

-1.增加分割槽數,可以增加並行度,當spark申請的cpu核心足夠的情況下,可以同時跑不同分割槽的資料(因為一個分割槽的資料,只能由一個核心來跑,不能多個)

-2.手動增加,使用repartition來將所有資料打散

-3.自動增加,spark有個引數:spark.sql.shuffle.partitions,預設值為200。也就是說當觸發shuffle邏輯的時候,資料會自動分為200個分割槽執行,但是在資料量大的情況下,每個分割槽的資料量太大,而且假設spark申請到了300個核心,但是因為分割槽數只有200,會導致只有200個核心在執行,另外100個核心在空轉(雖然佔用資源但是卻不幹活)。所以可以將該引數設定為500甚至更大,來增加分割槽數和並行度。

3、spark檔案塊在儲存前如何減少?

在上一個(計算的)步驟,我們將資料增加分割槽,一個分割槽會生成一個檔案塊,如果沒有做任何修改,並且spark.sql.shuffle.partitions引數值設定為200,那麼不管這個資料多大,都會生成200個檔案塊。所以減少檔案塊的方法就是通過在資料儲存之前呼叫repartition或者coalesce這兩個API來減少或者增加檔案塊。

例如:

Dataset<Row> tb = spark.table("資料庫.表名")
       .groupBy(col("日期"))
       .agg(countDistinct(col("id")).as("uv"),count(lit(1)).as("pv"))
       .select("日期","uv","pv");
 
//這裡也可以使用coalesce來代替repartition
tb.repartition(1)
.write()
.partitionBy("日期")
.mode(SaveMode.Overwrite)
.format("hive").saveAsTable("資料庫.新的表名");

講到這裡,相信大家對於hdfs儲存資料檔案塊策略和spark的repartition、coalesce兩個運算元有了初步的瞭解,下一篇文章會進入正題:有哪些方式可以自動且合理的判斷儲存的檔案個數?

好了,本人菜雞一個,如果有什麼說錯或者說的不嚴謹的地方,還請大家批評指出~!

未完待續~