1. 程式人生 > >Spark SQL筆記整理(三):加載保存功能與Spark SQL函數

Spark SQL筆記整理(三):加載保存功能與Spark SQL函數

code ren maven依賴 append 關聯 dfs 取值 struct nal

加載保存功能

數據加載(json文件、jdbc)與保存(json、jdbc)

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.sql.p1

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

/**
  * SparkSQL關於加載數據和數據落地的各種實戰操作
  */
object _03SparkSQLLoadAndSaveOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)

        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

//        readOps(sqlContext)
        writeOps(sqlContext)
        sc.stop()
    }

    /**
      * 在write結果到目錄中的時候需要留意相關異常
      *     org.apache.spark.sql.AnalysisException: path file:/D:/data/spark/sql/people-1.json already exists
      * 如果還想使用該目錄的話,就需要設置具體的保存模式SaveMode
      * ErrorIfExist
      *     默認的,目錄存在,拋異常
      * Append
      *     追加
      * Ingore
      *     忽略,相當於不執行
      * Overwrite
      *     覆蓋
      */
    def writeOps(sqlContext:SQLContext): Unit = {
        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        df.registerTempTable("people")
        val retDF = sqlContext.sql("select * from people where age > 20")
//        retDF.show()
        // 將結果落地
        //retDF.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json")
        // 落地到數據庫
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people1"   // 會重新創建一張新表
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        retDF.coalesce(1).write.jdbc(url, table, properties)
    }

    /*
        // sparkSQL讀數據
        // java.lang.RuntimeException: file:/D:/data/spark/sql/people.json is not a Parquet file
        sparkSQL使用read.load加載的默認文件格式為parquet(parquet.apache.org)
        加載其它文件格式怎麽辦?
            需要指定加載文件的格式.format("json")
     */
    def readOps(sqlContext:SQLContext): Unit = {
        //        val df = sqlContext.read.load("D:/data/spark/sql/users.parquet")
        //        val df = sqlContext.read.format("json").load("D:/data/spark/sql/people.json")
        //        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people"
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        val df = sqlContext.read.jdbc(url, table, properties)

        df.show()
    }
}

當執行讀操作時,輸出結果如下:

+---+----+---+------+
| id|name|age|height|
+---+----+---+------+
|  1| 小甜甜| 18| 168.0|
|  2| 小丹丹| 19| 167.0|
|  3|  大神| 25| 181.0|
|  4|  團長| 38| 158.0|
|  5|  記者| 22| 169.0|
+---+----+---+------+

當執行寫操作時:

1.如果保存到json文件
註意有各種寫模式,另外其保存的是一個目錄,與HDFS兼容的目錄格式

2.如果保存到jdbc
則會在數據庫中創建一個DataFrame所包含列的表,註意該表不能存在

Spark SQL和Hive的集成

需要先啟動Hive,然後再進行下面的操作。

代碼編寫

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import cn.xpleaf.bigdata.spark.scala.sql.p1._01SparkSQLOps
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * 通過創建HiveContext來操作Hive中表的數據
  * 數據源:
  * teacher_info.txt
  *     name(String)    height(double)
  *     zhangsan,175
  *     lisi,180
  *     wangwu,175
  *     zhaoliu,195
  *     zhouqi,165
  *     weiba,185
  *
  *     create table teacher_info(
  *     name string,
  *     height double
  *     ) row format delimited
  *     fields terminated by ‘,‘;
  *
  * teacher_basic.txt
  *     name(String)    age(int)    married(boolean)    children(int)
  *     zhangsan,23,false,0
  *     lisi,24,false,0
  *     wangwu,25,false,0
  *     zhaoliu,26,true,1
  *     zhouqi,27,true,2
  *     weiba,28,true,3
  *
  *     create table teacher_basic(
  *     name string,
  *     age int,
  *     married boolean,
  *     children int
  *     ) row format delimited
  *     fields terminated by ‘,‘;
  * *
  * 需求:
  *1.通過sparkSQL在hive中創建對應表,將數據加載到對應表
  *2.執行sparkSQL作業,計算teacher_info和teacher_basic的關聯信息,將結果存放在一張表teacher中
  * 
  * 在集群中執行hive操作的時候,需要以下配置:
  *     1、將hive-site.xml拷貝到spark/conf目錄下,將mysql connector拷貝到spark/lib目錄下
        2、在$SPARK_HOME/conf/spark-env.sh中添加一條記錄
         export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
  */
object _01HiveContextOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
//            .setMaster("local[2]")
            .setAppName(_01SparkSQLOps.getClass.getSimpleName)

        val sc = new SparkContext(conf)
        val hiveContext = new HiveContext(sc)

        //創建teacher_info表
        hiveContext.sql("CREATE TABLE teacher_info(" +
            "name string, " +
            "height double) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ‘,‘")

        hiveContext.sql("CREATE TABLE teacher_basic(" +
            "name string, " +
            "age int, " +
            " married boolean, " +
            "children int) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ‘,‘")

        // 向表中加載數據
        hiveContext.sql("LOAD DATA LOCAL INPATH ‘/home/uplooking/data/hive/sql/teacher_info.txt‘ INTO TABLE teacher_info")
        hiveContext.sql("LOAD DATA LOCAL INPATH ‘/home/uplooking/data/hive/sql/teacher_basic.txt‘ INTO TABLE teacher_basic")

        //第二步操作 計算兩張表的關聯數據
        val joinDF = hiveContext.sql("SELECT " +
            "b.name, " +
            "b.age, " +
            "if(b.married, ‘已婚‘, ‘未婚‘) as married, " +
            "b.children, " +
            "i.height " +
            "FROM teacher_info i " +
            "INNER JOIN teacher_basic b ON i.name = b.name")

        joinDF.collect().foreach(println)

        joinDF.write.saveAsTable("teacher")

        sc.stop()
    }
}

打包、上傳與配置

打包後上傳到集群環境中,然後針對Spark做如下配置:

在集群中執行hive操作的時候,需要以下配置:
    1、將hive-site.xml拷貝到spark/conf目錄下,將mysql connector拷貝到spark/lib目錄下
    2、在$SPARK_HOME/conf/spark-env.sh中添加一條記錄
       export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar

提交spark作業

使用的spark提交作業的腳本如下:

[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh 
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop

/home/uplooking/app/spark/bin/spark-submit --class $2 --master spark://uplooking02:7077 --executor-memory 1G --num-executors 1 $1 \

執行如下命令:

./spark-submit-standalone.sh spark-hive.jar cn.xpleaf.bigdata.spark.scala.sql.p2._01HiveContextOps

驗證

可以在作業執行的輸出結果有看到我們期望的輸出,也可以直接在Hive中操作來進行驗證:

hive> show tables;
OK
hpeople
people
t1
teacher
teacher_basic
teacher_info
Time taken: 0.03 seconds, Fetched: 6 row(s)
hive> select * from teacher;
OK
zhangsan        23      未婚    0       175.0
lisi    24      未婚    0       180.0
wangwu  25      未婚    0       175.0
zhaoliu 26      已婚    1       195.0
zhouqi  27      已婚    2       165.0
weiba   28      已婚    3       185.0
Time taken: 0.369 seconds, Fetched: 6 row(s)

Spark和ES的集成

需要確保ElasticSearch環境已經搭建好。

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._

/**
  * Spark和ES的集成操作
  *     引入Spark和es的maven依賴
  *     elasticsearch-hadoop
  *     2.3.0
  *     將account.json加載到es的索引庫spark/account
  *     可以參考官方文檔:https://www.elastic.co/guide/en/elasticsearch/hadoop/2.3/spark.html
  */
object _02SparkElasticSearchOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
                        .setAppName(_02SparkElasticSearchOps.getClass().getSimpleName)
                        .setMaster("local[2]")
        /**
          * Spark和es的集成配置
          */
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "uplooking01")
        conf.set("es.port", "9200")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

//        write2ES(sqlContext)

        readFromES(sc)

        sc.stop()
    }

    /**
      * 從es中讀數據
      * (使用sparkContext進行操作)
      */
    def readFromES(sc:SparkContext): Unit = {
        val resources = "spark/account"  // 索引庫/類型
        val jsonRDD = sc.esJsonRDD(resources)
        jsonRDD.foreach(println)
    }

    /**
      * 向es中寫入數據
      * (使用sqlContext進行操作)
      */
    def write2ES(sqlContext:SQLContext): Unit = {
        val jsonDF = sqlContext.read.json("D:/data/spark/sql/account.json")
        val resources = "spark/account"  // 索引庫/類型
        jsonDF.saveToEs(resources)
    }
}

Spark SQL函數

概述(Spark 1.5.X ~ 1.6.X的內置函數)

使用Spark SQL中的內置函數對數據進行分析,Spark SQL API不同的是,DataFrame中的內置函數操作的結果是返回一個Column對象,而DataFrame天生就是"A distributed collection of data organized into named columns.",這就為數據的復雜分析建立了堅實的基礎並提供了極大的方便性,例如說,我們在操作DataFrame的方法中可以隨時調用內置函數進行業務需要的處理,這之於我們構建附件的業務邏輯而言是可以極大的減少不必須的時間消耗(基於上就是實際模型的映射),讓我們聚焦在數據分析上,這對於提高工程師的生產力而言是非常有價值的Spark 1.5.x開始提供了大量的內置函數,還有max、mean、min、sum、avg、explode、size、sort_array、day、to_date、abs、acos、asin、atan

總體上而言內置函數包含了五大基本類型:

1、聚合函數,例如countDistinct、sumDistinct等;
2、集合函數,例如sort_array、explode等
3、日期、時間函數,例如hour、quarter、next_day
4、數學函數,例如asin、atan、sqrt、tan、round等;
5、開窗函數,例如rowNumber等
6、字符串函數,concat、format_number、rexexp_extract
7、其它函數,isNaN、sha、randn、callUDF

以下為Hive中的知識內容,但是顯然Spark SQL也有同樣的概念
UDF
    用戶自定義函數:User Definded Function
    一路輸入,一路輸出
    a--->A
    strlen("adbad")=5
UDAF
    用戶自定義聚合函數:User Definded Aggregation Function
    多路輸入,一路輸出
    sum(a, b, c, d)---->匯總的結果
表函數
    UDTF:用戶自定義表函數:User Definded Table Function
    多路輸入,多路輸出
    "hello you"
    "hello me"  ---->轉換操作,----->split("")---->Array[] 
    ["hello, "you"]--->
        "hello"
        "you"
    ---->行列轉換

一個基本的案例如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * SparkSQL 內置函數操作
  */
object _03SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_03SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val pdf = sqlContext.read.json("D:/data/spark/sql/people.json")
        pdf.show()

        pdf.registerTempTable("people")

        // 統計人數
        sqlContext.sql("select count(1) from people").show()
        // 統計最小年齡
        sqlContext.sql("select age, " +
            "max(age) as max_age, " +
            "min(age) as min_age, " +
            "avg(age) as avg_age, " +
            "count(age) as count " +
            "from people group by age order by age desc").show()

        sc.stop()
    }
}

輸出結果如下:

+---+------+-------+
|age|height|   name|
+---+------+-------+
| 10| 168.8|Michael|
| 30| 168.8|   Andy|
| 19| 169.8| Justin|
| 32| 188.8|   Jack|
| 10| 158.8|   John|
| 19| 179.8|   Domu|
| 13| 179.8|     袁帥|
| 30| 175.8|     殷傑|
| 19| 179.9|     孫瑞|
+---+------+-------+

18/05/09 17:53:23 INFO FileInputFormat: Total input paths to process : 1
+---+
|_c0|
+---+
|  9|
+---+

18/05/09 17:53:24 INFO FileInputFormat: Total input paths to process : 1
+---+-------+-------+-------+-----+
|age|max_age|min_age|avg_age|count|
+---+-------+-------+-------+-----+
| 32|     32|     32|   32.0|    1|
| 30|     30|     30|   30.0|    2|
| 19|     19|     19|   19.0|    3|
| 13|     13|     13|   13.0|    1|
| 10|     10|     10|   10.0|    2|
+---+-------+-------+-------+-----+

Spark SQL開窗函數

1、Spark 1.5.x版本以後,在Spark SQL和DataFrame中引入了開窗函數,比如最經典的就是我們的row_number(),可以讓我們實現分組取topn的邏輯。

2、做一個案例進行topn的取值(利用Spark的開窗函數),不知道同學們是否還有印象,我們之前在最早的時候,做過topn的計算,當時是非常麻煩的。但是現在用了Spark SQL之後,非常方便。

Spark SQL之UDF操作

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * SparkSQL 內置函數操作
  */
object _04SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_04SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        /**
          * hive中的用戶自定義函數UDF操作(即在SparkSQL中類比hive來進行操作,因為hive和SparkSQL都是交互式計算)
          * 1.創建一個普通的函數
          * 2.註冊(在SqlContext中註冊)
          * 3.直接使用即可
          *
          * 案例:創建一個獲取字符串長度的udf
          */

        // 1.創建一個普通的函數
        def strLen(str:String):Int = str.length

        // 2.註冊(在SqlContext中註冊)
        sqlContext.udf.register[Int, String]("myStrLen", strLen)

        val list = List("Hello you", "Hello he", "Hello me")

        // 將RDD轉換為DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).flatMap(_.split(" ")).map(word => {
            Row(word)
        })
        val scheme = StructType(List(
            StructField("word", DataTypes.StringType, false)
        ))
        val df = sqlContext.createDataFrame(rowRDD, scheme)

        df.registerTempTable("test")

        // 3.直接使用即可
        sqlContext.sql("select word, myStrLen(word) from test").show()

        sc.stop()
    }

}

輸出結果如下:

+-----+---+
| word|_c1|
+-----+---+
|Hello|  5|
|  you|  3|
|Hello|  5|
|   he|  2|
|Hello|  5|
|   me|  2|
+-----+---+

Spark SQL之wordcount操作

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

/**
  * 這兩部分都比較重要:
  * 1.使用SparkSQL完成單詞統計操作
  * 2.開窗函數使用
  */
object _05SparkSQLFunctionOps2 {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_05SparkSQLFunctionOps2.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val list = List("Hello you", "Hello he", "Hello me")

        // 將RDD轉換為DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).map(line => {
            Row(line)
        })
        val scheme = StructType(List(
            StructField("line", DataTypes.StringType, false)
        ))

        val df = sqlContext.createDataFrame(rowRDD, scheme)

        df.registerTempTable("test")

        df.show()

        // 執行wordcount
        val sql =  "select t.word, count(1) as count " +
                    "from " +
                        "(select " +
                            "explode(split(line, ‘ ‘)) as word " +
                        "from test) as t " +
                    "group by t.word order by count desc"
        sqlContext.sql(sql).show()

        sc.stop()
    }
}

輸出結果如下:

+---------+
|     line|
+---------+
|Hello you|
| Hello he|
| Hello me|
+---------+

+-----+-----+
| word|count|
+-----+-----+
|Hello|    3|
|   me|    1|
|   he|    1|
|  you|    1|
+-----+-----+


原文鏈接:http://blog.51cto.com/xpleaf/2114584

Spark SQL筆記整理(三):加載保存功能與Spark SQL函數