1. 程式人生 > >Spark SQL下的Parquet使用最佳實踐和程式碼實戰

Spark SQL下的Parquet使用最佳實踐和程式碼實戰

一、Spark SQL下的Parquet使用最佳實踐

1)過去整個業界對大資料的分析的技術棧的Pipeline一般分為以下兩種方式:

a)Data Source -> HDFS -> MR/Hive/Spark(相當於ETL)-> HDFS Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通過JDBC/ODBC來作為資料服務使用);

b)Data Source -> Real timeupdate data to HBase/DB -> Export to Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通過JDBC/ODBC來作為資料服務使用);

上述的第二種方式完全可以通過Kafka+Spark Streaming+Spark SQL(內部也強烈建議採用Parquet的方式來儲存資料)的方式取代

2)期待的方式:DataSource -> Kafka -> Spark Streaming -> Parquet -> Spark SQL(ML、GraphX等)-> Parquet -> 其它各種Data Mining等。


二、Parquet的精要介紹

1)Parquet是列式儲存格式的一種檔案型別,列式儲存有以下的核心優勢:

a)可以跳過不符合條件的資料,只讀取需要的資料,降低IO資料量。

b)壓縮編碼可以降低磁碟儲存空間。由於同一列的資料型別是一樣的,可以使用更高效的壓縮編碼(例如RunLength Encoding和Delta Encoding)進一步節約儲存空間。

c)只讀取需要的列,支援向量運算,能夠獲取更好的掃描效能。


三、程式碼實戰

Java版本:

package com.dt.spark.SparkApps.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
public class SparkSQLParquetOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLParquetOps");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame usersDF = sqlContext.read().parquet("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\users.parquet");
        /**
         * 註冊成為臨時表以供後續的SQL查詢操作
         */
        usersDF.registerTempTable("users");
        /**
         * 進行資料的多維度分析
         */
        DataFrame result = sqlContext.sql("select * from users");
        JavaRDD<String> resultRDD = result.javaRDD().map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return "The name is : " + row.getAs("name");
            }
        });
        /**
         * 第六步:對結果進行處理,包括由DataFrame轉換成為RDD<Row>,以及結構持久化
         */
        List<String> listRow = resultRDD.collect();
        for(String row : listRow){
            System.out.println(row);
        }
    }
}


Schema Merging

Java版本:

package com.dt.spark.SparkApps.sql;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class SchemaOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByProgramatically");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        
        // Create a simple DataFrame, stored into a partition directory
        JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1,2,3,4,5));
        PairFunction<Integer,Integer,Integer> df2 = new PairFunction<Integer,Integer,Integer>() {
            @Override
            public Tuple2 call(Integer x) throws Exception {
                return new Tuple2(x,x * 2);
            }
        };
        JavaPairRDD<Integer,Integer> pairs = lines.mapToPair(df2);
        /**
         * 第一步:在RDD的基礎上建立型別為Row的RDD
         */
        JavaRDD<Row> personsRDD = pairs.map(new Function<Tuple2<Integer, Integer>, Row>() {
            @Override
            public Row call(Tuple2<Integer, Integer> integerIntegerTuple2) throws Exception {
                return RowFactory.create(integerIntegerTuple2._1,integerIntegerTuple2._2);
            }
        });
        /**
         * 第二步:動態構造DataFrame的元資料,一般而言,有多少列,以及每列的具體型別可能來自於JSON檔案
         * 也可能來自於資料庫。
         * 指定型別
         */
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("single",DataTypes.IntegerType,true));
        structFields.add(DataTypes.createStructField("double",DataTypes.IntegerType,true));
        /**
         * 構建StructType用於最後DataFrame元資料的描述
         */
        StructType structType = DataTypes.createStructType(structFields);
        /**
         * 第三步:基於以後的MetaData以及RDD<Row>來構建DataFrame
         */
        DataFrame personsDF = sqlContext.createDataFrame(personsRDD,structType);
        personsDF.write().parquet("data/test_table/key=1");
        // Create a simple DataFrame, stored into a partition directory
        JavaRDD<Integer> lines1 = sc.parallelize(Arrays.asList(6,7,8,9,10));
        PairFunction<Integer,Integer,Integer> df3 = new PairFunction<Integer,Integer,Integer>() {
            @Override
            public Tuple2 call(Integer x) throws Exception {
                return new Tuple2(x,x * 2);
            }
        };
        JavaPairRDD<Integer,Integer> pairs1 = lines.mapToPair(df2);
        /**
         * 第一步:在RDD的基礎上建立型別為Row的RDD
         */
        JavaRDD<Row> personsRDD1 = pairs1.map(new Function<Tuple2<Integer, Integer>, Row>() {
            @Override
            public Row call(Tuple2<Integer, Integer> integerIntegerTuple2) throws Exception {
                return RowFactory.create(integerIntegerTuple2._1,integerIntegerTuple2._2);
            }
        });
        /**
         * 第二步:動態構造DataFrame的元資料,一般而言,有多少列,以及每列的具體型別可能來自於JSON檔案
         * 也可能來自於資料庫。
         * 指定型別
         */
        List<StructField> structFields1 = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("single",DataTypes.IntegerType,true));
        structFields.add(DataTypes.createStructField("triple",DataTypes.IntegerType,true));
        /**
         * 構建StructType用於最後DataFrame元資料的描述
         */
        StructType structType1 = DataTypes.createStructType(structFields);
        /**
         * 第三步:基於以後的MetaData以及RDD<Row>來構建DataFrame
         */
        DataFrame personsDF1 = sqlContext.createDataFrame(personsRDD1,structType1);
        personsDF1.write().parquet("data/test_table/key=2");
        DataFrame df4 = sqlContext.read().option("mergeSchema","true").parquet("data/test_table");
        df4.printSchema();
    }
}

輸出結果如下:
root
|--single: integer (nullable = true)
|--double: integer (nullable = true)
|--single2: integer (nullable = true)
|--triple: integer (nullable = true)
|--key: integer (nullable = true)
複製程式碼

Scala版本:

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)