1. 程式人生 > >Spark 讀取外部檔案的幾種方式

Spark 讀取外部檔案的幾種方式

textFile函式

  1. /**
  2. * Read a text file from HDFS, a local file system (available on all nodes), or any
  3. * Hadoop-supported file system URI, and return it as an RDD of Strings.
  4. */
  5. def textFile(
  6. path: String,
  7. minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  8. assertNotStopped()
  9. hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
  10. minPartitions).map(pair => pair._2.toString).setName(path)
  11. }
分析引數:

path: String 是一個URI,這個URI可以是HDFS、本地檔案(全部的節點都可以),或者其他Hadoop支援的檔案系統URI返回的是一個字串型別的RDD,也就是是RDD的內部形式是Iterator[(String)]

minPartitions=  math.min(defaultParallelism, 2) 

是指定資料的分割槽,如果不指定分割槽,當你的核數大於2的時候,不指定分割槽數那麼就是 2

當你的資料大於128M時候,Spark是為每一個快(block)建立一個分片(Hadoop-2.X之後為128m一個block)

1、從當前目錄讀取一個檔案

  1. val path = "Current.txt" //Current fold file
  2. val rdd1 = sc.textFile(path,2)

從當前目錄讀取一個Current.txt的檔案

2、從當前目錄讀取多個檔案

  1. val path = "Current1.txt,Current2.txt," //Current fold file
  2. val rdd1 = sc.textFile(path,2)
從當前讀取兩個檔案,分別是Cuttent1.txt和Current2.txt

3、從本地系統讀取一個檔案

  1. val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/README.md" //local file
  2. val rdd1 = sc.textFile(path,2)
從本地系統讀取一個檔案,名字是README.md

4、從本地系統讀取整個資料夾

  1. val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/" //local file
  2. val rdd1 = sc.textFile(path,2)
從本地系統中讀取licenses這個資料夾下的所有檔案

這裡特別注意的是,比如這個資料夾下有35個檔案,上面分割槽數設定是2,那麼整個RDD的分割槽數是35*2?

這是錯誤的,這個RDD的分割槽數不管你的partition數設定為多少時,只要license這個資料夾下的這個檔案a.txt

(比如有a.txt)沒有超過128m,那麼a.txt就只有一個partition。那麼就是說只要這35個檔案其中沒有一個超過

128m,那麼分割槽數就是 35個

5、從本地系統讀取多個檔案

  1. val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/LICENSE-scala.txt,file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/LICENSE-spire.txt" //local file
  2. val rdd1 = sc.textFile(path,2)
從本地系統中讀取file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/下的LICENSE-spire.txt和

LICENSE-scala.txt兩個檔案。上面分割槽設定是2,那個RDD的整個分割槽數是2*2

6、從本地系統讀取多個資料夾下的檔案(把如下檔案全部讀取進來)



  1. val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*" //local file
  2. val rdd1 = sc.textFile(path,2)

採用萬用字元的形式來代替檔案,來對資料資料夾進行整體讀取。但是後面設定的分割槽數2也是可以去除的。因為一個檔案沒有達到128m,所以上面的一個檔案一個partition,一共是20個。

7、採用萬用字元,來讀取多個檔名類似的檔案

比如讀取如下檔案的people1.txt和people2.txt,但google.txt不讀取


  1. for (i <- 1 to 2){
  2. val rdd1 = sc.textFile(s"/root/application/temp/people$i*",2)
  3. }
8、採用萬用字元讀取相同字尾的檔案
  1. val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*.txt" //local file
  2. val rdd1 = sc.textFile(path,2)


9、從HDFS讀取一個檔案
  1. val path = "hdfs://master:9000/examples/examples/src/main/resources/people.txt"
  2. val rdd1 = sc.textFile(path,2)
從HDFS中讀取檔案的形式和本地上一樣,只是前面的路徑要表明是HDFS中的






        </div>
            </div>
        </article>