
Configuring Spark SQL to read Hive, DataFrame vs RDD, converting an RDD to a DataFrame, views, withColumn


Exiting spark-shell

 :quit

Creating a DataFrame from Spark's bundled sample files

The bundled sample files live under this directory:
$ pwd
/home/hadoop/app/spark-2.4.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources
scala> val df = spark.read.json("file:///home/hadoop/app/spark-2.4.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

scala> df.select("name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

//In IDEA you need to import the implicit conversions: import xxx.implicits._
scala> df.select($"name").show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


scala> df.select('name).show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

//note the triple equals: ===
scala> df.filter($"age" === 19).show
+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+

scala> df.filter("age = 19").show
+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+
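
Other comparison operators work the same way; a minimal sketch (output omitted):

scala> df.filter($"age" > 20).show
scala> df.filter("age > 20").show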

Leaving HDFS safe mode

$ hadoop dfsadmin -safemode leave

Configuring Spark to read Hive

1. Add to the pom file:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.27</version>
    </dependency>

2. Add the configuration file under resources (hive-site.xml, shown in step 3)


3. Edit the hive-site.xml file

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<property>
    <!-- Stores the execution plans for the different map/reduce stages and the intermediate output of those stages -->
    <name>hive.exec.scratchdir</name>
    <value>hdfs://192.168.137.190:9000/hive/tmp</value>
    <description>Scratch space for Hive jobs</description>
</property>
<property>
    <!-- Directory on HDFS where Hive data is stored -->
    <name>hive.metastore.warehouse.dir</name>
    <value>hdfs://192.168.137.190:9000/user/hive/warehouse</value>
    <description>location of default database for the warehouse</description>
</property>
<property>
    <!-- Directory for Hive runtime query logs; if this value is empty, no runtime query logs are created -->
    <name>hive.querylog.location</name>
    <value>hdfs://192.168.137.190:9000/hive/log</value>
    <description>Location of Hive run time structured log file</description>
</property>

<property>

    <name>hive.metastore.local</name>
    <value>true</value>
    <description>controls whether to connect to remove metastore server or open a new metastore server in Hive Client JVM</description>
</property>

<property>
    <!-- TCP port to listen on, default 10000 -->
    <name>hive.server2.thrift.port</name>
    <value>10000</value>
    <description>Port number of HiveServer2 Thrift interface. Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>
</property>

<property>
    <!-- Metastore schema verification -->
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
    <description>
        Enforce metastore schema version consistency.
        True: Verify that version information stored in metastore matches with one from Hive jars.  Also disable automatic schema migration attempt. Users are required to manually migrate schema after Hive upgrade which ensures proper metastore schema migration. (Default)
        False: Warn if the version information stored in metastore doesn't match with one from in Hive jars.
    </description>
</property>

<property>
    <!-- Metastore database connection URL -->
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://192.168.137.190:3306/ruoze_d5?createDatabaseIfNotExist=true</value>
    <description>The default connection string for the database that stores temporary hive statistics.</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
    <description>username to use against metastore database</description>
</property>

<property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123456</value>
    <description>password to use against metastore database</description>
</property>

</configuration>
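
With hive-site.xml on the classpath, a quick sanity check can be run from spark-shell (a minimal sketch, assuming this Spark build ships with Hive support and the MySQL metastore is reachable):

scala> spark.sql("show databases").show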

4. Check MySQL privileges

Check that the account has privileges on the relevant database from the relevant client IP.

A first try

import org.apache.spark.sql.SparkSession

object SparksessionApp {
  //Create a DataFrame
  def main(args: Array[String]): Unit = {
    val sessionApp = SparkSession.builder().appName("sparksession2")
      .master("local[2]")
      //enable Hive support
      .enableHiveSupport()
      .getOrCreate()

    //sessionApp.sparkContext.parallelize(Array(1,2,3)).collect().foreach(println)

    //sessionApp.read.json("").show()

    //read Hive
    //sessionApp.sql("select * from page_views limit 10").show()
    //Hive can also be read this way, returning a DataFrame
    //sessionApp.table("page_views").show(10)

    //Convert an RDD to a DataFrame:
    //the implicit conversions must be imported
    import sessionApp.implicits._
    val g5 = sessionApp.sparkContext.
      parallelize(Array(("1", "豆豆"), ("2", "牛哥"), ("34", "進取")))
        .toDF("id","name")
    g5.show()
    g5.printSchema()

//    +---+----+
//    | id|name|
//    +---+----+
//    |  1|豆豆|
//    |  2|牛哥|
//    | 34|進取|
//    +---+----+
//
//    root
//     |-- id: string (nullable = true)
//     |-- name: string (nullable = true)

    sessionApp.stop()
  }
}

Looking up a table's HDFS path in Hive

hive (default)> show create table ruoze_emp;
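
The Location field in the output is the table's directory on HDFS. The same lookup also works from Spark SQL once Hive support is enabled; a minimal sketch, assuming the ruoze_emp table exists:

scala> spark.sql("describe formatted ruoze_emp").filter($"col_name" === "Location").show(false)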

DataFrame vs RDD

1. Compression: because a DataFrame carries schema metadata about its data, it can pick a more suitable compression scheme.
2. For a query like select id from xxx, an RDD has to search every partition, whereas a DataFrame can locate the id column directly (see the sketch below).
3. RDDs written in different languages (e.g. Scala vs Java) can differ a lot in performance. A DataFrame program, regardless of the language it is written in, is translated into a logical plan and then a physical plan; during that translation the Catalyst optimizer runs, so the different languages end up with comparable performance.
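A minimal sketch of point 2, assuming a SparkSession named spark (as in spark-shell) and a hypothetical User case class: the RDD version has to deserialize every full record in every partition, while the DataFrame version knows the schema and can prune down to the id column.

import spark.implicits._
// hypothetical record type, only for illustration
case class User(id: Long, name: String)
val users = Seq(User(1L, "a"), User(2L, "b"))

// RDD: the map touches every whole User object in every partition
val idsRdd = spark.sparkContext.parallelize(users).map(_.id)

// DataFrame: Catalyst sees the schema and can prune to just the id column
val idsDf = users.toDF().select("id")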

The show function

By default it displays 20 rows. The second parameter controls whether strings longer than 20 characters are truncated in the output; it defaults to true (truncate).
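
For example, reusing the df from earlier:

scala> df.show(5, false)   // at most 5 rows, and do not truncate strings to 20 characters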

Spark SQL supports two ways to convert an RDD into a Dataset

The first uses reflection to infer the concrete types of the RDD's schema. It can be used when the schema is known in advance (a case class must be defined).

The second builds the Dataset programmatically. Use it when the schema cannot be defined up front with a case class:
1. Create a Row-typed RDD from the original RDD
2. Create a schema, represented by a StructType, that matches the RDD from the previous step
3. Apply the schema to the RDD

Alt+Enter imports the required classes (in IDEA).

Trying out both conversion approaches

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._

object DFApp {

  def main(args: Array[String]): Unit = {
    val session=SparkSession.builder().
      appName("DF").
      master("local[2]")
      .getOrCreate()


    //inferReflection(session)
    programmatically(session)

    session.stop()
  }

  def programmatically(session: SparkSession)={

    val info = session.sparkContext.textFile("file:///F:/asd.txt")
    import session.implicits._
    //1. Create a Row-typed RDD from the original RDD
    val rdd = info.map(_.split(",")).map(x=>Row(x(0),x(1),x(2).toLong))
    //2. Create a schema, represented by a StructType, matching the RDD from step 1
    val struct = StructType(Array(
      //the third argument is whether the field is nullable
      StructField("ip",StringType,false),
      StructField("domain",StringType,false),
      StructField("response",LongType,false)
    ))
    //3. Apply the schema to the RDD
    val df = session.createDataFrame(rdd,struct)
    //df.show()


    //df.select("ip").show(2)
    //Spark SQL functions require importing functions: import org.apache.spark.sql.functions._
    //agg applies aggregate functions
    //df.groupBy('domain).agg(sum("response") as ("rs") ).select("domain","rs").show()

    //user-defined function
    //view, i.e. a temporary table
    //df.createOrReplaceTempView("infos")
    //register the function:
    //function name plus a lambda that implements the custom logic; the ip here is the argument passed into getInfo
    session.udf.register("province" ,(ip:String)=>{
      IPUtils.getInfo(ip)
    })

    //add a column
    val asd = df.withColumn("isp",col("response")*3)
    asd.createOrReplaceTempView("infos2")
    session.sql("select ip,domain,sum(response) as responseSize,province(ip) as province ,sum(isp) from infos2 group by ip,domain,province(ip) limit 5").show()



  }


  def inferReflection(session: SparkSession)={
    import session.implicits._
    //create the RDD
    val info = session.sparkContext.textFile("file:///F:/asd.txt")

    //info.take(3).foreach(println)
    //map onto the case class, which effectively gives each field a name and a type
    //Scala's implicit conversions can automatically turn an RDD of case classes into a DataFrame
    val df = info.map(_.split(",")).map(x=>Info(x(0),x(1),x(2).toLong)).toDF()
    //df.groupBy("ip","domain").sum("liu").show()

    //register a view (essentially a temporary table) so it can be queried with SQL
    df.createOrReplaceTempView("info")
    val infoDF=session.sql("select domain,sum(liu) from info group by domain")
    //to apply an RDD operator to a DataFrame, call .rdd first
    //getAs fetches the domain value of each row
    //infoDF.rdd.map(x=>x.getAs[String]("domain")).collect().foreach(println)
    //it also works without converting to an RDD
    infoDF.map(x=>x.getAs[String]("domain")).show()
    //indexing by position also works
    //infoDF.rdd.map(x=>x(0)).collect().foreach(println)



  }

}
//this case class defines the table schema
case class Info(ip:String,domain:String,liu:Long)

object IPUtils{
  def getInfo(ip:String) = {
    ("深圳市","聯通")
  }
}
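
Besides deriving a new column from existing ones as withColumn does above, a constant column can also be added with lit from org.apache.spark.sql.functions; a minimal sketch, where df stands for any DataFrame (e.g. the one built in programmatically) and the column name "source" is made up for illustration:

import org.apache.spark.sql.functions.lit
// adds a constant column; if a column with the same name already exists, it is replaced
val withConst = df.withColumn("source", lit("access_log"))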

Live-stream traffic statistics

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession, types}
import org.apache.spark.sql.functions._

object zhiboliuliang_task {
  def main(args: Array[String]): Unit = {
    val session  = SparkSession.builder()
      .master("local[2]")
      .appName("zhibo")
      .getOrCreate()

    //val file = session.read.format("text").load("file:///F:\\ruozedata_workplace\\g5-spark\\zhibo_liuliang.txt")
    //val file = session.read.text("file:///F:\\ruozedata_workplace\\g5-spark\\zhibo_liuliang.txt")
    val file = session.sparkContext.textFile("file:///F:\\ruozedata_workplace\\g5-spark\\zhibo_liuliang.txt")

//    file.rdd.map(line=>{
//      line(0)
//    }).collect().foreach(println)

//    //file.flatMap(_.split(",")).collect().foreach(println)
//    import session.implicits._
//
    val df = file.map(x=>{
      val temp = x.split(",")
      //trim leading/trailing spaces, then take the substring up to the first "-" found from index 7 onward
      (temp(0),temp(1).trim.substring(0,temp(1).trim.indexOf("-",7)),temp(2).toLong)
      //temp
    }).map(x=>Row(x._1,x._2,x._3))
////      .toDF("id","date","liu")
//
    val struct = StructType(Array(
      StructField("id",StringType,false),
      StructField("date",StringType,false),
      StructField("liu",LongType,true)
    ))

    val imp = session.createDataFrame(df,struct)
    imp.show()

//      val asd = df.groupBy('date,'id).agg(sum('liu) as "month").select('id,'date,'month)
//
//      asd.createOrReplaceTempView("biao")
//
//      val tmp = session.sql("select id,date,month,sum(month) as all from biao group by id,date,month  ").show()

    session.stop()
  }
}