Big Data with Spark (5) --- Spark's SQL Module, Spark's JDBC Implementation, Integrating SparkSQL with MySQL, Integrating SparkSQL with Hive and Beeline
阿新 • Published 2018-11-09
一、Spark's SQL Module
----------------------------------------------------------
    1. Runs SQL statements on Spark.
    2. Can process a wide range of data sources.
    3. DataFrame --- RDD --- table          // the data-frame abstraction
    4. Databases can be accessed both with SQL statements and with the DataFrame API.

二、Spark's JDBC Implementation
-------------------------------------------------------------
    1. Spark-Shell implementation
        //a. Define a case class
        scala> case class Customer(id:Int,name:String,age:Int)
        defined class Customer

        //b. Build the raw data
        scala> val arr = Array("1,tom,12","2,tomas,13","3,tomasLee,14")
        arr: Array[String] = Array(1,tom,12, 2,tomas,13, 3,tomasLee,14)

        //c. Create an RDD
        scala> val rdd1 = sc.parallelize(arr)
        rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

        //d. Create a Customer RDD
        scala> val rdd2 = rdd1.map(e=>{val arr = e.split(","); Customer(arr(0).toInt,arr(1),arr(2).toInt)})
        rdd2: org.apache.spark.rdd.RDD[Customer] = MapPartitionsRDD[24] at map at <console>:30

        //e. Create a DataFrame from the RDD
        scala> val df = spark.createDataFrame(rdd2)
        df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

        //f. Print the schema
        scala> df.printSchema
        root
         |-- id: integer (nullable = true)
         |-- name: string (nullable = true)
         |-- age: integer (nullable = true)

        //g. Query the data
        scala> df.show
        +---+--------+---+
        | id|    name|age|
        +---+--------+---+
        |  1|     tom| 12|
        |  2|   tomas| 13|
        |  3|tomasLee| 14|
        +---+--------+---+

        //h. Create a temporary view [serves as the table name in SQL]
        scala> df.createTempView("customers")

        //i. Run SQL statements
        scala> val df2 = spark.sql("select * from customers")
        df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

        scala> df2.show
        +---+--------+---+
        | id|    name|age|
        +---+--------+---+
        |  1|     tom| 12|
        |  2|   tomas| 13|
        |  3|tomasLee| 14|
        +---+--------+---+

        scala> spark.sql("select * from customers").show
        +---+--------+---+
        | id|    name|age|
        +---+--------+---+
        |  1|     tom| 12|
        |  2|   tomas| 13|
        |  3|tomasLee| 14|
        +---+--------+---+

        //j. Union queries
        scala> val df1 = spark.sql("select * from customers where id < 2")
        df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

        scala> val df2 = spark.sql("select * from customers where id > 2")
        df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

        scala> df1.createTempView("c1")
        scala> df2.createTempView("c2")

        scala> spark.sql("select * from c1 union select * from c2").show()
        +---+--------+---+
        | id|    name|age|
        +---+--------+---+
        |  1|     tom| 12|
        |  3|tomasLee| 14|
        +---+--------+---+

        scala> df1.union(df2)
        res10: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 1 more field]

        scala> val df3 = df1.union(df2)
        df3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, name: string ... 1 more field]

        scala> df3.show
        +---+--------+---+
        | id|    name|age|
        +---+--------+---+
        |  1|     tom| 12|
        |  3|tomasLee| 14|
        +---+--------+---+

        //k. Other queries
        scala> spark.sql("select id,name from customers").show
        +---+--------+
        | id|    name|
        +---+--------+
        |  1|     tom|
        |  2|   tomas|
        |  3|tomasLee|
        +---+--------+

        scala> df.selectExpr("id","name").show
        +---+--------+
        | id|    name|
        +---+--------+
        |  1|     tom|
        |  2|   tomas|
        |  3|tomasLee|
        +---+--------+

        scala> df.where("name like 't%'").show
        +---+--------+---+
        | id|    name|age|
        +---+--------+---+
        |  1|     tom| 12|
        |  2|   tomas| 13|
        |  3|tomasLee| 14|
        +---+--------+---+

        scala> df.where("name like '%e'").show
        +---+--------+---+
        | id|    name|age|
        +---+--------+---+
        |  3|tomasLee| 14|
        +---+--------+---+

        //l. Aggregation and mapping
        a. Sum the customers' ages: map to a Dataset[Int], then aggregate
        scala> df.map(x=> x.getAs[Int]("age")).reduce( _ + _ )
        res17: Int = 39

        b. Aggregate functions
        scala> df.agg(sum("age"),max("age"),min("age")).show
        +--------+--------+--------+
        |sum(age)|max(age)|min(age)|
        +--------+--------+--------+
        |      39|      14|      12|
        +--------+--------+--------+

    2. Java implementation
        a. Create a module and add the dependencies
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.17</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>

        b. Converting a DataFrame to an RDD
            JavaRDD<Row> rdd = df1.toJavaRDD();

        c. Saving a Spark SQL result (as JSON)
            //save as JSON files
            df.write().json(dir);
            //set the save mode on the writer before saving
            df.write().mode(SaveMode.Append);

        d. Code demo
package com.test.spark.sql;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.function.Consumer;

public class TestSparkSQL {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder()
                .appName("SQLJava")
                //set the master
                .config("spark.master", "local")
                .getOrCreate();
        Dataset<Row> df = session.read().json("file:///D:\\share\\scala\\data\\json.dat");
        //df.show();
        df.createOrReplaceTempView("t1");
        //Dataset<Row> df1 = session.sql("select * from t1 where id > 3");
        //df.where("id > 3").show();
        //session.sql("select count(id) from t1").show();
        //System.out.println(df.count());
        //df.select(df.col("id").plus(1), df.col("name").substr(1, 2), df.col("name")).show();

        //convert the DataFrame to an RDD
        JavaRDD<Row> jrdd = df.toJavaRDD();
        jrdd.collect().forEach(new Consumer<Row>() {
            @Override
            public void accept(Row row) {
                //System.out.println(row.getLong(0) + ":" + row.getLong(1) + ":" + row.getString(2));
            }
        });

        //save the SQL result as JSON; the save mode must be set on the
        //same writer that performs the save
        df.write().mode(SaveMode.Append).json("d://share/spark/res");
    }
}
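        For reference, the save modes behave differently when the target already
        exists. A minimal spark-shell sketch, assuming the df from the session
        above and a placeholder output path /tmp/res:

            import org.apache.spark.sql.SaveMode

            df.write.mode(SaveMode.Overwrite).json("/tmp/res")  //replace existing output
            df.write.mode(SaveMode.Append).json("/tmp/res")     //add files next to existing output
            df.write.mode(SaveMode.Ignore).json("/tmp/res")     //no-op, because the target now exists

        The default mode is SaveMode.ErrorIfExists, which fails when the target
        directory is already present.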
三、Integrating SparkSQL with MySQL
------------------------------------------------------------
    1. spark-shell implementation
        scala> val jdbcDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.43.1:3306/mybatis", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "persons", "user" -> "mysql", "password" -> "mysql")).load()
        jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

        scala> jdbcDF.show()
        +---+----+-----------+
        | id|name|      phone|
        +---+----+-----------+
        |  1| 唐華|13560191111|
        |  2| 趙嗄|18022222222|
        |  3| 朱寬|18332561111|
        |  4| 張噢|15032295555|
        |  5| 任陽|15614209999|
        |  6| 劉飛|15732641111|
        |  7| 樑鵬|15778421111|
        |  8| 楊謀|18301581111|
        |  9| 史讓|15811111111|
        | 10| 張類|17731086666|
        | 11| 郭彤|18641241111|
        | 12| 杜跑|15733218888|
        | 13| 張錒 |15133333333|
        | 14| 王以|13269364444|
        | 15| 劉宗|18620191111|
        | 16| 李平|15338597777|
        | 17| 段星|13341101111|
        | 18| 溫英|13520401111|
        +---+----+-----------+

        //query through a temporary view (this assumes the JDBC DataFrame was
        //first registered, e.g. jdbcDF.createTempView("customers"))
        scala> spark.sql("select * from customers where id == 1 ").show
        +---+----+-----------+
        | id|name|      phone|
        +---+----+-----------+
        |  1| 唐華|13560191111|
        +---+----+-----------+

    2. Java implementation
        a. Add the MySQL driver and Spark dependencies
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.17</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>

        b. Code demo
package com.test.spark.sql;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.Properties;
import java.util.function.Consumer;
public class TestSparkJDBC {
public static void main(String [] args)
{
SparkSession session = SparkSession.builder()
.appName("SQLJava")
//set the master
.config("spark.master","local")
.getOrCreate();
String url = "jdbc:mysql://192.168.43.1:3306/mybatis";
String table = "persons";
String user = "mysql";
String password = "mysql";
//read a table from the MySQL database
Dataset<Row> df = session.read().format("jdbc")
.option("url", url)
.option("dbtable",table)
.option("user",user)
.option("password",password)
.option("driver","com.mysql.jdbc.Driver")
.load();
df.show();
Dataset<Row> df1 = df.where("phone like '135%'");
df1 = df1.distinct();
//write the result to a new table in the MySQL database
Properties prop = new Properties();
prop.put("user",user);
prop.put("password",password);
prop.put("driver","com.mysql.jdbc.Driver");
df1.write().jdbc(url,"subpersons",prop);
}
}
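        For larger tables, the JDBC source can also read in parallel by splitting
        on a numeric column. A minimal spark-shell sketch reusing the connection
        settings above; the bounds and partition count are illustrative values,
        not taken from the original session:

            val parallelDF = spark.read.format("jdbc")
              .option("url", "jdbc:mysql://192.168.43.1:3306/mybatis")
              .option("dbtable", "persons")
              .option("user", "mysql")
              .option("password", "mysql")
              .option("driver", "com.mysql.jdbc.Driver")
              .option("partitionColumn", "id")  //numeric column to split on
              .option("lowerBound", "1")        //smallest value of the split column
              .option("upperBound", "18")       //largest value of the split column
              .option("numPartitions", "4")     //number of parallel connections
              .load()

        Each partition issues its own SELECT over a slice of the id range, so the
        read no longer funnels through a single connection.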
四、Integrating SparkSQL with Hive
------------------------------------------------------
1. In the Spark shell:
    a. Make sure the Hive class libraries are present on the Spark worker nodes.
    b. Copy hive-site.xml, core-site.xml, and hdfs-site.xml into spark/conf, and distribute them to all nodes.
    c. Copy the MySQL driver jar from hive/lib/ into /soft/spark/jars, and distribute it.
    d. Edit spark/sbin/spark-config.sh and add the three environment variables below:
...
# included in all the spark scripts with source command
# should not be executable directly
# also should not be passed any arguments, since we need original $*
export JAVA_HOME=/soft/jdk
export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
export HIVE_HOME=/soft/hive
# symlink and absolute path should rely on SPARK_HOME to resolve
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
...
    e. Start spark-shell, specifying the master
        spark-shell --master local
    f. Create a table
scala> spark.sql("create table mydb.tt(id int, name string , age int)
row format delimited fields terminated by ','
lines terminated by '\n'
stored as textfile"
)
    g. Load data into the Hive table
scala> spark.sql("load data local inpath '/mnt/hgfs/share/tt.txt' into table mydb.tt")
scala> spark.sql("select * from mydb.tt").show
+---+----+---+
| id|name|age|
+---+----+---+
| 1|tom1| 11|
| 2|tom2| 12|
| 3|tom3| 13|
| 4|tom4| 14|
| 5|tom5| 15|
+---+----+---+
    h. From here, any HiveQL statement can be used to work with the Hive database, for example (see the sketch that follows):
scala> spark.sql("select ........")
2. Operating on Hive tables from SparkSQL -- Java version
    a. Create a Java module and add the Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.it18zhang</groupId>
<artifactId>SparkDemo1</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.17</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
        <!-- spark-hive is needed for SparkSession.enableHiveSupport() -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
</project>
    b. Copy the configuration files into the Java project's resources directory
core-site.xml
hdfs-site.xml
hive-site.xml
    c. Code demo
package com.test.spark.sql;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class TestSparkHive {
public static void main(String [] args)
{
String warehouseLocation = "/spark-warehouse";
// init spark session with hive support
SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark Hive Example")
        .master("local[*]")
        .config("spark.sql.warehouse.dir", warehouseLocation)
        .enableHiveSupport()   //without this, Hive tables are not accessible
        .getOrCreate();
String sqlStr1 = "create table te3(id int, name string , age int)";
//String sqlStr2 = "load data local inpath '/mnt/hgfs/share/tt.txt' into table te3";
//String sqlStr3 = "select * from te3";
Dataset<Row> df = spark.sql(sqlStr1);
df.show();
}
}
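    With Hive support enabled, results can also be written back to Hive. A
    minimal spark-shell sketch, assuming the mydb.tt table from above; the
    target name mydb.tt_backup is a placeholder:

        import org.apache.spark.sql.SaveMode

        val result = spark.sql("select * from mydb.tt where age > 12")
        result.write.mode(SaveMode.Overwrite).saveAsTable("mydb.tt_backup")  //managed Hive table
        spark.sql("select * from mydb.tt_backup").show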
五、Connecting to Spark with Beeline for Remote Hive Access
-----------------------------------------------------
1. Start the Spark cluster, fully distributed (standalone)
2. Create the Hive table
$>hive -e "create table tt(id int,name string , age int) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile"
3. Load data into the Hive table.
$>hive -e "load data local inpath 'file:///mnt/hgfs/share/tt.txt' into table tt"
$>hive -e "select * from tt"
4. Start spark-shell
$>spark-shell --master spark://s100:7077
scala> spark.sql("select * from tt").show();
+---+----+---+
| id|name|age|
+---+----+---+
| 1|tom1| 11|
| 2|tom2| 12|
| 3|tom3| 13|
| 4|tom4| 14|
| 5|tom5| 15|
+---+----+---+
5. Start the Thrift server
$> cd /soft/spark/sbin
$> start-thriftserver.sh --master spark://s100:7077
$> netstat -anop | grep 10000
6. Start the Beeline client
$> cd /soft/spark/bin
$> beeline -u jdbc:hive2://s100:10000
$beeline> select * from tt;
7. Java implementation
package com.test.spark.sql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class TestSparkBeeline {
public static void main(String [] args)
{
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
Connection conn = DriverManager.getConnection("jdbc:hive2://s100:10000");
Statement st = conn.createStatement();
ResultSet set = st.executeQuery("select * from tt");
while (set.next()) {
int id = set.getInt(1);
String name = set.getString(2);
int age = set.getInt(3);
System.out.println(id + ":" + name + ":" + age);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
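The same query can be issued from Scala over the Hive JDBC driver; a minimal
sketch with explicit cleanup, assuming the s100:10000 endpoint and tt table
from the steps above:

    import java.sql.DriverManager

    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://s100:10000")
    try {
      val rs = conn.createStatement().executeQuery("select * from tt")
      while (rs.next()) {
        println(rs.getInt(1) + ":" + rs.getString(2) + ":" + rs.getInt(3))
      }
    } finally {
      conn.close()   //always release the connection to the Thrift server
    }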