1. 程式人生 > >[Spark]-RDD之創建

[Spark]-RDD之創建

AD 簡單 spa 訪問 重要 ron 例如 記錄 table

1.RDD的創建
  1.1 從一個本地的Scala集合創建  

  //聲明一個本地集合
  val data = Array(1, 2, 3, 4, 5)

  val distData = sc.parallelize(data)

  /**
  *分布式數據集,有一個重要參數就是數據分片數量(Spark會在每一個分片跑一個task)
  *本地集合創建,默認情況,Spark會根據你的集群數量自動設置分片數
  *也可以手動指定這個數據集的分片(第二個參數)
  */
  //val distData = sc.parallelize(data, 10)

  //一旦分布式數據集創建完畢,這個數據集就可以並行的被操作
  distData.reduce((a, b) => a + b)

  1.2 從一個外部的存儲系統中創建

    這裏外部系統,指的是任何Hadoop(InputFormat)支持的存儲系統.比如本地文本文件,HDFS,HBase,S3等等

    1.2.1 textFile       

    val distFile = sc.textFile("hdfs://hadoop000:9000/xxx/data.txt")
            
      /**
      *這裏純粹的本地文件是不推薦的
      *因為這個文件訪問是針對每一個Worker都要是能訪問的
      *  換言之,如果是本地文件,則必須保證每一個Worker的本地都有一份這個文件
      
*/ //val distFile = sc.textFile("/data.txt") /** *Spark支持文件目錄,壓縮文件,或者通配符等 */ //val distFile = sc.textFile("hdfs://hadoop000:9000/xxx/*.gz") /** *對於外部文件,Spark會按照128M(HDFS默認),來進行分區 *這裏依然可以手動設置分區數.但要註意的是手動設置的分區數必須要大於默認分區數 * 即只允許分的更小,但不能分得更大
*/ //val distFile = sc.textFile("hdfs://hadoop000:9000/xxx/*.txt",10) distFile.map(s => s.length).reduce((a, b) => a + b)

    1.2.2 wholeTextFiles

      wholeTextFiles是用來讀取某個文件目錄下的多個小文件的.

      與textFile的區別是,

        textFile 以行斷符為分割.一個記錄就是一行

        wholeTextFiles 是以文件為分割,一個記錄就是一個文件內的全部內容

      wholeTextFiles的默認情況,可能導致分區數太小.這時可以手動設置調高分區數

    1.2.3 sequenceFile[K, V]

      將數據集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系統、HDFS或其它Hadoop支持的文件系統中。

      該操作只支持對實現了Hadoop的Writable接口的鍵值對RDD進行操作。

      在Scala中,還支持隱式轉換為Writable的類型(Spark包括了基本類型的轉換,例如Int、Double、String等等)

    1.2.4 hadoopRDD

     對於其它的Hadoop InputFormats,可以hadoopRDD讀取.

     傳入JobConf,input format class,key class and value class(與MapReduce任務設置相同),就可以直接以MapReduce作為輸入源進行讀取

     newAPIHadoopRDD

    1.2.5 saveAsObjectFile

     將數據集中的元素以簡單的Java序列化的格式寫入指定的路徑。這些保存該數據的文件,可以使用SparkContext.objectFile()進行加載

[Spark]-RDD之創建