1. 程式人生 > >dataframe操作hive資料倉庫【Java純程式碼】

dataframe操作hive資料倉庫【Java純程式碼】

package com.bjsxt;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;


public class CreateDFFromHiveLocalTest {
    public static void main(String[] args) {
        SparkConf conf =new SparkConf().setAppName("hive").setMaster("local");
        JavaSparkContext sc=new JavaSparkContext(conf);
        //SQLContext sqlContext=new SQLContext(sc);
        //HiveContext是SQLContext的子類
        HiveContext hiveContext=new HiveContext(sc);
        hiveContext.sql("USE spark");
        hiveContext.sql("DROP TABLE IF EXISTS student_infos");
        //在hive中建立表student_infos表
        hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by '\t'");
        hiveContext.sql("load data local inpath './student_infos' into table student_infos");
        
        hiveContext.sql("DROP TABLE IF EXISTS student_scores");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING,score INT) row format delimited fields terminated by '\t'");
        hiveContext.sql(
                "LOAD DATA "
                + "LOCAL INPATH './student_scores'"
                + "INTO TABLE student_scores"                                
                );
        /**
         * 生成查詢表
         */
        DataFrame df = hiveContext.table("student_infos");//第二種讀取hive表載入DF的方式
        DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name,si.age,ss.score "
                + "FROM student_infos si "
                + "JOIN student_scores ss "
                + "ON si.name=ss.name "
                + "WHERE ss.score>=80"                
                );
        hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
        
        
        goodStudentsDF.registerTempTable("goodstudent");
        DataFrame result = hiveContext.sql("select * from goodstudent");
        result.show();
        /**
         * 將結果儲存在hive表 good_studeng_infos
         */
        goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
        Row[] goodStudentsRows = hiveContext.table("good_student_infos").collect();
        for(Row goodStudentsRow: goodStudentsRows ) {
            System.out.println(goodStudentsRow);
        }
        sc.stop();
    }
}