
Reading a Text File into a DataFrame in Spark

This article was first published on my personal blog, QIMING.INFO. Please include a link and attribution when reposting.

The core API of Spark ML is now based on DataFrames rather than RDDs. To get a DataFrame directly when loading data, we can simply read a text file the same way we read a CSV file. The problem is that when the values in each line are separated by a varying number of spaces, those runs of spaces cannot be used as the CSV delimiter, because Spark's CSV reader does not accept a regular expression as a delimiter. A common workaround is to read the data as an RDD first, build tuples with map, and then convert to a DataFrame with toDF, but constructing the tuples becomes tedious when there are many columns. This article describes three ways to read a multi-column text file into a DataFrame in Spark.
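For context, here is a minimal sketch (not from the original post; the path is hypothetical and only three columns are shown) of that RDD -> tuple -> toDF route, which makes clear why it does not scale to 60 columns:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("tupleToDF").master("local[*]").getOrCreate()
import spark.implicits._  // needed for toDF on an RDD of tuples

val df = spark.sparkContext
  .textFile("file:///path/to/data.txt")                      // hypothetical path
  .map(_.split("\\s+"))                                      // split on runs of whitespace
  .map(a => (a(0).toDouble, a(1).toDouble, a(2).toDouble))   // one tuple element per column
  .toDF("col1", "col2", "col3")                              // with 60 columns, the two lines above become unmanageable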

Data description

The data is the Synthetic Control Chart Time Series dataset, synthetic_control.data. It contains 600 data points (rows), each with 60 attributes. Details: http://archive.ics.uci.edu/ml/databases/synthetic_control/

Within each data point, the attributes are separated by a varying number of spaces (a quick way to check this is sketched below). To deal with this, this article originally presented two methods, now updated to three.

Update (2018-08-17): I found a new method today that is much simpler than the original second method; see the best method (上策) section below.
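A quick look at the first record of the raw file shows the separator problem concretely (a spark-shell sketch, assuming the same local path used in the code later in this article):

// Print the first record; the 60 values are separated by varying numbers of spaces,
// so no single fixed character can serve as a CSV delimiter.
println(sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data").first())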

The worst method (下策)

Basic idea

This method is quite convoluted and not very efficient; it is what I came up with before I had seen the second method. The idea is:
1. Read the data directly and keep it as an RDD of String.
2. In each line of this RDD, match the runs of spaces with a regular expression and replace them with ",".
3. Save the processed RDD to a temporary directory.
4. Read the data back from that temporary directory as CSV, which directly yields a multi-column DataFrame.
5. Finally, cast the DataFrame's columns to Double.

Code

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

def readData(spark: SparkSession, path: String): DataFrame = {

  // Read the data and replace the separators (runs of spaces) with commas
  val tmpRdd = spark.sparkContext.textFile(path).map(_.replaceAll("\\s+", ","))

  // Save the converted data to a temporary directory
  val tmpPathStr = "file:///home/xuqm/ML_Data/input/tmp"
  // If the temporary directory already exists, delete it first
  val tmpPath: Path = new Path(tmpPathStr)
  val fs: FileSystem = tmpPath.getFileSystem(new Configuration())
  if (fs.exists(tmpPath)) {
    fs.delete(tmpPath, true)
  }
  // Save
  tmpRdd.saveAsTextFile(tmpPathStr)

  // Read the data back from the temporary directory as CSV
  val df = spark.read.csv(tmpPathStr)
  // Cast every column of the result to Double
  val cols = df.columns.map(f => col(f).cast(DoubleType))
  val data = df.select(cols: _*)

  data
}
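For reference, a minimal sketch of calling readData from spark-shell or a standalone program (the SparkSession setup is illustrative and not part of the original code):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ReadSyntheticControl")
  .master("local[*]")
  .getOrCreate()

// Produces a DataFrame with 60 Double columns (_c0 ... _c59, cast to Double)
val data = readData(spark, "file:///home/xuqm/ML_Data/input/synthetic_control.data")
data.printSchema()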

The middle method (中策)

Code and explanation

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Read the data and keep it as an RDD for now
val rdd = sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data")
// Use the first line to work out how many columns the final DataFrame should have, and name each column
val colsLength = rdd.first.split("\\s+").length
val colNames = new Array[String](colsLength)
for (i <- 0 until colsLength) {
  colNames(i) = "col" + (i + 1)
}
// Convert the RDD to a DataFrame dynamically
// Define the DataFrame's schema
val schema = StructType(colNames.map(fieldName => StructField(fieldName, DoubleType)))
// Process each row of data
val rowRDD = rdd.map(_.split("\\s+").map(_.toDouble)).map(p => Row(p: _*))
// Combine the rows with the schema to create the DataFrame
val data = spark.createDataFrame(rowRDD, schema)

Results

scala> val data = spark.createDataFrame(rowRDD, schema)
data: org.apache.spark.sql.DataFrame = [col1: double, col2: double ... 58 more fields]

scala> data.show(2)
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|   col1|   col2|   col3|   col4|   col5|   col6|   col7|   col8|   col9|  col10|  col11|  col12|  col13| col14|  col15|  col16|  col17|  col18|  col19|  col20|  col21|  col22|  col23|  col24|  col25|  col26|  col27|  col28|  col29|  col30|  col31|  col32|  col33|  col34|  col35|  col36|  col37|  col38|  col39|  col40|  col41|  col42|  col43|  col44|  col45|  col46|  col47|  col48|  col49|  col50|  col51|  col52|  col53|  col54|  col55|  col56|  col57|  col58|  col59|  col60|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|28.7812|34.4632|31.3381|31.2834|28.9207|33.7596|25.3969|27.7849|35.2479|27.1159|32.8717|29.2171|36.0253|32.337|34.5249|32.8717|34.1173|26.5235|27.6623|26.3693|25.7744|  29.27|30.7326|29.5054|33.0292|  25.04|28.9167|24.3437|26.1203|34.9424|25.0293|26.6311|35.6541|28.4353|29.1495|28.1584|26.1927|33.3182|30.9772|27.0443|35.5344|26.2353|28.9964|32.0036|31.0558|34.2553|28.0721|28.9402|35.4973| 29.747|31.4333|24.5556|33.7431|25.0466|34.9318|34.9879|32.4721|33.3759|25.4652|25.8717|
|24.8923| 25.741|27.5532|32.8217|27.8789|31.5926|31.4861|35.5469|27.9516|31.6595|27.5415|31.1887|27.4867|31.391| 27.811| 24.488|27.5918|35.6273|35.4102|31.4167|30.7447|24.1311|35.1422|30.4719|31.9874|33.6615|25.5511|30.4686|33.6472|25.0701|34.0765|32.5981|28.3038|26.1471|26.9414|31.5203|33.1089|24.1491|28.5157|25.7906|35.9519|26.5301|24.8578|25.9562|32.8357|28.5322|26.3458|30.6213|28.9861|29.4047|32.5577|31.0205|26.6418|28.4331|33.6564|26.4244|28.4661|34.2484|32.1005| 26.691|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
only showing top 2 rows
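The middle method gives 60 separate Double columns rather than the single vector column that Spark ML estimators expect. As a side note (not part of the original post), one way to bridge that gap is VectorAssembler; a minimal sketch, assuming the data DataFrame built just above:

import org.apache.spark.ml.feature.VectorAssembler

// Assemble col1 ... col60 into a single "features" vector column, so the result
// can be fed to Spark ML algorithms just like the DataFrame produced by the best method below.
val assembler = new VectorAssembler()
  .setInputCols(data.columns)   // all 60 column names: col1 ... col60
  .setOutputCol("features")
val assembled = assembler.transform(data).select("features")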

The best method (上策)

Basic idea

1. Read the raw file and split each data point's attribute values with a regular expression, giving an RDD of Array[String].
2. Use the LabeledPoint class from the Spark ML library to turn the data into an RDD of LabeledPoint. A LabeledPoint has a label column and a features column. The label is a Double; since this data has not been trained yet and has no labels, any number will do. The features column is a vector; since all of the values here are features, the Vectors class is used to convert them all into vectors.
3. Convert the LabeledPoint RDD into a DataFrame and select only its features column. The result is a new DataFrame on which machine learning algorithms such as KMeans can be run (a minimal sketch follows the results below).

Code

import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors

// Read the data and split each data point's attribute values, giving an RDD of Array[String]
val rdd = sc.textFile("file:///home/xuqm/ML_Data/input/synthetic_control.data").map(_.split("\\s+"))
// Convert the RDD into an RDD of LabeledPoint
val LabeledPointRdd = rdd.map(x => LabeledPoint(0, Vectors.dense(x.map(_.toDouble))))
// Convert to a DataFrame and keep only the "features" column
val data = spark.createDataFrame(LabeledPointRdd).select("features")

Results

scala> val data = spark.createDataFrame(LabeledPointRdd).select("features")
data: org.apache.spark.sql.DataFrame = [features: vector]

scala> data.show(2,false)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[28.7812,34.4632,31.3381,31.2834,28.9207,33.7596,25.3969,27.7849,35.2479,27.1159,32.8717,29.2171,36.0253,32.337,34.5249,32.8717,34.1173,26.5235,27.6623,26.3693,25.7744,29.27,30.7326,29.5054,33.0292,25.04,28.9167,24.3437,26.1203,34.9424,25.0293,26.6311,35.6541,28.4353,29.1495,28.1584,26.1927,33.3182,30.9772,27.0443,35.5344,26.2353,28.9964,32.0036,31.0558,34.2553,28.0721,28.9402,35.4973,29.747,31.4333,24.5556,33.7431,25.0466,34.9318,34.9879,32.4721,33.3759,25.4652,25.8717] |
|[24.8923,25.741,27.5532,32.8217,27.8789,31.5926,31.4861,35.5469,27.9516,31.6595,27.5415,31.1887,27.4867,31.391,27.811,24.488,27.5918,35.6273,35.4102,31.4167,30.7447,24.1311,35.1422,30.4719,31.9874,33.6615,25.5511,30.4686,33.6472,25.0701,34.0765,32.5981,28.3038,26.1471,26.9414,31.5203,33.1089,24.1491,28.5157,25.7906,35.9519,26.5301,24.8578,25.9562,32.8357,28.5322,26.3458,30.6213,28.9861,29.4047,32.5577,31.0205,26.6418,28.4331,33.6564,26.4244,28.4661,34.2484,32.1005,26.691]|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 2 rows
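As mentioned in the basic idea above, the resulting features DataFrame can be fed straight into a Spark ML algorithm such as KMeans. A minimal sketch (the parameter values are illustrative; k = 6 matches the six pattern classes this dataset is usually described as containing):

import org.apache.spark.ml.clustering.KMeans

// Cluster the 600 series into 6 groups using the "features" column
val kmeans = new KMeans()
  .setK(6)
  .setSeed(1L)
  .setFeaturesCol("features")
val model = kmeans.fit(data)
// Inspect the learned cluster centres
model.clusterCenters.foreach(println)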