
Spark 2.0 SQL Study Notes

Overview

Spark SQL is Spark's module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed; internally, Spark SQL uses this extra information to perform additional optimizations. There are several ways to interact with Spark SQL, including SQL and the Dataset API.

When computing a result, the same execution engine is used regardless of which API or language you use to express the computation. This unification means developers can easily switch back and forth between APIs based on which provides the most natural way to express a given transformation.

One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. When running SQL from within another programming language, the results are returned as a Dataset/DataFrame. You can also interact with the SQL interface using the command line or over JDBC/ODBC.

Datasets and DataFrames

In Spark 2.0, users can convert seamlessly among the three abstractions RDD, DataFrame, and Dataset using very simple API calls.
A DataFrame is a special kind of Dataset in which each row is a weakly typed JVM object (a generic Row); a Dataset, by contrast, is a collection of strongly typed JVM objects.

A Dataset is a distributed collection of data. It is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java.

Python does not support the Dataset API, but due to Python's dynamic nature, many of its benefits are already available (for example, you can access a field of a row by name: row.columnName). The case for R is similar.

A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources, such as structured data files, Hive tables, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows: in the Scala API, DataFrame is simply a type alias of Dataset[Row], while in the Java API users use Dataset<Row> to represent a DataFrame.
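
To make the typed/untyped distinction concrete, here is a minimal Java sketch. The Person bean and the spark variable are assumptions made only for this illustration (spark is the SparkSession built in the next section):

import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// A plain JavaBean used only for this illustration (hypothetical)
public static class Person implements Serializable {
    private String name;
    private Long age;
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Long getAge() { return age; }
    public void setAge(Long age) { this.age = age; }
}

// Untyped: a DataFrame is just Dataset<Row>; column names are resolved at runtime
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Typed: the same data viewed as a Dataset<Person>; fields are checked at compile time
Dataset<Person> people = df.as(Encoders.bean(Person.class));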

SparkSession: the New Entry Point

In earlier versions of Spark, SparkContext was the entry point into Spark. RDDs were the central API, and they were created and manipulated through the APIs of SparkContext; for anything beyond RDDs, we needed other contexts. For stream processing we used StreamingContext, for SQL we used SQLContext, and for Hive we used HiveContext. As the Dataset and DataFrame APIs gradually became the new standard, a single entry point for building them was needed, so Spark 2.0 introduced SparkSession.
SparkSession is essentially the combination of SQLContext and HiveContext (and may incorporate StreamingContext in the future), so any API available on SQLContext or HiveContext is also available on SparkSession. SparkSession wraps a SparkContext internally, so the actual computation is still carried out by the SparkContext.

import org.apache.spark.sql.SparkSession;

        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL Example")
                //.config("spark.some.config.option", "some-value")
                .master("local")
                .enableHiveSupport()  // only needed when reading from an existing Hive installation
                .getOrCreate();
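
Since the session wraps a SparkContext, the old entry point is still reachable from it. A small sketch (sparkContext() and version() are standard SparkSession methods):

        // The wrapped SparkContext still performs the actual computation
        org.apache.spark.SparkContext sc = spark.sparkContext();
        System.out.println("Running Spark " + spark.version());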

Once the SparkSession is defined, all subsequent operations go through spark.
If you run this in Eclipse, you may hit a JVM heap error. To fix it:
select the class to run, open the menu 'Run > Run...', and on the (x)= Arguments tab enter -Xmx512m in the "VM arguments" box; save and run.

Create DataFrame

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Untyped Dataset Operations

DataFrames provide a domain-specific language for structured data manipulation in Scala, Java, Python and R.

As mentioned above, in Spark 2.0 DataFrames are just Datasets of Rows in the Scala and Java APIs. These operations are also referred to as "untyped transformations", in contrast to the "typed transformations" that come with strongly typed Scala/Java Datasets.

Here we include some basic examples of structured data processing using Datasets:
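
For example, a few representative untyped operations on the people DataFrame created above (these follow the patterns in the official Spark SQL programming guide):

import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();

// Select only the "name" column
df.select("name").show();

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();

// Select people older than 21
df.filter(col("age").gt(21)).show();

// Count people by age
df.groupBy("age").count().show();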

How to Combine Tables

How to merge columns from different DataFrames into a single DataFrame.
Note that the SQL insert approach is not available here (see the commented-out line below).

// This approach does combine columns from two different DataFrames:
// df's "name" column together with df2's "age" column
Dataset<Row> df3 = df.select(df.col("name"), df2.col("age"));

// Inserting into an RDD-based table is not allowed;
// i.e. you cannot INSERT into a temporary view created with createOrReplaceTempView
// spark.sql("insert into table df2view select name from dfview");
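
A more reliable way to combine columns from two DataFrames is an explicit join on a shared key. The sketch below is only an illustration under assumptions: df2 is hypothetical (as above) and is assumed to have its own "name" and "age" columns.

// Rename df2's "age" to avoid ambiguity with df's own "age", then join on "name"
Dataset<Row> joined = df.join(df2.withColumnRenamed("age", "age2"), "name");
joined.select("name", "age2").show();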

Sampling

df.sample(false, 0.5)  // sample roughly 50% of the rows; false = without replacement, true = with replacement
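
sample() returns a new Dataset rather than modifying df, so the result is usually assigned and inspected; an overload also takes a seed for reproducible sampling:

// Keep roughly half of the rows, without replacement, using a fixed seed
Dataset<Row> sampled = df.sample(false, 0.5, 42);
sampled.show();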