
The Road to Big Data Learning, Part 87: Writing SparkSQL Results Out in Different Formats, and Loading Them Back

We can write the results of the examples we built earlier (the person query and the word count) out in various formats:

CSV format:

The code is as follows:

package com.test.SparkSQL

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

case class Person(id:Long,name:String,age:Int,fv:Int)

object OldSparkSQL {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("OldSparkSQL").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val lines: RDD[String] = sc.textFile("hdfs://marshal:9000/person.txt")
    //val PersonRDD = lines.map(_.split(",")).map(arr => Person(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt))
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      val age = fields(2).toInt
      val fv = fields(3).toInt
      // Person(id, name, age, fv)
      Row(id, name, age, fv)
    })
    val sqlContext = new SQLContext(sc)
    // Import implicit conversions (needed when converting RDDs/Datasets to DataFrames)
    import sqlContext.implicits._
    val schema = StructType(
      List(
        StructField("id", LongType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true),
        StructField("fv", IntegerType, true)
      )
    )

    val pdf: DataFrame = sqlContext.createDataFrame(rowRDD,schema)
    pdf.registerTempTable("t_person")
    val result: DataFrame = sqlContext.sql("select name,age,fv from t_person order by fv desc, age asc")
    result.write.csv("C:/Users/11489/Desktop/result1")
    // result.show()
    sc.stop()
  }
}

The result is as follows:
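Since the CSV files are written without a header row, loading them back means supplying the column names ourselves. The following read-back is a minimal sketch of my own (not part of the original program), assuming the result1 directory written above and a schema matching the three selected columns:

import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadCsvResult {
  def main(args: Array[String]): Unit = {
    // Minimal sketch (assumed): read the headerless CSV result back into a DataFrame.
    val spark: SparkSession = SparkSession.builder()
      .appName("ReadCsvResult")
      .master("local[*]")
      .getOrCreate()
    // The written files contain no header, so the column names and types are given explicitly.
    val csvSchema = StructType(List(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("fv", IntegerType, true)
    ))
    val reloaded: DataFrame = spark.read.schema(csvSchema).csv("C:/Users/11489/Desktop/result1")
    reloaded.show()
    spark.stop()
  }
}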

JSON format:

The code is as follows:

package com.test.SparkSQL

import org.apache.spark.sql._

object DataSetWordCount {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession
    val session: SparkSession = SparkSession.builder()
      .appName("DataSetWordCount")
      .master("local[*]")
      .getOrCreate()
    import session.implicits._

    val lines: Dataset[String] = session.read.textFile("D:/a/word.txt")
    val words: Dataset[String] = lines.flatMap(_.split(" "))
//    import org.apache.spark.sql.functions._
//    val result: Dataset[Row] = words.groupBy($"value" as "word")
//      .agg(count("*") as "counts")
//      .sort($"counts" desc)

    val grouped: RelationalGroupedDataset = words.groupBy($"value" as "word")
    val counted: DataFrame = grouped.count()
    val result: Dataset[Row] = counted.sort($"count" desc)
    result.write.json("C:/Users/11489/Desktop/result2")
    result.show()
  }
}

The result is as follows:
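Each line of the JSON output is a self-describing record, so it can be loaded back without specifying a schema. The following read-back is a minimal sketch of my own (not part of the original program), assuming the result2 directory written above:

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadJsonResult {
  def main(args: Array[String]): Unit = {
    // Minimal sketch (assumed): load the JSON word count back into a DataFrame.
    val session: SparkSession = SparkSession.builder()
      .appName("ReadJsonResult")
      .master("local[*]")
      .getOrCreate()
    // Spark infers the schema (word: string, count: long) from the JSON records themselves.
    val reloaded: DataFrame = session.read.json("C:/Users/11489/Desktop/result2")
    reloaded.show()
    session.stop()
  }
}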

Parquet format:

Parquet is a columnar storage format. What do we gain by saving the result in this format?

Later on we can read only the columns we actually need, and because compression is applied column by column, the output also takes up less space.
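As a minimal sketch of that benefit (my own addition, assuming an existing SparkSession named spark and the result3 directory produced by the program below), selecting a single column lets Spark read only that column from the Parquet files:

    // Minimal sketch (assumed): column pruning on the Parquet output written below.
    // Only the "word" column is actually read from disk.
    val onlyWords = spark.read.parquet("C:/Users/11489/Desktop/result3").select("word")
    onlyWords.show()

The word-count program that writes this Parquet output is as follows: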

package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object sqlWordCount {
  def main(args: Array[String]): Unit = {
    val session: SparkSession = SparkSession.builder()
      .appName("sqlWordCount")
      .master("local[*]")
      .getOrCreate()
    // Specify where to read the data from
    //val lines: Dataset[String] = session.read.textFile("D:/a/word.txt")
    val lines: DataFrame = session.read.format("text").load("D:/a/word.txt")
    // Import the implicit conversions from the SparkSession
    import session.implicits._
    val words: Dataset[String] = lines.flatMap(_.getAs[String]("value").split(" "))
//    val df: DataFrame = words.withColumnRenamed("value","word")
//    // Create a view first, then run the SQL
//    df.createTempView("v_wc")
//    val result: DataFrame = session.sql("select word,count(*) counts from v_wc group by word order by counts desc")
//    result.show()

//    // DSL style
//    import org.apache.spark.sql.functions._
//    val result: Dataset[Row] = words.groupBy($"value").agg(count("*") as "counts").sort($"counts" desc)
//    result.show()
    // Convert the Dataset to a DataFrame so we can run SQL against it
    val df = words.toDF()
    df.createTempView("v_wc")
    val result: DataFrame = session.sql("select value word,count(*) counts from v_wc group by word order by counts desc")
    result.write.parquet("C:/Users/11489/Desktop/result3")
    result.show()

  }
}

The execution result:

We can take a look at the name of the output file:

Loading data from MySQL:

package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcDataSource {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("JdbcDataSource")
      .master("local[*]")
      .getOrCreate()
    // Load the "users" table from MySQL through the JDBC data source
    val logs: DataFrame = spark.read.format("jdbc")
      .options(Map("url" -> "jdbc:mysql://localhost:3306/lfr",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "users",
        "user" -> "root",
        "password" -> "928918zbgch521"))
      .load()
    logs.show()

  }
}

The execution result:
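The same load can also be written with the read.jdbc shortcut, which takes the URL, the table name and a java.util.Properties object. A minimal sketch of my own, reusing the SparkSession and connection settings above:

    // Minimal sketch (assumed): equivalent JDBC read using the read.jdbc shortcut
    // (requires: import java.util.Properties).
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "928918zbgch521")
    props.put("driver", "com.mysql.jdbc.Driver")
    val users: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/lfr", "users", props)
    users.show()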

Writing the result back to MySQL:

The code is as follows:

    // Continues the JdbcDataSource example above (requires: import java.util.Properties).
    // Writes part of the loaded DataFrame back to MySQL.
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "928918zbgch521")
    logs.where(logs.col("id") <= 3)
      .write.mode("append")
      .jdbc("jdbc:mysql://localhost:3306/lfr", "users", prop)

The execution result:
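The mode("append") above adds rows to the existing users table. The other standard save modes are "overwrite", "ignore" and "errorifexists". As a minimal sketch of my own (with a hypothetical target table name), an overwrite would look like this; note that for JDBC writes, "overwrite" drops and recreates the target table by default:

    // Minimal sketch (assumed, hypothetical table name t_users_copy): overwrite instead of append.
    // For JDBC, "overwrite" drops and recreates the target table by default.
    logs.where(logs.col("id") <= 3)
      .write.mode("overwrite")
      .jdbc("jdbc:mysql://localhost:3306/lfr", "t_users_copy", prop)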

Reading data back from the Parquet files:

The code is as follows:

package com.test.SparkSQL

import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadParquet {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("ReadParquet")
      .master("local[*]")
      .getOrCreate()
    val result: DataFrame = spark.read.format("parquet").load("C:/Users/11489/Desktop/result3")
    result.show()
  }
}

The execution result is as follows: