1. 程式人生 > >spark學習記錄(十、SparkSQL)

spark學習記錄(十、SparkSQL)

一、介紹

  • SparkSQL支援查詢原生的RDD。 RDD是Spark平臺的核心概念,是Spark能夠高效的處理大資料的各種場景的基礎。
  • 能夠在Scala中寫SQL語句。支援簡單的SQL語法檢查,能夠在Scala中寫Hive語句訪問Hive資料,並將結果取回作為RDD使用。

DataFrame也是一個分散式資料容器。與RDD類似,然而DataFrame更像傳統資料庫的二維表格,除了資料以外,還掌握資料的結構資訊,即schema。同時,與Hive類似,DataFrame也支援巢狀資料型別(struct、array和map)。從API易用性的角度上 看, DataFrame API提供的是一套高層的關係操作,比函式式的RDD API要更加友好,門檻更低。

DataFrame的底層封裝的是RDD,只不過RDD的泛型是Row型別。

二、載入DataFrame方法 

新增依賴

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>2.4.0</version>
        </dependency>
public class JavaExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("JavaExample");
        JavaSparkContext sc = new JavaSparkContext (conf);

        SQLContext sqlContext = new SQLContext(sc);
//        載入json檔案
        Dataset<Row> json = sqlContext.read().format("json").load("C://json");
        Dataset<Row> json1 = sqlContext.read().json("C://json");
        /**
         * sqlContext讀取json檔案載入成DataFrame時,DataFrame的列會按照ASCII碼排序
         * 寫sql查詢出的DataFrame會按照指定欄位顯示列
         * show()預設顯示前20行資料,show(100)顯示100行
         */
        //查詢表內容
//        json.show();
        //查詢表結構
//        json.printSchema();
        //select name,age from xxx where age >18
//        json.select("name","age").where(json.col("age").gt(18)).show();

        /**
         * 將DataFrame註冊成臨時表
         * 注意:t1表這張表既不在記憶體中也不在磁碟中,相當於一個指標指向原始檔,底層操作解析Spark job讀取原始檔
         */
        json.registerTempTable("t1");
        sqlContext.sql("select name,age from t1 where age>18").show();

        //DataFrame轉換成RDD,並獲取第一列資料
        JavaRDD<Row> rdd = json.javaRDD();
        rdd.foreach(new VoidFunction<Row>() {
            public void call(Row row) throws Exception {
                System.out.println(row.get(0));
            }
        });

        sc.stop();
    }
}

讀取json格式的檔案建立DataFrame:

  • json檔案中的json資料不能巢狀json格式資料。
  • DataFrame是一個一個Row型別的RDD,df.rdd()/df.javaRdd()。
  • 可以兩種方式讀取json格式的檔案。
  • df.show()預設顯示前20行資料。
  • DataFrame原生API可以操作DataFrame(不方便)。
  • 註冊成臨時表時,表中的列預設按ascii順序顯示列。

普通RDD轉換為DataFrame

public class Person implements Serializable{

    private String id;
    private String name;
    private Integer age;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "Person{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}
//通過反射的方式將非json格式的RDD轉換成DataFrame
public class JavaExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("JavaExample");
        JavaSparkContext sc = new JavaSparkContext (conf);

        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> lineRDD = sc.textFile("C:\\person.txt");
        /**
         * 注意:
         * 1.自定義類Person必須為public
         * 2.RDD轉化為DataFrame會把自定義類欄位名稱按ASCII排序
         * 3.自定義類要實現序列化介面
         */
        JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
            public Person call(String s) throws Exception {
                Person p = new Person();
                p.setId(s.split(",")[0]);
                p.setName(s.split(",")[1]);
                p.setAge(Integer.valueOf(s.split(",")[2]));
                return p;
            }
        });

        Dataset<Row> dataFrame = sqlContext.createDataFrame(personRDD, Person.class);
        dataFrame.show();

        //將DataFrame轉換為JavaRDD
        JavaRDD<Row> javaRDD = dataFrame.javaRDD();
        JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
            public Person call(Row row) throws Exception {
                Person p = new Person();
                p.setId((String) row.getAs("id"));
                p.setName((String) row.getAs("name"));
                p.setAge((Integer) row.getAs("age"));
                return p;
            }
        });
        map.foreach(new VoidFunction<Person>() {
            public void call(Person person) throws Exception {
                System.out.println(person);
            }
        });

        sc.stop();
    }
}
//動態建立Schema將非json格式的RDD轉換成DataFrame
public class JavaExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("JavaExample");
        JavaSparkContext sc = new JavaSparkContext (conf);

        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> lineRDD = sc.textFile("C:\\person.txt");

        JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
            public Row call(String s) throws Exception {
                return RowFactory.create(
                        s.split(",")[0],
                        s.split(",")[1],
                        Integer.valueOf(s.split(",")[2])
                );
            }
        });
        /**
         * 動態構建DataFrame中的元資料
         */
        List<StructField> asList = Arrays.asList(
                DataTypes.createStructField("id",DataTypes.StringType,true),
                DataTypes.createStructField("name",DataTypes.StringType,true),
                DataTypes.createStructField("age",DataTypes.IntegerType,true)
        );
        StructType schema = DataTypes.createStructType(asList);
        Dataset<Row> dataFrame = sqlContext.createDataFrame(rowRDD,schema);
        dataFrame.show();

        sc.stop();
    }
}

讀取parquet檔案建立DataFrame

public class JavaExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("JavaExample");
        JavaSparkContext sc = new JavaSparkContext (conf);

        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> jsonRDD = sc.textFile("C:\\json");
        Dataset<Row> df = sqlContext.read().json(jsonRDD);
        /**
         * 將DataFormat儲存成parquet檔案,
         * SaveMode指定儲存檔案時的儲存模式:
         * Overwrite:覆蓋
         * Append:追加
         * ErrorIfExists:如果存在就報錯
         * Ignore:如果存在就忽略
         */
        df.write().mode(SaveMode.Overwrite).parquet("C:\\parquet");

        /**
         * 載入parquet檔案成DataFrame檔案
         */
        Dataset<Row> parquet = sqlContext.read().parquet("C:\\parquet");
        parquet.show();

        sc.stop();
    }
}

讀取JDBC中的資料建立DataFrame(MySql為例)

public class JavaExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("JavaExample");
        //配置join或聚合操作shuffle資料時的分割槽數量
        conf.set("spark.sql.shuffle.partitions","1");
        JavaSparkContext sc = new JavaSparkContext (conf);
        SQLContext sqlContext = new SQLContext(sc);

        /**
         * 第一種方法
         */
        Map<String, String> options = new HashMap<String, String>();
        options.put("url","jdbc:mysql://192.168.2.125:3306/mysql");
        options.put("driver","com.mysql.jdbc.Driver");
        options.put("user","root");
        options.put("password","123456");
        options.put("dbtable","t_waybill");

        Dataset<Row> load = sqlContext.read().format("jdbc").options(options).load();
        load.show();

        /**
         * 第二種方法
         */
        DataFrameReader reader = sqlContext.read().format("jdbc");
        reader.option("url","jdbc:mysql://192.168.2.125:3306/mysql");
        reader.option("driver","com.mysql.jdbc.Driver");
        reader.option("user","root");
        reader.option("password","123456");
        reader.option("dbtable","t_waybill");

        Dataset<Row> load1 = reader.load();
        load1.show();

        /**
         * 將DataFrame結果儲存到mysql中
         */
        Properties properties = new Properties();
        properties.setProperty("user","root");
        properties.setProperty("password","123456");
        /**
         * SaveMode:
         * Overwrite:覆蓋
         * Append:追加
         * ErrorIfExists:如果存在就報錯
         * Ignore:如果存在就忽略
         */
        load.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.2.125:3306/mysql","t_waybill",properties);
        sc.stop();
    }
}