程式人生 > Error:(3, 12) Implementation restriction: case classes cannot have more than 22 parameters.

Error:(3, 12) Implementation restriction: case classes cannot have more than 22 parameters.

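構建欄位（引數）個數大於 22 的 DataFrame（反射方式）：Scala 2.10 的 case class 最多只能有 22 個引數，因此改用手動實作 Product 特質的普通類別作為 DataFrame 的資料型別（Scala 2.11 起 case class 已取消此限制）。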

一、直接上程式碼

package com.etlstu
import java.util.Properties

import com.utils.NBF
import com.utilsStu.logmetadata
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

object Bz2ParquetStu {

  /** Minimum number of comma-separated fields a line needs to be mapped to a `logmetadata`. */
  private final val FieldCount = 85

  /**
   * Entry point of the job.
   *
   * Reads comma-separated log lines from `args(0)` and keeps only lines with at least
   * 85 fields. Each kept line is mapped to a [[com.utilsStu.logmetadata]] and registered
   * as the temp table `AwenTable`. The job then counts records per
   * (provincename, cityname) and appends that result to the MySQL table `ToMySQL`.
   *
   * @param args exactly two elements: input path and output path. The output path is
   *             currently only referenced by the commented-out parquet/JSON writers.
   */
  def main(args: Array[String]): Unit = {
    // Validate the argument count up front. A wrong count is an error, so report it
    // on stderr and exit with a non-zero status.
    if (args.length != 2) {
      Console.err.println("目錄不正確,退出程式!!!")
      sys.exit(1)
    }
    val Array(inputPath, outputPath) = args

    val conf = new SparkConf()
      .setAppName(s"${this.getClass.getName}")
      .setMaster("local[*]")
      // Use Kryo instead of the default Java serializer.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    try {
      val sQLContext = new SQLContext(sc)
      // Spark 1.6 did not default parquet compression to snappy (2.0+ does); set it explicitly.
      sQLContext.setConf("spark.sql.parquet.compression.codec", "snappy")

      val lines = sc.textFile(inputPath)
      // A limit of -1 keeps trailing empty fields (e.g. "a,,,,"), so runs of empty columns
      // still count towards the required number of fields.
      val rdd: RDD[logmetadata] = lines
        .map(_.split(",", -1))
        .filter(_.length >= FieldCount)
        .map(toLogMetadata)

      // Needed for rdd.toDF().
      import sQLContext.implicits._
      val dataFrame: DataFrame = rdd.toDF()
      dataFrame.registerTempTable("AwenTable")
      val sql: DataFrame = sQLContext.sql("select count(*) as ct,provincename,cityname from AwenTable group by provincename,cityname")

      // Append the aggregated result to MySQL.
      val df = sql.select("ct", "provincename", "cityname")
      // NOTE(review): URL and credentials are hard-coded; consider passing them in via
      // arguments or configuration instead of committing them to source.
      val username: String = "root"
      val password: String = "root"
      val url: String = "jdbc:mysql://localhost:3306/awenyun"
      val prop = new Properties()
      prop.put("user", username)
      prop.put("password", password)
      prop.put("driver", "com.mysql.jdbc.Driver")
      df.write.mode(SaveMode.Append).jdbc(url, "ToMySQL", prop)

      // Save as parquet:
      //dataFrame.coalesce(1).write.parquet(outputPath)
      // Save the query result as JSON to the output directory:
      //sql.coalesce(1).write.json(outputPath)
    } finally {
      // Always release the SparkContext, even if a stage or the JDBC write fails.
      sc.stop()
    }
  }

  /**
   * Maps one split log line to a [[com.utilsStu.logmetadata]].
   *
   * Field `i` of the line becomes constructor argument `i`. Numeric columns are converted
   * with `NBF.toInt` / `NBF.toDouble`; how those treat malformed input is defined in `NBF`.
   * Assumes `arr.length >= FieldCount`, which the filter in `main` guarantees.
   */
  private def toLogMetadata(arr: Array[String]): logmetadata =
    new logmetadata(
      arr(0),
      NBF.toInt(arr(1)),
      NBF.toInt(arr(2)),
      NBF.toInt(arr(3)),
      NBF.toInt(arr(4)),
      arr(5),
      arr(6),
      NBF.toInt(arr(7)),
      NBF.toInt(arr(8)),
      NBF.toDouble(arr(9)),
      NBF.toDouble(arr(10)),
      arr(11),
      arr(12),
      arr(13),
      arr(14),
      arr(15),
      arr(16),
      NBF.toInt(arr(17)),
      arr(18),
      arr(19),
      NBF.toInt(arr(20)),
      NBF.toInt(arr(21)),
      arr(22),
      arr(23),
      arr(24),
      arr(25),
      NBF.toInt(arr(26)),
      arr(27),
      NBF.toInt(arr(28)),
      arr(29),
      NBF.toInt(arr(30)),
      NBF.toInt(arr(31)),
      NBF.toInt(arr(32)),
      arr(33),
      NBF.toInt(arr(34)),
      NBF.toInt(arr(35)),
      NBF.toInt(arr(36)),
      arr(37),
      NBF.toInt(arr(38)),
      NBF.toInt(arr(39)),
      NBF.toDouble(arr(40)),
      NBF.toDouble(arr(41)),
      NBF.toInt(arr(42)),
      arr(43),
      NBF.toDouble(arr(44)),
      NBF.toDouble(arr(45)),
      arr(46),
      arr(47),
      arr(48),
      arr(49),
      arr(50),
      arr(51),
      arr(52),
      arr(53),
      arr(54),
      arr(55),
      arr(56),
      NBF.toInt(arr(57)),
      NBF.toDouble(arr(58)),
      NBF.toInt(arr(59)),
      NBF.toInt(arr(60)),
      arr(61),
      arr(62),
      arr(63),
      arr(64),
      arr(65),
      arr(66),
      arr(67),
      arr(68),
      arr(69),
      arr(70),
      arr(71),
      arr(72),
      NBF.toInt(arr(73)),
      NBF.toDouble(arr(74)),
      NBF.toDouble(arr(75)),
      NBF.toDouble(arr(76)),
      NBF.toDouble(arr(77)),
      NBF.toDouble(arr(78)),
      arr(79),
      arr(80),
      arr(81),
      arr(82),
      arr(83),
      NBF.toInt(arr(84))
    )
}

二、工具類：實現引數個數大於 22 的資料類別（繼承 Product，手動實作 productArity 與 productElement）

package com.utilsStu

/**
 * Row type for one 85-column log record, used to build a DataFrame by reflection.
 *
 * Scala 2.10 case classes are limited to 22 parameters, so this is a plain class that
 * implements [[scala.Product]] by hand. `productArity` is 85, and `productElement(i)`
 * returns the i-th constructor parameter in declaration order.
 *
 * All parameters are `val`s, so every column has a public accessor
 * (e.g. `record.provincename`). Without `val` they compile to private fields that are
 * readable only through `productElement`. Accessors are also what accessor-based
 * reflection needs; Spark 2.x product encoders are one example, to be confirmed for
 * the Spark version in use.
 */
class logmetadata(
  val sessionid: String,
  val advertisersid: Int,
  val adorderid: Int,
  val adcreativeid: Int,
  val adplatformproviderid: Int,
  val sdkversion: String,
  val adplatformkey: String,
  val putinmodeltype: Int,
  val requestmode: Int,
  val adprice: Double,
  val adppprice: Double,
  val requestdate: String,
  val ip: String,
  val appid: String,
  val appname: String,
  val uuid: String,
  val device: String,
  val client: Int,
  val osversion: String,
  val density: String,
  val pw: Int,
  val ph: Int,
  val long: String,
  val lat: String,
  val provincename: String,
  val cityname: String,
  val ispid: Int,
  val ispname: String,
  val networkmannerid: Int,
  val networkmannername: String,
  val iseffective: Int,
  val isbilling: Int,
  val adspacetype: Int,
  val adspacetypename: String,
  val devicetype: Int,
  val processnode: Int,
  val apptype: Int,
  val district: String,
  val paymode: Int,
  val isbid: Int,
  val bidprice: Double,
  val winprice: Double,
  val iswin: Int,
  val cur: String,
  val rate: Double,
  val cnywinprice: Double,
  val imei: String,
  val mac: String,
  val idfa: String,
  val openudid: String,
  val androidid: String,
  val rtbprovince: String,
  val rtbcity: String,
  val rtbdistrict: String,
  val rtbstreet: String,
  val storeurl: String,
  val realip: String,
  val isqualityapp: Int,
  val bidfloor: Double,
  val aw: Int,
  val ah: Int,
  val imeimd5: String,
  val macmd5: String,
  val idfamd5: String,
  val openudidmd5: String,
  val androididmd5: String,
  val imeisha1: String,
  val macsha1: String,
  val idfasha1: String,
  val openudidsha1: String,
  val androididsha1: String,
  val uuidunknow: String,
  val userid: String,
  val iptype: Int,
  val initbidprice: Double,
  val adpayment: Double,
  val agentrate: Double,
  val lomarkrate: Double,
  val adxrate: Double,
  val title: String,
  val keywords: String,
  val tagid: String,
  val callbackdate: String,
  val channelid: String,
  val mediatype: Int
) extends Product with scala.Serializable {

  /**
   * Only another `logmetadata` is comparable.
   * Note: `equals`/`hashCode` are not overridden, so equality is still reference equality.
   */
  override def canEqual(that: Any): Boolean = that.isInstanceOf[logmetadata]

  /** Number of fields exposed through `productElement` (one per constructor parameter). */
  override def productArity: Int = 85

  /** Class name, as a case class would report it. */
  override def productPrefix: String = "logmetadata"

  /** Renders as `logmetadata[v0,v1,...,v84]`; any `Some(x)` field is printed as `x`. */
  override def toString(): String =
    productIterator
      .map { case Some(x) => x; case other => other }
      .mkString(s"$productPrefix[", ",", "]")

  /**
   * Returns the n-th constructor parameter, in declaration order.
   *
   * @throws IndexOutOfBoundsException if `n` is outside `0 until productArity`
   */
  @throws(classOf[IndexOutOfBoundsException])
  override def productElement(n: Int): Any = n match {
    case 0  => sessionid
    case 1  => advertisersid
    case 2  => adorderid
    case 3  => adcreativeid
    case 4  => adplatformproviderid
    case 5  => sdkversion
    case 6  => adplatformkey
    case 7  => putinmodeltype
    case 8  => requestmode
    case 9  => adprice
    case 10 => adppprice
    case 11 => requestdate
    case 12 => ip
    case 13 => appid
    case 14 => appname
    case 15 => uuid
    case 16 => device
    case 17 => client
    case 18 => osversion
    case 19 => density
    case 20 => pw
    case 21 => ph
    case 22 => long
    case 23 => lat
    case 24 => provincename
    case 25 => cityname
    case 26 => ispid
    case 27 => ispname
    case 28 => networkmannerid
    case 29 => networkmannername
    case 30 => iseffective
    case 31 => isbilling
    case 32 => adspacetype
    case 33 => adspacetypename
    case 34 => devicetype
    case 35 => processnode
    case 36 => apptype
    case 37 => district
    case 38 => paymode
    case 39 => isbid
    case 40 => bidprice
    case 41 => winprice
    case 42 => iswin
    case 43 => cur
    case 44 => rate
    case 45 => cnywinprice
    case 46 => imei
    case 47 => mac
    case 48 => idfa
    case 49 => openudid
    case 50 => androidid
    case 51 => rtbprovince
    case 52 => rtbcity
    case 53 => rtbdistrict
    case 54 => rtbstreet
    case 55 => storeurl
    case 56 => realip
    case 57 => isqualityapp
    case 58 => bidfloor
    case 59 => aw
    case 60 => ah
    case 61 => imeimd5
    case 62 => macmd5
    case 63 => idfamd5
    case 64 => openudidmd5
    case 65 => androididmd5
    case 66 => imeisha1
    case 67 => macsha1
    case 68 => idfasha1
    case 69 => openudidsha1
    case 70 => androididsha1
    case 71 => uuidunknow
    case 72 => userid
    case 73 => iptype
    case 74 => initbidprice
    case 75 => adpayment
    case 76 => agentrate
    case 77 => lomarkrate
    case 78 => adxrate
    case 79 => title
    case 80 => keywords
    case 81 => tagid
    case 82 => callbackdate
    case 83 => channelid
    case 84 => mediatype
    case _  => throw new IndexOutOfBoundsException(n.toString())
  }
}