1. 程式人生 > >Spark RDD的預設分割槽數:(spark 2.1.0)

Spark RDD的預設分割槽數:(spark 2.1.0)

本文基於Spark 2.1.0版本

新手首先要明白幾個配置:

spark.default.parallelism:(預設的併發數)

    如果配置檔案spark-default.conf中沒有顯示的配置,則按照如下規則取值:

    本地模式(不會啟動executor,由SparkSubmit程序生成指定數量的執行緒數來併發):

    spark-shell                              spark.default.parallelism = 1

    spark-shell --master local[N] spark.default.parallelism = N (使用N個核)

    spark-shell --master local      spark.default.parallelism = 1

    偽叢集模式(x為本機上啟動的executor數,y為每個executor使用的core數,

z為每個 executor使用的記憶體)

     spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y

     mesos 細粒度模式

     Mesos fine grained mode  spark.default.parallelism = 8

    其他模式(這裡主要指yarn模式,當然standalone也是如此)

    Others: total number of cores on all executor nodes or 2, whichever is larger

    spark.default.parallelism =  max(所有executor使用的core總數, 2)

經過上面的規則,就能確定了spark.default.parallelism的預設值(前提是配置檔案spark-default.conf中沒有顯示的配置,如果配置了,則spark.default.parallelism = 配置的值)

還有一個配置比較重要,spark.files.maxPartitionBytes = 128 M(預設)

The maximum number of bytes to pack into a single partition when reading files.

代表著rdd的一個分割槽能存放資料的最大位元組數,如果一個400m的檔案,只分了兩個區,則在action時會發生錯誤。

當一個spark應用程式執行時,生成spark.context,同時會生成兩個引數,由上面得到的spark.default.parallelism推匯出這兩個引數的值

sc.defaultParallelism     = spark.default.parallelism

sc.defaultMinPartitions = min(spark.default.parallelism,2)

當sc.defaultParallelism和sc.defaultMinPartitions最終確認後,就可以推算rdd的分割槽數了。

有兩種產生rdd的方式:

1,通過scala 集合方式parallelize生成rdd,

如, val rdd = sc.parallelize(1 to 10)

這種方式下,如果在parallelize操作時沒有指定分割槽數,則

rdd的分割槽數 = sc.defaultParallelism

2,通過textFile方式生成的rdd,

如, val rdd = sc.textFile(“path/file”)

有兩種情況:

a,從本地檔案file:///生成的rdd,操作時如果沒有指定分割槽數,則預設分割槽數規則為:

(按照官網的描述,本地file的分片規則,應該按照hdfs的block大小劃分,但實測的結果是固定按照32M來分片,可能是bug,不過不影響使用,因為spark能用所有hadoop介面支援的儲存系統,所以spark textFile使用hadoop介面訪問本地檔案時和訪問hdfs還是有區別的)

rdd的分割槽數 = max(本地file的分片數, sc.defaultMinPartitions)

b,從hdfs分散式檔案系統hdfs://生成的rdd,操作時如果沒有指定分割槽數,則預設分割槽數規則為:

rdd的分割槽數 = max(hdfs檔案的block數目, sc.defaultMinPartitions)

補充:

1,如果使用如下方式,從HBase的資料錶轉換為RDD,則該RDD的分割槽數為該Table的region數。

String tableName ="pic_test2";

conf.set(TableInputFormat.INPUT_TABLE,tableName);

conf.set(TableInputFormat.SCAN,convertScanToString(scan));

JavaPairRDD hBaseRDD = sc.newAPIHadoopRDD(conf,

TableInputFormat.class,ImmutableBytesWritable.class,

Result.class);

Hbase Table:pic_test2的region為10,則hBaseRDD的分割槽數也為10。

2,如果使用如下方式,通過獲取json(或者parquet等等)檔案轉換為DataFrame,則該DataFrame的分割槽數和該檔案在檔案系統中存放的Block數量對應。

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

people.json大小為300M,在HDFS中佔用了2個blocks,則該DataFrame df分割槽數為2。

3,Spark Streaming獲取Kafka訊息對應的分割槽數,不在本文討論。



作者:俺是亮哥
連結:https://www.jianshu.com/p/4b7d07e754fa
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。