1. 程式人生 > >Spark---WC---Spark從外部讀取資料之textFile

Spark---WC---Spark從外部讀取資料之textFile

測試資料

hello spark
hello hadoop
csdn hadoop
csdn csdn
hello world

結果

(spark,1)
(hadoop,2)
(csdn,3)
(hello,3)
(world,1)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WC {

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("WC").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
        val path = "file:///" + System.getProperty("user.dir") + "/data4test/WC.txt"

        val outpath="file:///" + System.getProperty("user.dir") + "/data4test/WC4result.txt"

        //textFile會產生兩個RDD  HadoopRDD 和 MapPartitionsRDD
        val hadoopRDD: RDD[String] = sc.textFile(path)
        //產生一個RDD==>MapPartitionsRDD
        val mapPartitionsRDD_1: RDD[String] = hadoopRDD.flatMap(line => line.split(" "))
        //產生一個RDD==>MapPartitionsRDD
        val mapPartitionsRDD_2: RDD[(String, Int)] = mapPartitionsRDD_1.map(word => (word, 1))
        //產生一個RDD==>ShuffledRDD
        val shuffleRDD: RDD[(String, Int)] = mapPartitionsRDD_2.reduceByKey((a,b)=>a+b)
        shuffleRDD.saveAsTextFile(outpath)
        sc.stop()
    }
}

 

 

SparkContext--textFile函式

原始碼過程         

SparkContext.scala HadoopRDD.scala
 textFile => hadoopFile=> HadoopRDD

HadoopRDD  

/** 
   * Read a text file from HDFS, a local file system (available on all nodes), or any 
   * Hadoop-supported file system URI, and return it as an RDD of Strings. 
   */  
  def textFile(  
      path: String,  
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {  
    assertNotStopped()  
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],  
      minPartitions).map(pair => pair._2.toString).setName(path)  
  }

分析引數:

path: String 是一個URI,這個URI可以是HDFS、本地檔案(全部的節點都可以),或者其他Hadoop支援的檔案系統URI返回的是一個字串型別的RDD,也就是是RDD的內部形式是Iterator[(String)]

minPartitions=  math.min(defaultParallelism, 2) 是指定資料的分割槽,如果不指定分割槽,

                                         當你的核數大於2的時候,不指定分割槽數那麼就是 2

當你的資料大於128M時候,Spark是為每一個快(block)建立一個分片(Hadoop-2.X之後為128m一個block)

def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()
    //   A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    //   廣播hadoop配置檔案
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,//SparkContext
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

1、從當前目錄讀取一個檔案

val path = "Current.txt"  //Current fold file
val rdd1 = sc.textFile(path,2)

從當前目錄讀取一個Current.txt的檔案

2、從當前目錄讀取多個檔案

val path = "Current1.txt,Current2.txt,"  //Current fold file
val rdd1 = sc.textFile(path,2)

從當前讀取兩個檔案,分別是Cuttent1.txt和Current2.txt

3、從本地系統讀取一個檔案

val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/README.md"  //local file
val rdd1 = sc.textFile(path,2)

從本地系統讀取一個檔案,名字是README.md

4、從本地系統讀取整個資料夾

val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/"  //local file
val rdd1 = sc.textFile(path,2)

從本地系統中讀取licenses這個資料夾下的所有檔案

這裡特別注意的是,比如這個資料夾下有35個檔案,上面分割槽數設定是2,那麼整個RDD的分割槽數是35*2?

這是錯誤的,這個RDD的分割槽數不管你的partition數設定為多少時,只要license這個資料夾下的這個檔案a.txt

(比如有a.txt)沒有超過128m,那麼a.txt就只有一個partition。那麼就是說只要這35個檔案其中沒有一個超過

128m,那麼分割槽數就是 35個

5、從本地系統讀取多個檔案

val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/LICENSE-scala.txt,file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/LICENSE-spire.txt"  //local file
val rdd1 = sc.textFile(path,2)

從本地系統中讀取file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/下的LICENSE-spire.txt和LICENSE-scala.txt兩個檔案。上面分割槽設定是2,那個RDD的整個分割槽數是2*2

6、從本地系統讀取多個資料夾下的檔案(把如下檔案全部讀取進來)

val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*"  //local file
val rdd1 = sc.textFile(path,2)

 採用萬用字元的形式來代替檔案,來對資料資料夾進行整體讀取。但是後面設定的分割槽數2也是可以去除的。因為一個檔案沒有達到128m,所以上面的一個檔案一個partition,一共是20個。

7、採用萬用字元,來讀取多個檔名類似的檔案

比如讀取如下檔案的people1.txt和people2.txt,但google.txt不讀取

for (i <- 1 to 2){
      val rdd1 = sc.textFile(s"/root/application/temp/people$i*",2)
    }

8、採用萬用字元讀取相同字尾的檔案

val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*.txt"  //local file
val rdd1 = sc.textFile(path,2)

9、從HDFS讀取一個檔案

val path = "hdfs://master:9000/examples/examples/src/main/resources/people.txt"
val rdd1 = sc.textFile(path,2)

從HDFS中讀取檔案的形式和本地上一樣,只是前面的路徑要表明是HDFS中的