
Big Data with Spark (Part 5) --- Spark's SQL Module, Spark's JDBC Implementation, Integrating SparkSQL with MySQL, Integrating SparkSQL with Hive and Beeline

I. Spark's SQL Module
----------------------------------------------------------
    1. This module runs SQL statements on Spark.

    2. It can process a wide range of data sources.

    3. DataFrame --- RDD --- table   //a DataFrame is an RDD with a table-like schema

    4. Data can be accessed both with SQL statements and through the DataFrame API (see the sketch below).

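    A minimal spark-shell sketch tying points 3 and 4 together (the data and column names
    are made up for illustration; `spark` is the SparkSession that spark-shell provides):

        import spark.implicits._                       // already in scope inside spark-shell

        // build a DataFrame from an in-memory sequence
        val df = Seq((1, "tom", 12), (2, "tomas", 13)).toDF("id", "name", "age")

        // query it with a SQL statement ...
        df.createOrReplaceTempView("people")
        spark.sql("select name, age from people where age > 12").show

        // ... or with the equivalent DataFrame API calls
        df.select("name", "age").where("age > 12").show
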

II. Spark's JDBC Implementation
-------------------------------------------------------------
    1. Spark-Shell implementation
        //a. Define a case class
            scala> case class Customer(id:Int,name:String,age:Int)
            defined class Customer

        //b. Construct some data
            scala> val arr = Array("1,tom,12","2,tomas,13","3,tomasLee,14")
            arr: Array[String] = Array(1,tom,12, 2,tomas,13, 3,tomasLee,14)

        //c. Create an RDD from the array
            scala> val rdd1 = sc.parallelize(arr)
            rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

        //d. Map it into an RDD of Customer
            scala> val rdd2 = rdd1.map(e=>{val arr = e.split(","); Customer(arr(0).toInt,arr(1),arr(2).toInt)})
            rdd2: org.apache.spark.rdd.RDD[Customer] = MapPartitionsRDD[24] at map at <console>:30

        //e. Create a DataFrame from the RDD
            scala> val df = spark.createDataFrame(rdd2);
            df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

        //f. Print the schema
            scala> df.printSchema
            root
            |-- id: integer (nullable = true)
            |-- name: string (nullable = true)
            |-- age: integer (nullable = true)

        //g. Query the data
            scala> df.show
            +---+--------+---+
            | id|    name|age|
            +---+--------+---+
            |  1|     tom| 12|
            |  2|   tomas| 13|
            |  3|tomasLee| 14|
            +---+--------+---+

        //h. Create a temporary view [the view name is the table name used in SQL]
            scala> df.createTempView("customers")

        //i. Execute SQL statements
            scala> val df2 = spark.sql("select * from customers")
            df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

            scala> df2.show
            +---+--------+---+
            | id|    name|age|
            +---+--------+---+
            |  1|     tom| 12|
            |  2|   tomas| 13|
            |  3|tomasLee| 14|
            +---+--------+---+

            scala> spark.sql("select * from customers").show
            +---+--------+---+
            | id|    name|age|
            +---+--------+---+
            |  1|     tom| 12|
            |  2|   tomas| 13|
            |  3|tomasLee| 14|
            +---+--------+---+


        //j. Union queries
            scala> val df1 = spark.sql("select * from customers where id < 2");
            df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

            scala> val df2 = spark.sql("select * from customers where id > 2");
            df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

            scala> df1.createTempView("c1")

            scala> df2.createTempView("c2")

            scala> spark.sql("select * from c1 union select * from c2").show()
            +---+--------+---+
            | id|    name|age|
            +---+--------+---+
            |  1|     tom| 12|
            |  3|tomasLee| 14|
            +---+--------+---+


            scala> df1.union(df2)
            res10: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 1 more field]

            scala> val df3 = df1.union(df2)
            df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 1 more field]

            scala> df3.show
            +---+--------+---+
            | id|    name|age|
            +---+--------+---+
            |  1|     tom| 12|
            |  3|tomasLee| 14|
            +---+--------+---+
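
            Note: SQL union removes duplicate rows, while Dataset.union keeps them; the two
            match above only because the id < 2 and id > 2 ranges cannot overlap. A small
            sketch with made-up overlapping data to show the difference (toDF relies on
            spark.implicits._, already imported in spark-shell):

                val a = Seq((1, "tom", 12)).toDF("id", "name", "age")
                val b = Seq((1, "tom", 12), (3, "tomasLee", 14)).toDF("id", "name", "age")

                a.union(b).show              // keeps the duplicate (1, tom, 12) row
                a.union(b).distinct.show     // distinct restores SQL UNION semantics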

        //k. Other queries
            scala> spark.sql("select id,name from customers").show
            +---+--------+
            | id|    name|
            +---+--------+
            |  1|     tom|
            |  2|   tomas|
            |  3|tomasLee|
            +---+--------+

            scala> df.selectExpr("id","name").show
            +---+--------+
            | id|    name|
            +---+--------+
            |  1|     tom|
            |  2|   tomas|
            |  3|tomasLee|
            +---+--------+

            scala> df.where("name like 't%'").show
            +---+--------+---+
            | id|    name|age|
            +---+--------+---+
            |  1|     tom| 12|
            |  2|   tomas| 13|
            |  3|tomasLee| 14|
            +---+--------+---+

            scala> df.where("name like '%e'").show
            +---+--------+---+
            | id|    name|age|
            +---+--------+---+
            |  3|tomasLee| 14|
            +---+--------+---+


        //l. Aggregation and mapping
            a. Sum the customers' ages: map to a Dataset[Int], then reduce
                scala> df.map(x=> x.getAs[Int]("age")).reduce( _ + _ )
                res17: Int = 39

            b. Aggregate functions
                scala> df.agg(sum("age"),max("age"),min("age")).show
                +--------+--------+--------+
                |sum(age)|max(age)|min(age)|
                +--------+--------+--------+
                |      39|      14|      12|
                +--------+--------+--------+
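
            c. Grouped aggregation -- a small sketch, not part of the original transcript,
               using the same df and the functions spark-shell already imports
               (org.apache.spark.sql.functions._):

                // one aggregate row per distinct age instead of one row for the whole table
                df.groupBy("age").agg(count("id"), max("name")).show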


    2. Java implementation
        a. Create a module and add the dependencies
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.17</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>

        b. Converting a DataFrame to an RDD
            JavaRDD<Row> rdd = df1.toJavaRDD();

        c. Saving the result of a Spark SQL computation (as JSON)
            //save as a set of JSON files
            df.write().json(dir) ;

            //the save mode is set on the DataFrameWriter returned by write(), not on the DataFrame itself
            df.write().mode(SaveMode.Append).json(dir) ;
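
            For comparison, a spark-shell (Scala) sketch of b and c; the output path here is a
            placeholder, not taken from the original notes:

                import org.apache.spark.sql.SaveMode
                val dir = "file:///tmp/spark-json-out"   // hypothetical output directory

                // DataFrame -> RDD[Row] (the Scala counterpart of toJavaRDD)
                val rdd = df.rdd

                // chain mode() onto the writer, then json() performs the save
                df.write.mode(SaveMode.Append).json(dir)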

        d. Code example

            package com.test.spark.sql;

            import org.apache.spark.SparkConf;
            import org.apache.spark.api.java.JavaRDD;
            import org.apache.spark.api.java.JavaSparkContext;
            import org.apache.spark.sql.Dataset;
            import org.apache.spark.sql.Row;
            import org.apache.spark.sql.SaveMode;
            import org.apache.spark.sql.SparkSession;

            import java.util.function.Consumer;

            public class TestSparkSQL {

                public static void main(String [] args)
                {
                    SparkSession session = SparkSession.builder()
                            .appName("SQLJava")
                            //set the master
                            .config("spark.master","local")
                            .getOrCreate();
                    Dataset<Row> df = session.read().json("file:///D:\\share\\scala\\data\\json.dat");
                    //df.show();
                    df.createOrReplaceTempView("t1");
                    //Dataset<Row> df1 = session.sql("select * from t1 where id > 3");
                    //df.where("id > 3").show();
                    //session.sql("select count(id) from t1").show();
                    //System.out.println(df.count());
                    //df.select(df.col("id").plus(1),df.col("name").substr(1,2),df.col("name")).show();

                    //convert the DataFrame to an RDD
                    JavaRDD<Row> jrdd = df.toJavaRDD();
                    jrdd.collect().forEach(
                            new Consumer<Row>() {
                                @Override
                                public void accept(Row row) {
                                    //System.out.println(row.getLong(0) + ":" + row.getLong(1) + ":" + row.getString(2));
                                }
                            }
                    );

                    //set the save mode and save the SQL result as JSON
                    //(mode() only takes effect when chained onto the writer that performs the save)
                    df.write().mode(SaveMode.Append).json("d://share/spark/res");
                }
            }


III. Integrating SparkSQL with MySQL
------------------------------------------------------------
    1. spark-shell implementation

        scala> val jdbcDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.43.1:3306/mybatis", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "persons", "user" -> "mysql", "password" -> "mysql")).load()
        jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

        scala> jdbcDF.show()
        +---+----+-----------+
        | id|name|      phone|
        +---+----+-----------+
        |  1|  唐華|13560191111|
        |  2|  趙嗄|18022222222|
        |  3|  朱寬|18332561111|
        |  4|  張噢|15032295555|
        |  5|  任陽|15614209999|
        |  6|  劉飛|15732641111|
        |  7|  樑鵬|15778421111|
        |  8|  楊謀|18301581111|
        |  9|  史讓|15811111111|
        | 10|  張類|17731086666|
        | 11|  郭彤|18641241111|
        | 12|  杜跑|15733218888|
        | 13| 張錒 |15133333333|
        | 14|  王以|13269364444|
        | 15|  劉宗|18620191111|
        | 16|  李平|15338597777|
        | 17|  段星|13341101111|
        | 18|  溫英|13520401111|
        +---+----+-----------+

        scala> spark.sql("select * from customers where id == 1 ").show
        +---+----+-----------+
        | id|name|      phone|
        +---+----+-----------+
        |  1|  唐華|13560191111|
        +---+----+-----------+
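
        Writing a DataFrame back to MySQL also works from the shell. A sketch using the same
        connection settings as above; "subpersons" is the target table that the Java example
        below creates (write.jdbc creates it if it does not exist yet):

            val prop = new java.util.Properties()
            prop.put("user", "mysql")
            prop.put("password", "mysql")
            prop.put("driver", "com.mysql.jdbc.Driver")
            jdbcDF.where("phone like '135%'").write.jdbc("jdbc:mysql://192.168.43.1:3306/mybatis", "subpersons", prop)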


    2. Java implementation
        a. Add the MySQL driver (plus the Spark dependencies)
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.17</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>

        b. Code example

              package com.test.spark.sql;

              import org.apache.spark.api.java.JavaRDD;
              import org.apache.spark.sql.Dataset;
              import org.apache.spark.sql.Row;
              import org.apache.spark.sql.SaveMode;
              import org.apache.spark.sql.SparkSession;

              import java.util.Properties;
              import java.util.function.Consumer;

              public class TestSparkJDBC {
                  public static void main(String [] args)
                  {
                      SparkSession session = SparkSession.builder()
                              .appName("SQLJava")
                              //set the master
                              .config("spark.master","local")
                              .getOrCreate();
                      String url = "jdbc:mysql://192.168.43.1:3306/mybatis";
                      String table = "persons";
                      String user = "mysql";
                      String password = "mysql";

                      //read the table from the MySQL database
                      Dataset<Row> df = session.read().format("jdbc")
                              .option("url", url)
                              .option("dbtable",table)
                              .option("user",user)
                              .option("password",password)
                              .option("driver","com.mysql.jdbc.Driver")
                              .load();
                      df.show();
                      Dataset<Row> df1 = df.where("phone like '135%'");
                      df1 = df1.distinct();

                      //write the filtered rows to a new table in MySQL
                      Properties prop = new Properties();
                      prop.put("user",user);
                      prop.put("password",password);
                      prop.put("driver","com.mysql.jdbc.Driver");
                      df1.write().jdbc(url,"subpersons",prop);
                  }
              }

IV. Integrating SparkSQL with Hive
------------------------------------------------------
    1. In the Spark-Shell:
        a. Make sure the Hive class libraries are present on the Spark worker nodes

        b. Copy hive-site.xml, core-site.xml and hdfs-site.xml into spark/conf and distribute them to all nodes

        c. Copy the MySQL driver jar from hive/lib/ into /soft/spark/jars and distribute it

        d. Edit the spark/sbin/spark-config.sh file and add the three _HOME paths
          ...
          # included in all the spark scripts with source command
          # should not be executable directly
          # also should not be passed any arguments, since we need original $*
          export JAVA_HOME=/soft/jdk
          export HADOOP_HOME=/soft/hadoop/etc/hadoop
          export HIVE_HOME=/soft/hive
          # symlink and absolute path should rely on SPARK_HOME to resolve
          if [ -z "${SPARK_HOME}" ]; then
            export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
          fi
          ...

        e. Start spark-shell, specifying the master
            spark-shell --master local

        f. Create a table
            scala> spark.sql("create table mydb.tt(id int, name string , age int)
                row format delimited fields terminated by ','
                lines terminated by '\n'
                stored as textfile"
                )

        g. Load data into the Hive table
            scala> spark.sql("load data local inpath '/mnt/hgfs/share/tt.txt' into table mydb.tt")
            scala> spark.sql("select * from mydb.tt").show
            +---+----+---+
            | id|name|age|
            +---+----+---+
            |  1|tom1| 11|
            |  2|tom2| 12|
            |  3|tom3| 13|
            |  4|tom4| 14|
            |  5|tom5| 15|
            +---+----+---+

        h. From here, any HiveQL statement can be used to work with the Hive database, for example:
            scala> spark.sql("select ........")


    2. Operating on Hive tables with SparkSQL -- Java version
        a. Create a Java module and add the Maven dependencies
                    <?xml version="1.0" encoding="UTF-8"?>
                    <project xmlns="http://maven.apache.org/POM/4.0.0"
                             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
                        <modelVersion>4.0.0</modelVersion>
                        <groupId>com.it18zhang</groupId>
                        <artifactId>SparkDemo1</artifactId>
                        <version>1.0-SNAPSHOT</version>
                        <build>
                            <sourceDirectory>src/main/java</sourceDirectory>
                            <plugins>
                                <plugin>
                                    <groupId>org.apache.maven.plugins</groupId>
                                    <artifactId>maven-compiler-plugin</artifactId>
                                    <configuration>
                                        <source>1.8</source>
                                        <target>1.8</target>
                                    </configuration>
                                </plugin>
                                <plugin>
                                    <groupId>net.alchim31.maven</groupId>
                                    <artifactId>scala-maven-plugin</artifactId>
                                    <version>3.2.2</version>
                                    <configuration>
                                        <recompileMode>incremental</recompileMode>
                                    </configuration>
                                    <executions>
                                        <execution>
                                            <goals>
                                                <goal>compile</goal>
                                                <goal>testCompile</goal>
                                            </goals>
                                        </execution>
                                    </executions>
                                </plugin>
                            </plugins>
                        </build>
                        <dependencies>
                            <dependency>
                                <groupId>org.apache.spark</groupId>
                                <artifactId>spark-core_2.11</artifactId>
                                <version>2.1.0</version>
                            </dependency>
                            <dependency>
                                <groupId>org.apache.spark</groupId>
                                <artifactId>spark-mllib_2.11</artifactId>
                                <version>2.1.0</version>
                            </dependency>
                            <dependency>
                                <groupId>mysql</groupId>
                                <artifactId>mysql-connector-java</artifactId>
                                <version>5.1.17</version>
                            </dependency>
                            <dependency>
                                <groupId>org.apache.spark</groupId>
                                <artifactId>spark-sql_2.11</artifactId>
                                <version>2.1.0</version>
                            </dependency>
                        </dependencies>
                    </project>

        b. Copy the configuration files into the Java project's resources directory
            core-site.xml
            hdfs-site.xml
            hive-site.xml

        c. Code example

            package com.test.spark.sql;

            import org.apache.spark.sql.Dataset;
            import org.apache.spark.sql.Encoders;
            import org.apache.spark.sql.Row;
            import org.apache.spark.sql.SparkSession;

            public class TestSparkHive {
                public static void main(String [] args)
                {
                    String warehouseLocation = "/spark-warehouse";
                    // init spark session with hive support
                    SparkSession spark = SparkSession
                            .builder()
                            .appName("Java Spark Hive Example")
                            .master("local[*]")
                            .config("spark.sql.warehouse.dir", warehouseLocation)
                            // enable Hive support so spark.sql() goes through the Hive metastore
                            // (this requires the spark-hive_2.11 dependency on the classpath)
                            .enableHiveSupport()
                            .getOrCreate();

                    String sqlStr1 = "create table te3(id int, name string , age int)";
                    //String sqlStr2 = "load data local inpath '/mnt/hgfs/share/tt.txt' into table te3";
                    //String sqlStr3 = "select * from te3";


                    Dataset<Row> df = spark.sql(sqlStr1);
                    df.show();

                }
            }

V. Connecting to Spark with Beeline for Remote Hive Access
-----------------------------------------------------
    1. Start the Spark cluster (fully distributed standalone mode)

    2. Create a Hive table
       $>hive -e "create table tt(id int,name string , age int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile"

    3. Load data into the Hive table.
        $>hive -e "load data local inpath 'file:///mnt/hgfs/share/tt.txt' into table tt"
        $>hive -e "select * from tt"

    4. Start spark-shell
        $>spark-shell --master spark://s100:7077

        scala> spark.sql("select * from tt").show();
        +---+----+---+
        | id|name|age|
        +---+----+---+
        |  1|tom1| 11|
        |  2|tom2| 12|
        |  3|tom3| 13|
        |  4|tom4| 14|
        |  5|tom5| 15|
        +---+----+---+

    5. Start the Thrift server
        $> cd /soft/spark/sbin
        $> start-thriftserver.sh --master spark://s100:7077
        $> netstat -anop | grep 10000

    6. Start the Beeline client
        $> cd /soft/spark/bin
        $> beeline -u jdbc:hive2://s100:10000

        $beeline> select * from tt;

    7. Java implementation

        package com.test.spark.sql;

        import java.sql.Connection;
        import java.sql.DriverManager;
        import java.sql.ResultSet;
        import java.sql.Statement;

        public class TestSparkBeeline {

            public static void main(String [] args)
            {
                try {
                    Class.forName("org.apache.hive.jdbc.HiveDriver");
                    Connection conn = DriverManager.getConnection("jdbc:hive2://s100:10000");
                    Statement st =  conn.createStatement();
                    ResultSet set = st.executeQuery("select * from tt");
                    while (set.next()) {
                        int id = set.getInt(1);
                        String name = set.getString(2);
                        int age = set.getInt(3);
                        System.out.println(id + ":" + name + ":" + age);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }