
Lesson 67: A Comprehensive Hands-On Join Case Study in Spark SQL, Implemented in Java and Scala (Consolidating the Spark SQL Material Covered Earlier)

Contents:

    1. Spark SQL case analysis
    2. Implementing the case in Java and Scala with Spark SQL

I. Implementing the Case in Java and Scala with Spark SQL

Student scores:

{"name":"Michael","score":98}
{"name":"Andy", "score":95}
{"name":"Justin", "score":91}

1. Implementing Join in Spark SQL with Java


package SparkSQL;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;


/**
 * FileName:    SparkSQLWithJoin
 * Author:      hadoop
 * Email:       [email protected]
 * Date:        18-11-8 10:48 AM
 * Description:
 */
public class SparkSQLWithJoin {
    public static void main(String[] args) {
        // Create a SparkConf to read system information and set the application name
        SparkConf conf = new SparkConf().setAppName("SparkSQLWithJoin").setMaster("local");
        // Create the JavaSparkContext instance, the cornerstone of the whole Driver
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Set the log output level
        sc.setLogLevel("ERROR");
        // Create the SQLContext, used for SQL analysis
        SQLContext sqlContext = new SQLContext(sc);
        // Create a DataFrame; informally, a DataFrame can be thought of as a table
        Dataset<Row> peopleDS = sqlContext.read().json("file:///home/hadoop/people.json");
        // Register a temporary table backed by the JSON-based DataFrame
        peopleDS.registerTempTable("peopleScore");
        // Query the people with a score of at least 90
        Dataset<Row> excellentScore = sqlContext.sql("select name, score from peopleScore where score >= 90");
        /**
         * Convert the DataFrame to an RDD, then use a map operation to collect the names of everyone scoring at least 90
         */
        List<String> excellentScoreNameList = excellentScore.javaRDD().map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getAs("name");
            }
        }).collect();
        // Dynamically assemble the JSON records
        List<String> peopleInformations = new ArrayList<String>();
        peopleInformations.add("{\"name\":\"Michael\",\"age\":20}");
        peopleInformations.add("{\"name\":\"Andy\", \"age\":30}");
        peopleInformations.add("{\"name\":\"Justin\", \"age\":19}");
        // Build a DataFrame from an RDD whose elements are JSON strings
        JavaRDD<String> peopleInformationRDD = sc.parallelize(peopleInformations);
        Dataset<Row> peopleInformationDS = sqlContext.read().json(peopleInformationRDD);
        // Register it as a temporary table
        peopleInformationDS.registerTempTable("peopleInformations");
        // SQL statement that selects the name and age of the high scorers
        String sqlText = "select name, age from peopleInformations where name in (";
        for (int i = 0; i < excellentScoreNameList.size(); i++) {
            sqlText += "'" + excellentScoreNameList.get(i) + "'";
            if (i < excellentScoreNameList.size() - 1) {
                sqlText += ",";
            }
        }
        sqlText += ")";
        // Execute the SQL statement, producing a Dataset
        Dataset<Row> excellentNameAgeDS = sqlContext.sql(sqlText);
        // Join the high scorers' scores with their ages
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = excellentScore.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(row.getAs("name"), (int) row.getLong(1));
            }
        }).join(excellentNameAgeDS.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<String, Integer>(row.getAs("name"), (int) row.getLong(1));
            }
        }));
        // Use the RowFactory factory method to build the result records
        JavaRDD<Row> resultRowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                // tuple._2._1 is the score, tuple._2._2 is the age
                return RowFactory.create(tuple._1, tuple._2._2, tuple._2._1);
            }
        });
        /**
         * Dynamically construct the DataFrame's metadata; in general, the number of columns and each
         * column's concrete type may come from a JSON file or from a database
         */
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        // Build the StructType that describes the final DataFrame's metadata
        StructType structType = DataTypes.createStructType(structFields);
        // Create the result Dataset
        Dataset<Row> personDS = sqlContext.createDataFrame(resultRowRDD, structType);
        personDS.show();
        personDS.write().format("json").save("file:///home/hadoop/peopleResult.json");
        sc.close();
    }
}
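The collect() of the qualifying names, the hand-built IN clause, and the RDD-level join above serve here mainly as practice with the RDD API. For comparison, here is a minimal alternative sketch (written in Scala, the language of the next section, against the same sqlContext and the peopleScore/peopleInformations temporary tables registered above) that produces the same name/age/score result with a single SQL join:

// Hypothetical alternative to the manual IN clause + RDD join: one SQL join.
// Assumes the peopleScore and peopleInformations temp tables registered above.
val joinedDF = sqlContext.sql(
    """select i.name, i.age, s.score
      |from peopleScore s
      |join peopleInformations i on s.name = i.name
      |where s.score >= 90""".stripMargin)
joinedDF.show()

Expressing the join in SQL keeps the filtering and matching inside the engine instead of round-tripping the names through the driver.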

2. Implementing Join in Spark SQL with Scala


package SparkSQL

import org.apache.spark.sql.{RowFactory, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * FileName:    SparkSQLWithJoinScala
 * Author:      hadoop
 * Email:       [email protected]
 * Date:        18-11-8 5:06 PM
 * Description:
 */
object SparkSQLWithJoinScala {
    def main(args: Array[String]): Unit = {
        // Create a SparkConf to read system information and set the application name
        val conf = new SparkConf().setAppName("SparkSQLWithJoinScala").setMaster("local")
        // Create the SparkContext instance, the cornerstone of the whole Driver
        val sc = new SparkContext(conf)
        // Reduce log noise; WARN or ERROR can be set here as needed
        sc.setLogLevel("ERROR")
        // Create the SQLContext, used for SQL analysis
        val sqlContext = new SQLContext(sc)
        // Create a DataFrame; informally, a DataFrame can be thought of as a table
        val personScoreDS = sqlContext.read.json("file:///home/hadoop/score.json")
        // Register the score data as a temporary table
        personScoreDS.registerTempTable("personScores")
        // Select the records with excellent scores
        val excellentStudentsDS = sqlContext.sql("select name, score from personScores where score >= 90")
        // Extract the names of the high-scoring students
        val excellentStudentName = excellentStudentsDS.rdd.map(_(0))
        // The students' information: name and age
        val peopleInformations = Array(
            "{\"name\":\"Michael\", \"age\":20}",
            "{\"name\":\"Andy\", \"age\":17}",
            "{\"name\":\"Justin\", \"age\":19}"
        )
        // Turn the student information into an RDD
        val peopleInformationRDD = sc.parallelize(peopleInformations)
        // Read the student information as JSON
        val peopleInformationDS = sqlContext.read.json(peopleInformationRDD)
        // Register the student information as a temporary table
        peopleInformationDS.registerTempTable("peopleInformations")
        /**
          * Query the information of the high-scoring students
          */
        val sqlText = new StringBuilder()
        sqlText.append("select name,age from peopleInformations where name in (")
        val students: Array[Any] = excellentStudentName.collect()
        for (i <- 0 until students.size) {
            sqlText.append("'" + students(i).toString + "'")
            if (i != students.size - 1)
                sqlText.append(",")
        }
        sqlText.append(")")
        val sqlString = sqlText.toString()
        val excellentStudentNameAgeDS = sqlContext.sql(sqlString)
        // Join the students' score table with their information table
        val resultRDD = excellentStudentsDS.rdd
            .map(row => (row.getAs[String]("name"), row.getLong(1)))
            .join(excellentStudentNameAgeDS.rdd.map(line => (line.getAs[String]("name"), line.getLong(1))))
        /**
          * Reshape the joined records into Rows
          */
        val resultRowRDD = resultRDD.map(tuple => {
            val name = tuple._1
            val age: java.lang.Integer = tuple._2._2.toInt
            val score: java.lang.Integer = tuple._2._1.toInt
            RowFactory.create(name, age, score)
        })
        // Create the result DataFrame
        val personDS = sqlContext.createDataFrame(
            resultRowRDD.map(row => PersonAgeScore(row.getString(0), row.getInt(1), row.getInt(2))))
        personDS.show()
        personDS.write.json("file:///home/hadoop/json")
    }
}
case class PersonAgeScore(name: String, age: Int, score: Int)
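One design note on the Scala version above: it first builds Rows with RowFactory and then immediately re-maps those Rows onto the PersonAgeScore case class so that createDataFrame can derive the schema by reflection. A minimal sketch (same resultRDD and case class as above) that maps the joined pairs straight onto the case class and drops the intermediate Row step:

// Map each (name, (score, age)) pair directly onto the case class;
// the DataFrame schema is derived from PersonAgeScore by reflection
val personDS = sqlContext.createDataFrame(
    resultRDD.map { case (name, (score, age)) =>
        PersonAgeScore(name, age.toInt, score.toInt)
    })
personDS.show()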