1. 程式人生 > >spark 讀取 hdfs 資料分割槽規則

spark 讀取 hdfs 資料分割槽規則

下文以讀取 parquet 檔案 / parquet hive table 為例:

hive metastore 和 parquet 轉化的方式通過 spark.sql.hive.convertMetastoreParquet 控制,預設為 true。

如果設定為 true ,會使用 org.apache.spark.sql.execution.FileSourceScanExec ,否則會使用 org.apache.spark.sql.hive.execution.HiveTableScanExec

FileSourceScanExec

前者對分割槽規則做了一些優化,如果 檔案是:

  1. 沒有分桶的情況

    分割槽大小計算公式:

    bytesPerCore = totalBytes / defaultParallelism
    maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
    
    • defaultMaxSplitBytes:spark.sql.files.maxPartitionBytes,預設為128M,每個分割槽讀取的最大資料量
    • openCostInBytes: spark.sql.files.openCostInBytes,預設為4M,小於這個大小的檔案將會合併到一個分割槽,可以理解為每個分割槽的最小量,避免碎檔案造成的大量碎片任務。
    • defaultParallelism: spark.default.parallelism,yarn預設為應用cores數量或2。
    • bytesPerCore:資料總大小 / defaultParallelism

    eg. 讀入一份 2048M 大小的資料

    Tip: partitionSize的計算過程簡化,實際上會先對讀入的每個分割槽按maxSplitBytes做切割,切割完後如果的小檔案如果大小不足maxSplitBytes的,會合併到一個partition,直到大小 > maxSplitBytes。

    //如果 spark.default.parallelism 設定為 1000,最終的分割槽數量是 512,每個分割槽大小為4M
    maxSplitBytes = Math.min(128M, Math.max(4M, 2M)) partitionSize = 2048 / 4 = 512 //如果 spark.default.parallelism 設定為 100, 最終的分割槽數量是 100,每個分割槽大小為20.48M maxSplitBytes = Math.min(128M, Math.max(4M, 20.48M)) partitionSize = 2048 / 20.48 = 100 //如果 spark.default.parallelism 設定為 10, 最終的分割槽數量是 16,每個分割槽大小為128M maxSplitBytes = Math.min(128M, Math.max(4M, 204.8M)) partitionSize = 2048 / 128 = 16
  2. 分桶的情況下:

    分割槽數取決於桶的數量。

HiveTableScanExec

通過檔案數量,大小進行分割槽。

eg. 讀入一份 2048M 大小的資料,hdfs 塊大小設定為 128M

  1. 該目錄有1000個小檔案,則會生成1000個partition。
  2. 如果只有1個檔案,則會生成 16 個分割槽。
  3. 如果有一個大檔案1024M,其餘 999 個檔案共 1024M,則會生成 1009 個分割槽