spark 讀取 hdfs 資料分割槽規則
阿新 • • 發佈:2018-12-13
下文以讀取 parquet 檔案 / parquet hive table 為例:
hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertMetastoreParquet
控制,預設為 true。
如果設定為 true ,會使用 org.apache.spark.sql.execution.FileSourceScanExec
,否則會使用 org.apache.spark.sql.hive.execution.HiveTableScanExec
。
FileSourceScanExec
前者對分割槽規則做了一些優化,如果 檔案是:
-
沒有分桶的情況
分割槽大小計算公式:
bytesPerCore = totalBytes / defaultParallelism maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
- defaultMaxSplitBytes:spark.sql.files.maxPartitionBytes,預設為128M,每個分割槽讀取的最大資料量
- openCostInBytes: spark.sql.files.openCostInBytes,預設為4M,小於這個大小的檔案將會合併到一個分割槽,可以理解為每個分割槽的最小量,避免碎檔案造成的大量碎片任務。
- defaultParallelism: spark.default.parallelism,yarn預設為應用cores數量或2。
- bytesPerCore:資料總大小 / defaultParallelism
eg. 讀入一份 2048M 大小的資料
Tip: partitionSize的計算過程簡化,實際上會先對讀入的每個分割槽按maxSplitBytes做切割,切割完後如果的小檔案如果大小不足maxSplitBytes的,會合併到一個partition,直到大小 > maxSplitBytes。
//如果 spark.default.parallelism 設定為 1000,最終的分割槽數量是 512,每個分割槽大小為4M
-
分桶的情況下:
分割槽數取決於桶的數量。
HiveTableScanExec
通過檔案數量,大小進行分割槽。
eg. 讀入一份 2048M 大小的資料,hdfs 塊大小設定為 128M
- 該目錄有1000個小檔案,則會生成1000個partition。
- 如果只有1個檔案,則會生成 16 個分割槽。
- 如果有一個大檔案1024M,其餘 999 個檔案共 1024M,則會生成 1009 個分割槽