Spark---WC---Reading external data in Spark with textFile
Test data
hello spark hello hadoop csdn hadoop csdn csdn hello world
Result:
(spark,1) (hadoop,2) (csdn,3) (hello,3) (world,1)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WC {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("WC").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val path = "file:///" + System.getProperty("user.dir") + "/data4test/WC.txt"
    val outpath = "file:///" + System.getProperty("user.dir") + "/data4test/WC4result.txt"
    // textFile creates two RDDs: a HadoopRDD and a MapPartitionsRDD
    val hadoopRDD: RDD[String] = sc.textFile(path)
    // creates one RDD ==> MapPartitionsRDD
    val mapPartitionsRDD_1: RDD[String] = hadoopRDD.flatMap(line => line.split(" "))
    // creates one RDD ==> MapPartitionsRDD
    val mapPartitionsRDD_2: RDD[(String, Int)] = mapPartitionsRDD_1.map(word => (word, 1))
    // creates one RDD ==> ShuffledRDD
    val shuffleRDD: RDD[(String, Int)] = mapPartitionsRDD_2.reduceByKey((a, b) => a + b)
    // saveAsTextFile writes a directory of part files at outpath
    shuffleRDD.saveAsTextFile(outpath)
    sc.stop()
  }
}
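Since textFile builds a HadoopRDD and then a MapPartitionsRDD, you can see the whole chain by printing the lineage of the last RDD. A minimal sketch, reusing the path variable from the program above:

// Sketch: print the lineage that the word-count job builds up.
val lines: RDD[String] = sc.textFile(path)
val counts: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// toDebugString lists the chain from the ShuffledRDD down to the HadoopRDD at the bottom
println(counts.toDebugString)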
SparkContext--the textFile function
Source code flow
SparkContext.scala: textFile => hadoopFile => HadoopRDD (defined in HadoopRDD.scala)
/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
Parameter analysis:
path: String is a URI. It can point to HDFS, a local file (available on all nodes), or any other Hadoop-supported file system. The return value is an RDD of Strings, i.e. the RDD's internal form is Iterator[(String)].
minPartitions = math.min(defaultParallelism, 2) sets the number of partitions for the data. If you do not specify it:
when your core count is greater than 2 and no partition count is given, it defaults to 2;
when a file is larger than 128 MB, Spark creates one split per block (a block is 128 MB from Hadoop 2.x on).
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,  // the SparkContext
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
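So textFile is just hadoopFile with TextInputFormat plus a map that keeps only the value. Calling hadoopFile directly exposes the raw (LongWritable, Text) pairs, where the key is the line's byte offset. A sketch, assuming the same WC.txt path as above:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Sketch: what textFile wraps, i.e. the raw (offset, line) pairs from TextInputFormat.
val kvRDD = sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
kvRDD.map { case (offset, line) => (offset.get, line.toString) }
  .take(3)
  .foreach(println)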
1. Read a single file from the current directory
val path = "Current.txt"  // file in the current folder
val rdd1 = sc.textFile(path, 2)
Reads the file Current.txt from the current directory.
2. Read multiple files from the current directory
val path = "Current1.txt,Current2.txt"  // files in the current folder
val rdd1 = sc.textFile(path, 2)
Reads two files from the current directory, Current1.txt and Current2.txt.
3. Read a single file from the local file system
val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/README.md"  // local file
val rdd1 = sc.textFile(path, 2)
Reads a single file, README.md, from the local file system.
4. Read an entire folder from the local file system
val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/"  // local folder
val rdd1 = sc.textFile(path, 2)
Reads every file under the licenses folder on the local file system.
Pay particular attention here: if the folder contains 35 files and the partition count above is set to 2, is the RDD's partition count 35 * 2?
No. Whatever you set the partition count to, as long as a given file in the licenses folder, say a.txt, does not exceed 128 MB, a.txt gets exactly one partition. So as long as none of the 35 files exceeds 128 MB, the partition count is 35.
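This is easy to check: read the folder with a minPartitions hint and print the actual partition count. A sketch using the licenses folder from above:

// Sketch: every small file becomes exactly one partition.
val dirRDD = sc.textFile("file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/", 2)
println(dirRDD.getNumPartitions)  // the number of files (35 here), not 35 * 2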
5. Read multiple files from the local file system
val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/LICENSE-scala.txt,file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/LICENSE-spire.txt"  // local files
val rdd1 = sc.textFile(path, 2)
Reads the two files LICENSE-scala.txt and LICENSE-spire.txt under file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/. With the partition setting of 2 above, the RDD's total partition count is 2 * 2.
6. Read files from multiple folders on the local file system (reading in all of the files below)
val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*"  // local files
val rdd1 = sc.textFile(path, 2)
Wildcards stand in for the file names, so the whole data folder tree is read at once. The trailing partition setting of 2 could also be dropped: since no single file reaches 128 MB, each file gets one partition, for 20 partitions in total.
7. Use wildcards to read multiple files with similar names
For example, read people1.txt and people2.txt below, but not google.txt:
for (i <- 1 to 2) {
  val rdd1 = sc.textFile(s"/root/application/temp/people$i*", 2)
}
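Note that rdd1 above is scoped to each loop iteration, so every pass creates a separate RDD. If a single RDD over all matched files is wanted, the patterns can be joined into one comma-separated path (textFile accepts these, as point 2 showed). A sketch under the same hypothetical /root/application/temp layout:

// Sketch: one RDD over people1* and people2* instead of one RDD per iteration.
val paths = (1 to 2).map(i => s"/root/application/temp/people$i*").mkString(",")
val peopleRDD = sc.textFile(paths, 2)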
8. Use wildcards to read files sharing the same suffix
val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*.txt"  // local files
val rdd1 = sc.textFile(path, 2)
9. Read a single file from HDFS
val path = "hdfs://master:9000/examples/examples/src/main/resources/people.txt"
val rdd1 = sc.textFile(path, 2)
Reading a file from HDFS looks the same as reading a local one; only the path prefix has to indicate that the file lives in HDFS.
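To round the example off, a minimal sketch that reads the people.txt above and writes a result back to HDFS (the output path is an assumption; saveAsTextFile creates it as a directory of part files):

// Sketch: an HDFS round trip, same API as the local case.
val peopleRDD = sc.textFile("hdfs://master:9000/examples/examples/src/main/resources/people.txt", 2)
println(peopleRDD.count())  // number of lines in the file
peopleRDD.saveAsTextFile("hdfs://master:9000/examples/output/people_copy")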