1. 程式人生 > >SparkSQL把rdd轉化為DataFrame時,想要把整個陣列的值都放到Row中則麼辦?

SparkSQL把rdd轉化為DataFrame時,想要把整個陣列的值都放到Row中則麼辦?

在使用sparkSQL,有時想要把rdd中的資料轉換成DataFrame,RDD中的的資料可能時Array型別,或者是想要把陣列型別中的所有元素放到Row中,當陣列中的元素特別多時,可能就會變得更加麻煩,其實Row的Object中為我們提供了一個很好的方法,就是merge方法,話不多說,直接看程式碼吧
在這裡插入圖片描述

Object  Demo {
def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local"
) val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //這裡我是並行化建立了一個RDD,當然也可從檔案讀取 val lines:RDD[String] = sc.parallelize(Array("a 1 c","n 2 m")) //把每一行資料都按空格進行切分後,得到的RDD中的是Array陣列 val splits: RDD[Array[String]] = lines.map(_.split(" ")) //現在想要把splits轉換成RDD[Row]型別,以進行轉換為DataFrame
val rowRDD: RDD[Row] = .map(t => { var row: Row = Row() //先建立一個Row,空的 for (i <- 0 until (t.size)) { //每次把這次的Row型別,和原來的Row進行合併, 最後的row中有是 //數組裡面的所有欄位 //把第2個值 轉成 int型別 在新增到Row中 if(i==1)row = Row.merge(row,Row(t(i).toInt)) else row = Row.merge(row,
Row(t(i))) } //最後把row 返回,此時的row中相當於Row(t(0),t(1).toInt,t(2)) //在數字段數特別多時,就會特別麻煩,可以使用上面的那種方法 row }) //最後在建立元資料 val structType = StructType(List(StructField("name",StringType,true),StructField("name1",IntegerType,true),StructField("name2",StringType,true))) val df1 = sqlContext.createDataFrame(rowRDD,structType) df1.show() sc.stop() } }

上面的列子欄位比較少,可能不是特別明顯,可以通過下面的列子在進行比較

object Bz2toParquet01 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local").set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.parquet.compression.codec","snappy")
    val lines = sc.textFile("D:\\tools\\qianfeng\\hadoop\\spark\\sparkStreaming\\DMP專案\\2016-10-01_06_p1_invalid.1475274123982.log.FINISH.bz2")
    val rowRDD = lines.map(x=>x.split(",",x.size)).filter(
    t=>t.length>=85).map(arr=>{
    //此處將欄位取出來放到Row中
      Row(
        arr(0),
        NBF.toInt(arr(1)),
        NBF.toInt(arr(2)),
        NBF.toInt(arr(3)),
        NBF.toInt(arr(4)),
        arr(5),
        arr(6),
        NBF.toInt(arr(7)),
        NBF.toInt(arr(8)),
        NBF.toDouble(arr(9)),
        NBF.toDouble(arr(10)),
        arr(11),
        arr(12),
        arr(13),
        arr(14),
        arr(15),
        arr(16),
        NBF.toInt(arr(17)),
        arr(18),
        arr(19),
        NBF.toInt(arr(20)),
        NBF.toInt(arr(21)),
        arr(22),
        arr(23),
        arr(24),
        arr(25),
        NBF.toInt(arr(26)),
        arr(27),
        NBF.toInt(arr(28)),
        arr(29),
        NBF.toInt(arr(30)),
        NBF.toInt(arr(31)),
        NBF.toInt(arr(32)),
        arr(33),
        NBF.toInt(arr(34)),
        NBF.toInt(arr(35)),
        NBF.toInt(arr(36)),
        arr(37),
        NBF.toInt(arr(38)),
        NBF.toInt(arr(39)),
        NBF.toDouble(arr(40)),
        NBF.toDouble(arr(41)),
        NBF.toInt(arr(42)),
        arr(43),
        NBF.toDouble(arr(44)),
        NBF.toDouble(arr(45)),
        arr(46),
        arr(47),
        arr(48),
        arr(49),
        arr(50),
        arr(51),
        arr(52),
        arr(53),
        arr(54),
        arr(55),
        arr(56),
        NBF.toInt(arr(57)),
        NBF.toDouble(arr(58)),
        NBF.toInt(arr(59)),
        NBF.toInt(arr(60)),
        arr(61),
        arr(62),
        arr(63),
        arr(64),
        arr(65),
        arr(66),
        arr(67),
        arr(68),
        arr(69),
        arr(70),
        arr(71),
        arr(72),
        NBF.toInt(arr(73)),
        NBF.toDouble(arr(74)),
        NBF.toDouble(arr(75)),
        NBF.toDouble(arr(76)),
        NBF.toDouble(arr(77)),
        NBF.toDouble(arr(78)),
        arr(79),
        arr(80),
        arr(81),
        arr(82),
        arr(83),
        NBF.toInt(arr(84))
      )
    })

//SchemaUtils是我自定義的一個工具類,裡面建立了Row的Schame資訊
    val df1: DataFrame = sqlContext.createDataFrame(rowRDD,SchemaUtils.schema)
    df1.write.parquet("hdfs:\\spark\out3")

    sc.stop()
  }

}

下面是用一個for迴圈搞定那麼長的Row

object Bz2toParquet {
  def main(args: Array[String]): Unit = {

    //首先判斷目錄是否為空
    if(args.length != 2){
      println("目錄不正確,退出程式")
      sys.exit()
    }
    //建立一個幾個儲存輸入輸入出目錄
   
    val conf = new SparkConf().setAppName(s"${this.getClass.getName}").setMaster("local")
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    //在spark1.6版本的預設的壓縮方式還不是snappy,到2.0以後才預設是snappy
   // sqlContext.setConf("spark.sql.parquet.compression.codec","snappy")
    sqlContext.setConf("spark.sql.parquet.compression.codec","snappy")
    val lines = sc.textFile(inputPath)
    //開始過濾,保證欄位大於85,並且要解析內部的,,,,進行特殊處理
    val rowRDD: RDD[Row] = lines.map(x=>{
      x.split(",",x.length) //按所有的, 解析, 如果過不寫長度,只會解析一個
    }).filter(_.length>=85).map(arr=> {
      var row : Row= Row()
      //這裡只需要把你要轉換為Int或Double的進行一下判斷就可以啦
      for (i <- 0 until  85){
        if(i==1||i==2||i==3||i==4||i==7||i==8||i==17||i==21||i==20||i==26||i==28||i==30||i==31||i==32
        ||i==34||i==35||i==36||i==38||i==39||i==42||i==57||i==59||i==60||i==73||i==84) row = Row.merge(row,Row(NBF.toInt(arr(i))))
        else if(i==9||i==10||i==40||i==41||i==44||i==45||i==58||i==74||i==75||i==76||i==77||i==78)row = Row.merge(row,Row(NBF.toDouble(arr(i))))
        else row = Row.merge(row,Row(arr(i)))
      }
      row

    })


    val df1 = sqlContext.createDataFrame(rowRDD,SchemaUtils.schema)
    //重新指定分割槽,並輸出為parquet檔案
    df1.coalesce(1).write.parquet(outputPath)
    sc.stop()
  }

}