第68課:Spark SQL通過JDBC操作MySQL
內容:
1.SparkSQL操作關係資料庫意義 2.SparkSQL操作關係資料庫
一、通過SparkSQL操作關係資料庫意義
1.SparkSQL可以通過jdbc從傳統關係型資料庫中讀寫資料,讀取資料後直接生成DataFrame,然後在加上藉助於Spark核心的豐富的API來進行各種操作; 2.關於JDBC的驅動jar可以使用在Spark的jars目錄中,也可以在使用spark-submit提交的時候引入,編碼和打包的時候不需要這個JDBC的jar 3.在實際的企業級開發環境中,如果資料庫中資料規模特別大,例如10億條資料此時如果用DB去處理的話,一般需要對資料進行多批次處理,例如分成100批(受限於單臺Server的處理能力,)且實際的處理可能會非常複雜,通過傳統的Java EE等技術很難或者不方便實現處理演算法,此時採用sparkSQL 獲得資料庫中的資料並進行分散式處理就可以非常好解決該問題,但是由於SparkSQL載入DB的資料需要時間,所以一般會SparkSQL和具體操作的DB之間加上一個緩衝層,例如中間使用redis,可以把SparkSQL處理速度提高到原來的45倍; 4.MYSQL 是單機版本的,SparkSQL訪問MYSQL就可以平行計算,SparkSQL中的DataFrame的計算能力比關係資料庫快很多,可以儘可能的滿足應用
二、SparkSQL操作關係資料庫
//建立spark資料庫 create database spark;
//建立userinfor表 create table userinfor( id INT NOT NULL AUTO_INCREMENT, name VARCHAR(100) NOT NULL, age INT not null, PRIMARY KEY (id) );
//向userinfor表中插入三條資料 insert into userinfor (name,age) values("Michael",20); insert into userinfor (name,age) values("Andy",30); insert into userinfor (name,age) values("Justin",19);
//建立scoreinfor表 create table scoreinfor( id INT NOT NULL AUTO_INCREMENT, name VARCHAR(100) NOT NULL, score INT not null, PRIMARY KEY (id) );
//向scoreinfo表中插入三條資料 insert into scoreinfor (name,score) values("Michael",98); insert into scoreinfor (name,score) values("Andy",95); insert into scoreinfor (name,score) values("Justin",91);
//建立jion後的儲存表userscoreinfor表 create table userscoreinfor( id INT NOT NULL AUTO_INCREMENT, name VARCHAR(100) NOT NULL, age INT not null, score INT not null, PRIMARY KEY (id) );
Java程式碼示例:
package SparkSQL;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* FileName: SparkSQLJDBCToMySQL
* Author: hadoop
* Email: [email protected]
* Date: 18-11-8 下午11:37
* Description:
*/
public class SparkSQLJDBCToMySQL {
public static void main(String[] args) {
//建立SparkConf用於讀取系統資訊並設定運用程式的名稱
SparkConf conf = new SparkConf().setAppName("SparkSQLJDBCToMySQL").setMaster("local");
//建立JavaSparkContext物件例項作為整個Driver的核心基石
JavaSparkContext sc = new JavaSparkContext(conf);
//設定輸出log的等級,可以設定INFO,WARN,ERROR
sc.setLogLevel("ERROR");
//建立SQLContext上下文物件,用於SqL的分析
SQLContext sqlContext = new SQLContext(sc);
/**
* 1.通過format("jdbc")的方式來說明SparkSQL操作的資料來源是JDBC,
* JDBC後端一般都是資料庫,例如去操作MYSQL.Oracle資料庫
* 2.通過DataframeReader的option方法把要訪問的資料庫資訊傳遞進去,
* url:代表資料庫的jdbc連結的地址和具體要連線的資料庫
* datable:具體要連線使用的資料庫
* 3.Driver部分是SparkSQL訪問資料庫的具體驅動的完整包名和類名
* 4.關於JDBC的驅動jar可以使用在Spark的lib目錄中,也可以在使用
* spark-submit提交的時候引入,編碼和打包的時候不需要這個JDBC的jar
*/
DataFrameReader reader = sqlContext.read().format("jdbc");//指定資料來源
reader.option("url","jdbc:mysql://localhost:3306/spark");//指定連線的資料庫
reader.option("dbtable","userinfor");//操作的表
reader.option("driver","com.mysql.jdbc.Driver");//JDBC的驅動
reader.option("user","root"); //使用者名稱
reader.option("password","123456"); //使用者密碼
/**
* 在實際的企業級開發環境中,如果資料庫中資料規模特別大,例如10億條資料
* 此時如果用DB去處理的話,一般需要對資料進行多批次處理,例如分成100批
* (受限於單臺Server的處理能力,)且實際的處理可能會非常複雜,
* 通過傳統的Java EE等技術很難或者不方便實現處理演算法,此時採用sparkSQL
* 獲得資料庫中的資料並進行分散式處理就可以非常好解決該問題,
* 但是由於SparkSQL載入DB的資料需要時間,所以一般會SparkSQL和具體操作的DB之
* 間加上一個緩衝層,例如中間使用redis,可以把SparkSQL處理速度提高到原來的45倍;
*/
Dataset userinforDataSourceDS = reader.load();//基於userinfor表建立Dataframe
userinforDataSourceDS.show();
reader.option("dbtable","scoreinfor");
Dataset scoreinforDataSourceDs = reader.load();//基於scoreinfor表建立Dataframe
//將兩個表進行jion操作
JavaPairRDD<String,Tuple2<Integer,Integer>> resultRDD = userinforDataSourceDS.javaRDD().mapToPair(new PairFunction<Row,String,Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2<String,Integer>(row.getAs("name"),row.getAs("age"));
}
}).join(scoreinforDataSourceDs.javaRDD().mapToPair(new PairFunction<Row,String,Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(Row row) throws Exception {
return new Tuple2<String,Integer>(row.getAs("name"),row.getAs("score"));
}
}));
//呼叫RowFactory工廠方法生成記錄
JavaRDD<Row> reusltRowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
@Override
public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
return RowFactory.create(tuple._1,tuple._2._1,tuple._2._2);
}
});
/**
* 動態構造DataFrame的元資料,一般而言,有多少列以及每列的具體型別可能來自於json檔案,也可能來自於資料庫
*/
List<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType,true));
structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType,true));
//構建StructType,用於最後DataFrame元資料的描述
StructType structType = DataTypes.createStructType(structFields);
//生成Dataset
Dataset personDS = sqlContext.createDataFrame(reusltRowRDD,structType);
personDS.show();
/**
* 1.當Dataframe要把通過SparkSQL,core、ml等複雜操作的資料寫入資料庫的時候首先是許可權的問題,必須確保資料庫授權了當前操作SparkSQL的使用者;
* 2.Dataframe要寫資料到DB的時候,一般都不可以直接寫進去,而是要轉成RDD,通過RDD寫資料到DB中,
*/
personDS.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>(){
@Override
public void call(Iterator<Row> rowIterator) throws Exception {
Connection connection = null;//資料庫連線
Statement statement = null; //
try{
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark","root","123456");
statement = connection.createStatement();
while (rowIterator.hasNext()){
String sqlText = "insert into userscoreinfor (name,age,score) values (";
Row row = rowIterator.next();
String name = row.getAs("name");
int age = row.getAs("age");
int score = row.getAs("score");
sqlText+="'"+name+"',"+"'"+age+"',"+"'"+score+"')";
statement.execute(sqlText);
}
}catch (SQLException e){
e.printStackTrace();
}finally {
if (connection != null){
connection.close();
}
}
}
});
}
}
Scala程式碼示例:
package SparkSQL
import java.sql.{Connection, Driver, DriverManager, SQLException, Statement}
import org.apache.spark.sql.{Row, RowFactory, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* FileName: SparkSQLJDBCMySQLScala
* Author: hadoop
* Email: [email protected]
* Date: 18-11-9 上午9:27
* Description:
*
*/
object SparkSQLJDBCMySQLScala {
def main(args: Array[String]): Unit = {
//建立SparkConf用於讀取系統資訊並設定運用程式的名稱
val conf = new SparkConf().setMaster("local").setAppName("SparkSQLJDBCMySQLScala")
//建立JavaSparkContext物件例項作為整個Driver的核心基石
val sc = new SparkContext(conf)
//設定輸出log的等級,可以設定INFO,WARN,ERROR
sc.setLogLevel("INFO")
//建立SQLContext上下文物件,用於SqL的分析
val sqlContext = new SQLContext(sc)
/**
* 1.通過format("jdbc")的方式來說明SparkSQL操作的資料來源是JDBC,
* JDBC後端一般都是資料庫,例如去操作MYSQL.Oracle資料庫
* 2.通過DataframeReader的option方法把要訪問的資料庫資訊傳遞進去,
* url:代表資料庫的jdbc連結的地址和具體要連線的資料庫
* datable:具體要連線使用的資料庫
* 3.Driver部分是SparkSQL訪問資料庫的具體驅動的完整包名和類名
* 4.關於JDBC的驅動jar可以使用在Spark的lib目錄中,也可以在使用
* spark-submit提交的時候引入,編碼和打包的時候不需要這個JDBC的jar
*/
val reader = sqlContext.read.format("jdbc")
reader.option("url", "jdbc:mysql://localhost:3306/spark") //指定連線的資料庫
reader.option("dbtable", "userinfor") //操作的表
reader.option("driver", "com.mysql.jdbc.Driver") //JDBC的驅動
reader.option("user", "root") //使用者名稱
reader.option("password", "123456") //使用者密碼
/**
* 在實際的企業級開發環境中,如果資料庫中資料規模特別大,例如10億條資料
* 此時如果用DB去處理的話,一般需要對資料進行多批次處理,例如分成100批
* (受限於單臺Server的處理能力,)且實際的處理可能會非常複雜,
* 通過傳統的Java EE等技術很難或者不方便實現處理演算法,此時採用sparkSQL
* 獲得資料庫中的資料並進行分散式處理就可以非常好解決該問題,
* 但是由於SparkSQL載入DB的資料需要時間,所以一般會SparkSQL和具體操作的DB之
* 間加上一個緩衝層,例如中間使用redis,可以把SparkSQL處理速度提高到原來的45倍;
*/
val userinforDataSourceDS = reader.load() //基於userinfor表建立Dataframe
userinforDataSourceDS.show()
reader.option("dbtable","scoreinfor")
val scoreinforDataSourceDS = reader.load()//基於scoreinfor表建立Dataframe
scoreinforDataSourceDS.show()
//將兩個表進行jion操作
val result = userinforDataSourceDS.rdd.map(row=>(row.getAs("name").toString,row.getInt(2))).join(scoreinforDataSourceDS.rdd.map(row=>(row.getAs("name").toString,row.getInt(2))))
//將兩個表進行jion操作
val resultRDD = result.map(row=>{
val name = row._1.toString
val age:java.lang.Integer = row._2._1
val score:java.lang.Integer = row._2._2
RowFactory.create(name,age,score)
})
/**
* 1.當Dataframe要把通過SparkSQL,core、ml等複雜操作的資料寫入資料庫的時候首先是許可權的問題,必須確保資料庫授權了當前操作SparkSQL的使用者;
* 2.Dataframe要寫資料到DB的時候,一般都不可以直接寫進去,而是要轉成RDD,通過RDD寫資料到DB中,
*/
val userscoreinforDS = sqlContext.createDataFrame(resultRDD.map(row => PersonAgeScore(row.getString(0),row.getInt(1),row.getInt(2))))
userscoreinforDS.show()
userscoreinforDS.foreachPartition(row=>{
var connection:Connection = null
var states:Statement = null;
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "123456")
states = connection.createStatement()
while (row.hasNext){
var sqlText = "insert into userscoreinfor (name,age,score) values ("
val line = row.next
val name = line.getAs("name").toString
val age:java.lang.Integer = line.getAs("age")
val score:java.lang.Integer = line.getAs("score")
sqlText += "'" + name + "'," + age + "," + score + ")"
println(sqlText)
states.execute(sqlText)
}
}catch {
case e: SQLException=>{
e.printStackTrace()
}
}finally {
if (connection != null)
connection.close()
}
})
}
}
執行結果:
注意:在在idea上執行程式碼時候,遇到問題
1.Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc
解決:將mysql-connector-java-5.1.40-bin.jar引入工程中
2.msyql Caused by: java.net.ConnectException: 拒絕連線 (Connection refused)
解決:資料庫配置是localhost,連線應該為jdbc:mysql://localhost:3306/spark