1. 程式人生 > >第68課:Spark SQL通過JDBC操作MySQL

第68課:Spark SQL通過JDBC操作MySQL

內容:

    1.SparkSQL操作關係資料庫意義     2.SparkSQL操作關係資料庫

一、通過SparkSQL操作關係資料庫意義

    1.SparkSQL可以通過jdbc從傳統關係型資料庫中讀寫資料,讀取資料後直接生成DataFrame,然後在加上藉助於Spark核心的豐富的API來進行各種操作;     2.關於JDBC的驅動jar可以使用在Spark的jars目錄中,也可以在使用spark-submit提交的時候引入,編碼和打包的時候不需要這個JDBC的jar     3.在實際的企業級開發環境中,如果資料庫中資料規模特別大,例如10億條資料此時如果用DB去處理的話,一般需要對資料進行多批次處理,例如分成100批(受限於單臺Server的處理能力,)且實際的處理可能會非常複雜,通過傳統的Java EE等技術很難或者不方便實現處理演算法,此時採用sparkSQL 獲得資料庫中的資料並進行分散式處理就可以非常好解決該問題,但是由於SparkSQL載入DB的資料需要時間,所以一般會SparkSQL和具體操作的DB之間加上一個緩衝層,例如中間使用redis,可以把SparkSQL處理速度提高到原來的45倍;     4.MYSQL 是單機版本的,SparkSQL訪問MYSQL就可以平行計算,SparkSQL中的DataFrame的計算能力比關係資料庫快很多,可以儘可能的滿足應用

二、SparkSQL操作關係資料庫

    //建立spark資料庫     create database spark;

         //建立userinfor表     create table userinfor(        id INT NOT NULL AUTO_INCREMENT,        name VARCHAR(100) NOT NULL,        age INT not null,        PRIMARY KEY (id)     );

        //向userinfor表中插入三條資料     insert into userinfor (name,age) values("Michael",20);     insert into userinfor (name,age) values("Andy",30);     insert into userinfor (name,age) values("Justin",19);

         

    //建立scoreinfor表     create table scoreinfor(        id INT NOT NULL AUTO_INCREMENT,        name VARCHAR(100) NOT NULL,        score INT not null,        PRIMARY KEY (id)     );

         //向scoreinfo表中插入三條資料     insert into scoreinfor (name,score) values("Michael",98);     insert into scoreinfor (name,score) values("Andy",95);     insert into scoreinfor (name,score) values("Justin",91);  

   

    //建立jion後的儲存表userscoreinfor表     create table userscoreinfor(        id INT NOT NULL AUTO_INCREMENT,        name VARCHAR(100) NOT NULL,        age INT not null,        score INT not null,        PRIMARY KEY (id)     );

  Java程式碼示例:

package SparkSQL;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * FileName: SparkSQLJDBCToMySQL
 * Author:   hadoop
 * Email:    [email protected]
 * Date:     18-11-8 下午11:37
 * Description:
 */
public class SparkSQLJDBCToMySQL {
    public static void main(String[] args) {
        //建立SparkConf用於讀取系統資訊並設定運用程式的名稱
        SparkConf conf = new SparkConf().setAppName("SparkSQLJDBCToMySQL").setMaster("local");
        //建立JavaSparkContext物件例項作為整個Driver的核心基石
        JavaSparkContext sc = new JavaSparkContext(conf);
        //設定輸出log的等級,可以設定INFO,WARN,ERROR
        sc.setLogLevel("ERROR");
        //建立SQLContext上下文物件,用於SqL的分析
        SQLContext sqlContext = new SQLContext(sc);
        /**
         * 1.通過format("jdbc")的方式來說明SparkSQL操作的資料來源是JDBC,
         *  JDBC後端一般都是資料庫,例如去操作MYSQL.Oracle資料庫
         * 2.通過DataframeReader的option方法把要訪問的資料庫資訊傳遞進去,
         * url:代表資料庫的jdbc連結的地址和具體要連線的資料庫
         * datable:具體要連線使用的資料庫
         * 3.Driver部分是SparkSQL訪問資料庫的具體驅動的完整包名和類名
         * 4.關於JDBC的驅動jar可以使用在Spark的lib目錄中,也可以在使用
         * spark-submit提交的時候引入,編碼和打包的時候不需要這個JDBC的jar
         */
       DataFrameReader reader =  sqlContext.read().format("jdbc");//指定資料來源
       reader.option("url","jdbc:mysql://localhost:3306/spark");//指定連線的資料庫
       reader.option("dbtable","userinfor");//操作的表
       reader.option("driver","com.mysql.jdbc.Driver");//JDBC的驅動
       reader.option("user","root"); //使用者名稱
       reader.option("password","123456"); //使用者密碼

        /**
         * 在實際的企業級開發環境中,如果資料庫中資料規模特別大,例如10億條資料
         * 此時如果用DB去處理的話,一般需要對資料進行多批次處理,例如分成100批
         * (受限於單臺Server的處理能力,)且實際的處理可能會非常複雜,
         * 通過傳統的Java EE等技術很難或者不方便實現處理演算法,此時採用sparkSQL
         * 獲得資料庫中的資料並進行分散式處理就可以非常好解決該問題,
         * 但是由於SparkSQL載入DB的資料需要時間,所以一般會SparkSQL和具體操作的DB之
         * 間加上一個緩衝層,例如中間使用redis,可以把SparkSQL處理速度提高到原來的45倍;
         */
       Dataset userinforDataSourceDS = reader.load();//基於userinfor表建立Dataframe
        userinforDataSourceDS.show();

        reader.option("dbtable","scoreinfor");
        Dataset  scoreinforDataSourceDs = reader.load();//基於scoreinfor表建立Dataframe
        //將兩個表進行jion操作
        JavaPairRDD<String,Tuple2<Integer,Integer>> resultRDD = userinforDataSourceDS.javaRDD().mapToPair(new PairFunction<Row,String,Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String,Integer>(row.getAs("name"),row.getAs("age"));
            }
        }).join(scoreinforDataSourceDs.javaRDD().mapToPair(new PairFunction<Row,String,Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String,Integer>(row.getAs("name"),row.getAs("score"));
            }
        }));
        //呼叫RowFactory工廠方法生成記錄
        JavaRDD<Row> reusltRowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {

            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                return RowFactory.create(tuple._1,tuple._2._1,tuple._2._2);
            }
        });

        /**
         * 動態構造DataFrame的元資料,一般而言,有多少列以及每列的具體型別可能來自於json檔案,也可能來自於資料庫
         */
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType,true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType,true));
        //構建StructType,用於最後DataFrame元資料的描述
        StructType structType = DataTypes.createStructType(structFields);
        //生成Dataset
        Dataset personDS = sqlContext.createDataFrame(reusltRowRDD,structType);
        personDS.show();
        /**
         * 1.當Dataframe要把通過SparkSQL,core、ml等複雜操作的資料寫入資料庫的時候首先是許可權的問題,必須確保資料庫授權了當前操作SparkSQL的使用者;
         * 2.Dataframe要寫資料到DB的時候,一般都不可以直接寫進去,而是要轉成RDD,通過RDD寫資料到DB中,
         */
       personDS.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>(){
            @Override
            public void call(Iterator<Row> rowIterator) throws Exception {
                Connection connection = null;//資料庫連線
                Statement statement = null; //

                try{
                    connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark","root","123456");
                    statement = connection.createStatement();
                    while (rowIterator.hasNext()){
                        String sqlText = "insert into userscoreinfor (name,age,score) values (";
                        Row row = rowIterator.next();
                        String name = row.getAs("name");
                        int age = row.getAs("age");
                        int score = row.getAs("score");
                        sqlText+="'"+name+"',"+"'"+age+"',"+"'"+score+"')";
                        statement.execute(sqlText);
                    }

                }catch (SQLException e){
                    e.printStackTrace();
                }finally {
                    if (connection != null){
                        connection.close();
                    }
                }

            }
        });

    }
}

Scala程式碼示例:

package SparkSQL

import java.sql.{Connection, Driver, DriverManager, SQLException, Statement}

import org.apache.spark.sql.{Row, RowFactory, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * FileName: SparkSQLJDBCMySQLScala
  * Author:   hadoop
  * Email:    [email protected]
  * Date:     18-11-9 上午9:27
  * Description:
  *
  */
object SparkSQLJDBCMySQLScala {
  def main(args: Array[String]): Unit = {
    //建立SparkConf用於讀取系統資訊並設定運用程式的名稱
    val conf = new SparkConf().setMaster("local").setAppName("SparkSQLJDBCMySQLScala")
    //建立JavaSparkContext物件例項作為整個Driver的核心基石
    val sc = new SparkContext(conf)
    //設定輸出log的等級,可以設定INFO,WARN,ERROR
    sc.setLogLevel("INFO")
    //建立SQLContext上下文物件,用於SqL的分析
    val sqlContext = new SQLContext(sc)
    /**
      * 1.通過format("jdbc")的方式來說明SparkSQL操作的資料來源是JDBC,
      * JDBC後端一般都是資料庫,例如去操作MYSQL.Oracle資料庫
      * 2.通過DataframeReader的option方法把要訪問的資料庫資訊傳遞進去,
      * url:代表資料庫的jdbc連結的地址和具體要連線的資料庫
      * datable:具體要連線使用的資料庫
      * 3.Driver部分是SparkSQL訪問資料庫的具體驅動的完整包名和類名
      * 4.關於JDBC的驅動jar可以使用在Spark的lib目錄中,也可以在使用
      * spark-submit提交的時候引入,編碼和打包的時候不需要這個JDBC的jar
      */
    val reader = sqlContext.read.format("jdbc")
    reader.option("url", "jdbc:mysql://localhost:3306/spark") //指定連線的資料庫
    reader.option("dbtable", "userinfor") //操作的表
    reader.option("driver", "com.mysql.jdbc.Driver") //JDBC的驅動
    reader.option("user", "root") //使用者名稱
    reader.option("password", "123456") //使用者密碼
    /**
      * 在實際的企業級開發環境中,如果資料庫中資料規模特別大,例如10億條資料
      * 此時如果用DB去處理的話,一般需要對資料進行多批次處理,例如分成100批
      * (受限於單臺Server的處理能力,)且實際的處理可能會非常複雜,
      * 通過傳統的Java EE等技術很難或者不方便實現處理演算法,此時採用sparkSQL
      * 獲得資料庫中的資料並進行分散式處理就可以非常好解決該問題,
      * 但是由於SparkSQL載入DB的資料需要時間,所以一般會SparkSQL和具體操作的DB之
      * 間加上一個緩衝層,例如中間使用redis,可以把SparkSQL處理速度提高到原來的45倍;
      */
    val userinforDataSourceDS = reader.load() //基於userinfor表建立Dataframe
    userinforDataSourceDS.show()
    reader.option("dbtable","scoreinfor")
    val scoreinforDataSourceDS = reader.load()//基於scoreinfor表建立Dataframe
    scoreinforDataSourceDS.show()
    //將兩個表進行jion操作
    val result = userinforDataSourceDS.rdd.map(row=>(row.getAs("name").toString,row.getInt(2))).join(scoreinforDataSourceDS.rdd.map(row=>(row.getAs("name").toString,row.getInt(2))))
    //將兩個表進行jion操作

    val resultRDD = result.map(row=>{
      val name = row._1.toString
      val age:java.lang.Integer = row._2._1
      val score:java.lang.Integer = row._2._2
      RowFactory.create(name,age,score)
    })
    /**
      * 1.當Dataframe要把通過SparkSQL,core、ml等複雜操作的資料寫入資料庫的時候首先是許可權的問題,必須確保資料庫授權了當前操作SparkSQL的使用者;
      * 2.Dataframe要寫資料到DB的時候,一般都不可以直接寫進去,而是要轉成RDD,通過RDD寫資料到DB中,
      */
    val userscoreinforDS = sqlContext.createDataFrame(resultRDD.map(row => PersonAgeScore(row.getString(0),row.getInt(1),row.getInt(2))))
    userscoreinforDS.show()
    userscoreinforDS.foreachPartition(row=>{
      var connection:Connection = null
      var states:Statement = null;
      try {
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark", "root", "123456")
        states = connection.createStatement()
        while (row.hasNext){
          var sqlText = "insert into  userscoreinfor (name,age,score) values ("
          val line = row.next
          val name = line.getAs("name").toString
          val age:java.lang.Integer = line.getAs("age")
          val score:java.lang.Integer = line.getAs("score")
          sqlText += "'" + name + "',"  + age + "," + score + ")"
          println(sqlText)
          states.execute(sqlText)
        }
      }catch  {
        case e: SQLException=>{
          e.printStackTrace()
        }

      }finally {
        if (connection != null)
          connection.close()
      }

    })
  }

}

執行結果:

 

注意:在在idea上執行程式碼時候,遇到問題

 1.Exception in thread "main" java.lang.ClassNotFoundException: com.mysql.jdbc

  解決:將mysql-connector-java-5.1.40-bin.jar引入工程中

 2.msyql Caused by: java.net.ConnectException: 拒絕連線 (Connection refused)

解決:資料庫配置是localhost,連線應該為jdbc:mysql://localhost:3306/spark