
[2.6] Spark SQL Operations on Various Data Sources: Notes

Reference

Data flow when Spark SQL operates on various data sources:

Input from various data sources => RDD(lines) => RDD(Rows) => DataFrame (registered as a temporary table) => analysis and filtering (SQL operations, machine learning, etc.) => RDD(Row) => output in various formats
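Each stage of this flow maps onto one Spark SQL call. Below is a minimal Scala skeleton of the pipeline (the input/output paths and the single-column schema are placeholders, not from the original examples); the full worked example follows in the Code section.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FlowSkeleton {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("FlowSkeleton"))
    val sqlContext = new SQLContext(sc)

    // input => RDD(lines)                          (path is hypothetical)
    val linesRDD = sc.textFile("file:///tmp/input.txt")
    // RDD(lines) => RDD(Row)
    val rowRDD = linesRDD.map(line => Row(line.split(",")(0)))
    // RDD(Row) => DataFrame, registered as a temporary table
    val schema = StructType(Array(StructField("col0", StringType, true)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.registerTempTable("records")
    // analysis / filtering => RDD(Row) => output   (path is hypothetical)
    sqlContext.sql("SELECT col0 FROM records").rdd.saveAsTextFile("file:///tmp/output")
  }
}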

Scenario

How does Spark SQL operate on various data sources: JSON files, Hive, MySQL, HBase, etc.?

Analysis

  • Using Spark SQL's built-in agg operations as an example, walk through the SQL data flow

Code

package main.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.functions._

/**
 * Analyse data with Spark SQL built-in functions.
 * A built-in function returns a Column.
 * Categories:
 * 1. aggregate functions  2. collection functions  3. date/time functions
 * 4. math functions  5. window functions  6. string functions  7. others
 */
object sqlagg {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkinnerfunctions")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val sqlContext = new SQLContext(sc)

    /*
     * 1. Get the data source - RDD(line)
     */
    val userData = Array(
      "2016-5-29,001,http://spark.apache.org/,1000",
      "2016-5-20,001,http://spark.apache.org/,1090",
      "2016-5-20,001,http://spark.apache.org/,1060",
      "2016-5-30,002,http://spark.apache.org/,1000",
      "2016-5-30,003,http://spark.apache.org/,1000",
      "2016-5-10,003,http://spark.apache.org/,1020",
      "2016-5-10,003,http://spark.apache.org/,1020",
      "2016-5-10,003,http://spark.apache.org/,1000"
    )
    val dataRDD = sc.parallelize(userData)

    /*
     * 2. Convert to RDD(Row)
     */
    val rowRDD = dataRDD.map(line => {
      val splited = line.split(",")
      Row(splited(0), splited(1), splited(2), splited(3))
    })

    /*
     * 3. Declare the structure of each Row and build the DataFrame
     */
    val structTypes = StructType(Array(
      StructField("time", StringType, true),
      StructField("userid", StringType, true),
      StructField("url", StringType, true),
      StructField("amount", StringType, true)
    ))
    val userDataDF = sqlContext.createDataFrame(rowRDD, structTypes)

    /*
     * 4. Operate on the DataFrame with Spark SQL built-in functions (the implicit conversions below are required):
     *    built-in functions produce Column objects.
     */
    import sqlContext.implicits._
    // Group by date, then aggregate: count distinct userid and compute the daily sales total
    userDataDF.groupBy("time").agg('time, countDistinct('userid))
      .map(row => Row(row(1), row(2))).collect().foreach(println)
    userDataDF.groupBy("time").agg('time, sum('amount)).show()
  }
}

Execution result

[2016-5-10,1]
[2016-5-20,1]
[2016-5-29,1]
[2016-5-30,2]

+---------+---------+-----------+
|     time|     time|sum(amount)|
+---------+---------+-----------+
|2016-5-10|2016-5-10|     3040.0|
|2016-5-20|2016-5-20|     2150.0|
|2016-5-29|2016-5-29|     1000.0|
|2016-5-30|2016-5-30|     2000.0|
+---------+---------+-----------+
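The same aggregation can also go through the "register as a temporary table" step of the data flow above and be expressed as plain SQL. A minimal sketch, assuming the userDataDF and sqlContext defined in the code above (the table name user_logs is made up for illustration); since amount is stored as a string, SUM casts it to double, matching the output shown:

    // continues the example above
    userDataDF.registerTempTable("user_logs")
    sqlContext.sql(
      "SELECT time, COUNT(DISTINCT userid) AS users, SUM(amount) AS total " +
      "FROM user_logs GROUP BY time").show()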
  • 1. JSON

Code

package main.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
/**
 * Spark SQL operating on a local JSON file
 */
object DataFrameOps {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("DataFrame Ops")
    val sqlContext = new SQLContext(new SparkContext(conf))

    val df = sqlContext.read.json("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")

    df.show
    df.printSchema
    df.select("name").show
    df.select("name", "age").show
    df.select(df("name"),df("age")+10).show
    df.filter(df("age")>10).show
  }
}

Execution result

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+----+
|name|
+----+
|Andy|
+----+

+----+---+
|name|age|
+----+---+
|Andy| 30|
+----+---+

+----+----------+
|name|(age + 10)|
+----+----------+
|Andy|        40|
+----+----------+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
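The reverse direction, writing a DataFrame back out as JSON, covers the "output in various formats" end of the data flow. A minimal sketch continuing the df from the code above (the output path is an assumption):

    // continues the example above; the output directory is hypothetical
    df.filter(df("age") > 10).write.json("file:///tmp/people_filtered_json")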
  • 2. Hive

    1. Code
package cool.pengych.spark.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;
/**
 * SparkSQL OPS  on Hive
 * @author pengyucheng
 *
 */
public class SparkSQL2Hive
{
    public static void main(String[] args) 
    {
        SparkConf config = new SparkConf().setAppName("SparkSQL2Hive");
        SparkContext sc = new SparkContext(config);


        HiveContext hiveContext = new HiveContext(sc);
        hiveContext.sql("use hive");
        hiveContext.sql("DROP TABLE IF EXISTS people");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS people(name STRING,age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");

        /*
         * Load local data into the Hive warehouse (under the hood the data is actually copied);
         * alternatively, LOAD DATA INPATH pulls data from HDFS etc. into Hive (in that case the data is moved).
         */
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/pengyucheng/resource/people.txt' INTO TABLE people ");

        hiveContext.sql("DROP TABLE IF EXISTS peoplescores");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS peoplescores(name STRING,score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/pengyucheng/resource/peoplescores.txt' INTO TABLE peoplescores");

        /*
         * Use HiveContext to run a join directly against the two Hive tables
         */
        DataFrame resultDF = hiveContext.sql("SELECT pi.name,pi.age,ps.score  FROM people pi JOIN peoplescores ps ON pi.name = ps.name WHERE ps.score > 75");

        /*
         * saveAsTable creates a Hive managed table: both the metadata and the data location are managed by the
         * Hive warehouse, so when the table is dropped the data is deleted with it (it no longer exists on disk).
         */
        hiveContext.sql("DROP TABLE IF EXISTS peopleinformationresult");
        resultDF.write().saveAsTable("peopleinformationresult");

        /*
         * HiveContext's table method reads a table from the Hive warehouse directly into a DataFrame, which can then
         * feed machine learning, graph computation and other complex ETL operations.
         */
        DataFrame dataFromHive = hiveContext.table("peopleinformationresult");
        dataFromHive.show();
    }
}
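For comparison, here is the same read-join-save round trip in Scala, using the non-deprecated DataFrameWriter API to create the managed table. This is a sketch, assuming the same Hive tables as above and a hive-site.xml on the classpath:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

object SparkSQL2HiveScala {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SparkSQL2HiveScala"))
    val hiveContext = new HiveContext(sc)

    val resultDF = hiveContext.sql(
      "SELECT pi.name, pi.age, ps.score FROM people pi JOIN peoplescores ps ON pi.name = ps.name WHERE ps.score > 75")

    // write() is the current (non-deprecated) way to create a Hive managed table
    resultDF.write.mode(SaveMode.Overwrite).saveAsTable("peopleinformationresult")

    // table() reads the warehouse table straight back into a DataFrame
    hiveContext.table("peopleinformationresult").show()
  }
}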
  • SparkSQL JDBC 2 MySQL
package cool.pengych.spark.sql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;
/**
 * Spark SQL operating on a MySQL database via JDBC
 * @author pengyucheng
 *
 */
public class SparkSQLJDBC2MySQL
{
    public static void main(String[] args)
    {
        SparkConf config = new SparkConf().setMaster("local[*]").setAppName("SparkSQLJDBC2MySQL");
        SparkContext sc = new SparkContext(config);
        sc.addJar("/home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/mysql-connector-java-5.1.39-bin.jar");
        SQLContext sqlContext = new SQLContext(sc);

        /*
         * 1. Connect to the database: format("jdbc") tells Spark SQL that the data source is obtained via JDBC;
         * the JDBC backend is usually a database, e.g. MySQL. Pass the relevant DB connection info.
         */
        DataFrameReader reader = sqlContext.read().format("jdbc");
        reader.option("url", "jdbc:mysql://112.74.21.122:3306/hive");
        reader.option("driver", "com.mysql.jdbc.Driver");
        reader.option("dbtable", "spark");
        reader.option("user", "hive");
        reader.option("password", "hive");

        /*
         * 2. Load the data
         */
        DataFrame sparkDF = reader.load();
        reader.option("dbtable", "hadoop");
        DataFrame hadoopDF = reader.load();

        /*
         * 3. Organise the data with Spark Core: here the two DataFrames are converted to RDDs and joined
         */
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = sparkDF.javaRDD().mapToPair(new PairFunction<Row, String,Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String,Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")), (Integer) row.getAs("age"));
            }
        }).join(hadoopDF.javaRDD().mapToPair(new PairFunction<Row,String,Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String,Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")), (Integer)row.getAs("score"));
            }
        }));

        /*
         * 4. Hand the organised data back to a DataFrame for business processing - Spark SQL, Core, ML, etc. can then run complex operations
         */
        // Build the JavaRDD<Row>
        JavaRDD<Row> resultRowRDD = resultRDD.map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                return RowFactory.create(tuple._1,tuple._2._1,tuple._2._2);
            }
        });
        // Build the StructType that describes the schema of the final DataFrame
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        fields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        StructType  type = DataTypes.createStructType(fields);

        DataFrame personsDF = sqlContext.createDataFrame(resultRowRDD, type);

        // Actual business processing - here we simply show() the result
        System.out.println("========================業務處理開始:ML,圖計算等工具處理=================");
        System.out.println("==== start showing ====");
        personsDF.show();
        System.out.println("========================業務處理結束:ML,圖計算等工具處理=================");

        /*
         * 5. Persist the processed data: it can be stored in Hive, a DB, or another warehouse
         */
        personsDF.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Iterator<Row> iterator) throws SQLException 
            {
                Connection conn =  null;
                // Skip empty partitions and build one multi-row INSERT; quote the string column and separate the value tuples
                if(!iterator.hasNext()) return;
                StringBuilder sql = new StringBuilder("INSERT INTO dfperson VALUES ");
                while(iterator.hasNext())
                {
                    Row row = iterator.next();
                    sql.append("('").append(String.valueOf(row.getAs("name"))).append("',")
                       .append(row.getInt(1)).append(",").append(row.getInt(2)).append(")");
                    if(iterator.hasNext()) sql.append(",");
                }
                try
                {
                    conn = DriverManager.getConnection("jdbc:mysql://112.74.21.122:3306/hive", "hive", "hive");
                    conn.createStatement().execute(sql.toString());
                } 
                catch (SQLException e) 
                {
                    e.printStackTrace();
                }
                finally
                {
                    if(null != conn) conn.close(); 
                }
            }
        });
    }
}
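Step 5 can also be written without hand-built INSERT statements: DataFrameWriter exposes a jdbc() method that saves a DataFrame into a table over JDBC. A Scala sketch using the same connection settings and target table dfperson as the code above (personsDF stands for the joined DataFrame built there):

import java.util.Properties
import org.apache.spark.sql.SaveMode

// assumes a DataFrame named personsDF, as built in the code above
val props = new Properties()
props.put("user", "hive")
props.put("password", "hive")
props.put("driver", "com.mysql.jdbc.Driver")

personsDF.write
  .mode(SaveMode.Append)
  .jdbc("jdbc:mysql://112.74.21.122:3306/hive", "dfperson", props)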
  • SparkSQL JDBC 2 ThriftServer
package cool.pengych.spark.sql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * @author pengyucheng
 * Access the Thrift Server via JDBC, which in turn goes through Spark SQL to Hive
 *
 */
public class SparkSQLJDBC2ThriftServer
{
    public static void main(String[] args) throws ClassNotFoundException, SQLException
    {
        String sql = "select name from people where age = ? ";
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice",
                "root","");
        PreparedStatement preparedStatement = conn.prepareStatement(sql);
        // JDBC parameter indices are 1-based
        preparedStatement.setInt(1, 27);
        ResultSet rs = preparedStatement.executeQuery();
        while(rs.next())
        {
            System.out.println(rs.getString(1));
        }

        conn.close();
    }

}
  • SparkSQL 2 Parquet
package cool.pengych.spark.sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

/**
 * Spark SQL operating on the contents of a Parquet-format file
 * @author pengyucheng
 *
 */
public class SparkSQLParquet
{
    public static void main(String[] args) {
        /*
         * Create the SQLContext
         */
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLParquet");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);

        /*
         * Register as a temporary table for subsequent SQL queries
         */
        DataFrame df = sqlContext.read().parquet("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet");
        df.registerTempTable("users");

        /*
         * Perform multi-dimensional analysis of the data
         */
        DataFrame result = sqlContext.sql("select name from users");
        JavaRDD<String> strs = result.javaRDD().map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return "The name is :"+row.getAs("name");
            }
        });

        /*
         * Process the results
         */
        List<String> listRow = strs.collect();
        for (String row : listRow) {
            System.out.println(row);
        }
    }

}
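Writing results back out as Parquet closes the input-to-output loop described at the top. A minimal Scala sketch, assuming a Scala SQLContext analogous to the one above (the output path is hypothetical):

    val users = sqlContext.read.parquet("file:///home/hadoop/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet")
    users.registerTempTable("users")
    sqlContext.sql("SELECT name FROM users").write.parquet("file:///tmp/user_names_parquet")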

Summary

Conquer the tragic side of human nature with exuberant vitality!