Spark-SQL程式設計總結
概覽
Spark SQL用於處理結構化資料,與Spark RDD API不同,它提供更多關於資料結構資訊和計算任務執行資訊的介面,Spark SQL內部使用這些額外的資訊完成特殊優化。可以通過SQL、DataFrames API、Datasets API與Spark SQL進行互動,無論使用何種方式,SparkSQL使用統一的執行引擎記性處理。使用者可以根據自己喜好,在不同API中選擇合適的進行處理。本章中所有用例均可以在spark-shell、pyspark shell、sparkR中執行。
SQL
執行SQL語句的方法有多種:
- 可以使用基礎SQL語法或HiveQL語法在Spark SQL上執行查詢,SparkSQL可以從已安裝的Hive中讀取資料。當使用其他程式語言時,結果集以DataFrame型別返回
- 通過SQL命令列進行互動(spark-sql)
- 可以通過JDBC/ODBC驅動進行互動
DataFrames
DataFrame是由分散式資料集合組成的一系列命名列,它與關係資料庫的表類似,但有很多優化的地方。DataFrame支援多種資料來源,包括結構化資料、Hive的表、外部資料庫、RDDs等。DataFrame API支援scala 、java、Python和R語言。
Datasets
資料集介面在Spark1.6才加入,它可以使用Spark SQL的優化器對RDD操作進行優化。Dataset有JVM物件構建,並可以進行map、flatMap、filter等操作。Dataset API統一介面支援java和scala語言。
開始
程式入口: SQLContext
SQLContext是Spark SQL所有功能的入口,通過SparkContext可以建立該物件的例項:
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
除了SQLContext,還可以建立HiveContext物件,它包含更多的功能,例如HiveQL解析器支援更完善的語法、使用Hive使用者自定義函式UDFs、從Hive表中讀取資料等。HiveContext不依賴Hive是否安裝,Spark預設支援HiveContext。從Spark1.3以後,推薦使用HiveContext,未來SQLContext會包含HiveContext中的功能。
可以通過spark.sql.dialect選項更改SQL解析器,這個引數可以再SQLContext的setConf方法設定,也可以通過SQL的ky=value語法設計。在SQLContext中dialect只支援一種簡單的SQL解析器“sql”。HiveContext預設解析器是“hiveql”,同時支援“sql”,但一般推薦hiveql,因為它語法更全。
建立DataFrames
DataFrames的資料來源多種多樣,例如RDD、Hive table或者其他資料來源。 下面程式碼從JSON檔案建立了一個DataFrame
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
DataFrame 操作
DataFrame支援結構化資料領域常用的資料操作,支援Scala、Java、Python和R語言,下面是一些基本操作示例:
JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");
// Show the content of the DataFrame
df.show();
// age name
// null Michael
// 30 Andy
// 19 Justin
// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin
// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name (age + 1)
// Michael null
// Andy 31
// Justin 20
// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30 Andy
// Count people by age
df.groupBy("age").count().show();
// age count
// null 1
// 19 1
// 30 1
對於DataFrame的所有操作型別可以參考API文件。除了簡單的列操作,DataFrame還支援字串操作、日期演算法、資料操作等等,可以參考DataFrame函式文件
編碼實現SQL查詢
SQLContext的sql方法支援執行sql語法的查詢,並返回DataFrame型別的結果集:
SQLContext sqlContext = ... // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")
建立Datasets
Dataset與RDD類似,但它不適用java序列化也不適用Kryo,而是使用特定的Encoder作為序列化工具。Encoder可以對Spark物件進行序列化和反序列化,同時不需要反序列化在位元組級別就能支援filtering、sorting和hashing等操作。
// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)
// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()
// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]
RDD互動操作
在Spark SQL中有兩種方式可以在DataFrame和RDD進行轉換,第一種方法是利用反射機制,推導包含某種型別的RDD,通過反射將其轉換為指定型別的DataFrame,適用於提前知道RDD的schema。
第二種方法通過程式設計介面與RDD進行互動獲取schema,並動態建立DataFrame,在執行時決定列及其型別。
使用反射推斷Schema
Scala支援使用case class型別匯入RDD轉換為DataFrame,通過case class建立schema,case class的引數名稱會被利用反射機制作為列名。case class可以巢狀組合成Sequences或者Array。這種RDD可以高效的轉換為DataFrame並註冊為表。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)
程式設計指定schema
當case class不能提前定義好時,可以通過以下三步通過程式碼建立DataFrame
- 將RDD轉為包含row物件的RDD
- 基於structType型別建立schema,與第一步建立的RDD相匹配
- 通過SQLContext的createDataFrame方法對第一步的RDD應用schema
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
資料來源
DataFrame介面支援一系列的資料來源,它可以按照普通RDD進行操作,也能被註冊為臨時表進行操作。註冊臨時表後可以使用SQL查詢操作資料集,本章節介紹了常用載入儲存資料的方法,同時給出了內部資料來源的特殊操作。
常規Load/Save函式
未配置spark.sql.sources.default情況下,預設使用parquet資料來源處理所有操作。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手動指定選項
使用者可以手動指定資料來源載入的選項,對於資料來源型別需要使用完整名稱指定例如(org.apache.spark.sql.parquet),但對於內部型別可以使用簡稱,例如(json parquet jdbc等)。可以通過以上方法在不同DataFrame之間進行轉換。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
在檔案上直接執行SQL
除了需要將檔案載入到DataFrame再執行sql以外,還可以直接執行sql
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
儲存模式
Save通過SaveMode指定如何維護現有的資料。需要注意的是savemode未對資料加鎖,因而不是源自操作。若使用overwrite模式時,原有資料會先被清空。
Scala/Java | Any Language | 含義 |
---|---|---|
SaveMode.ErrorIfExists (default) | "error" (default) | 當資料輸出的位置已存在時,丟擲此異常 |
SaveMode.Append | "append" | 當資料輸出的位置已存在時,在檔案後面追加 |
SaveMode.Overwrite | "overwrite" | 當資料輸出的位置已存在時,重寫 |
SaveMode.Ignore | "ignore" | 當資料輸出的位置已存在時,不執行任何操作,與 CREATE IF NOT EXISTS類似 |
儲存到持久化表中
使用HiveContext時,DataFrame可以使用saveAsTable方法儲存到持久化表中。與registerTempTable不同,saveASTable會為其真正建立資料區並建立指向該區域的指標放入HiveMetaStore中。在持有同一個metastore的連線期間,持久化的資料會一直存在,即使spark程式重啟也不影響。可以通過SQLContext的table方法建立用於持久化表的DataFrame。
預設的saveASTable會建立“managed table”,其資料位置會被metastore維護,被管理的表資料會在表被刪除時清空。
Parquet檔案
parquet是一種流行的列式儲存格式。SparkSQL支援對parquet的讀寫以及schema和資料的維護。在寫parquet檔案時,為了相容,所有列都會轉換為nullable格式。
程式設計實現資料載入
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
分割槽推斷
表分割槽是Hive等系統的常用優化手段。在一個分割槽表中,資料經常分佈在不同目錄下,分割槽列的值相同的資料分佈在同一目錄中。目前支援對parquet檔案進行自動推斷分割槽。例如我們可以將之前的資料增加兩列gender和country,並將兩列作為分割槽列進行資料分割槽。
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
將資料路徑傳給SQLContext後,可以自動推斷DataFrame資料的分割槽資訊。注意,資料的分割槽列是自動推斷出來你的,目前分割槽列支援數值型別和string型別。若使用者不希望自動推斷分割槽列時,可以通過spark.sql.sources.partitionColumnTypeInference.enabled配置禁止自動推斷,此時會使用string型別列進行分割槽。 分割槽型別會根據傳入的路徑進行推斷,但使用者可以配置資料來源的basePath屬性設定分析的路徑。
Schema合併
parquet支援列增加等操作,當出現多個互相相容的schemas時,parquet可以自動檢測併合並這些檔案的schema。由於schema 合併會消耗大量的資源,預設關閉該操作,可以通過以下方法開啟:
- 設定資料來源mergeSchema屬性為true
- 設定SQL的選項spark.sql.parquet.mergeSchema為true
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
Hive metasotre Parquet錶轉化
SparkSQL使用內部庫而不是Hive SerDe,對Hive metasotre Parquet表進行讀寫,效能很好,可以通過spark.sql.hive.convertMetastoreParquet配置。
Hive/Parquet Schema Reconciliation
由於Hive和Parquet的元資料處理方式不同,如下所示
- Hive忽略大小寫,而Parquet沒有
- Hive所有欄位都是nullable,而parquet中null是有意義的值(避免理解錯誤,貼上原文:Hive considers all columns nullable, while nullability in Parquet is significant)
將Hive metastore Parquet table轉換為Spark SQL parquet表時,遵從以下規則:
-
相同名稱的欄位的資料型別必須相同,nullable型別被忽略。由於融合的資料型別需要在parquet中有對應的型別,所以nullability型別需要處理。
-
融合後schema中包含了Hive元資料中定義的值
- 任何只在Parquet schema中出現的欄位被拋棄
- 任何旨在Hive元資料中出現的欄位作為nullable增加到融合後元資料中
元資料重新整理
Spark SQL會快取parquet元資料以便提高效能。若Hive metastore Parquet table轉換被啟用,則轉換的表元資料也會被cache。若這些元資料被外部工具修改,則需要手動更新快取元資料保持一致性。
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
配置
與parquet相關的配置引數如下所示
引數 | 預設值 | 描述 |
---|---|---|
spark.sql.parquet.binaryAsString | false | 該選項讓SparkSQL將string安裝二進位制資料按照字串處理,以便相容老系統 |
spark.sql.parquet.int96AsTimestamp | true | Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. |
spark.sql.parquet.cacheMetadata | true | 快取Parquet的Schema元資料,提高查詢靜態資料效率 |
spark.sql.parquet.compression.codec | gzip | 設定Parquet檔案的壓縮編碼方式,支援 uncompressed, snappy, gzip, lzo. |
spark.sql.parquet.filterPushdown | true | 啟用過濾謂詞下推優化,將過濾下推到抽取資料時,取得效能的提升 |
spark.sql.hive.convertMetastoreParquet | true | 若設為false,Spark SQL使用Hive SerDe支援對Parquet tables的操作. |
spark.sql.parquet.output.committer.class | org.apache.parquet.hadoop.ParquetOutputCommitter | The output committer class used by Parquet. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter. |
spark.sql.parquet.mergeSchema | false | 是否開啟Schema合併 |
JSON資料集
SQLContext.read.josn()介面可以自動推斷JSON檔案的schema。SparkSQL支援的JSON檔案中每一行需要是一個完整的JSON物件,不支援跨行的json物件。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// Register this DataFrame as a table.
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Hive 表
Spark SQL支援從Hive中讀取資料,但由於Hive依賴過多,預設不支援Hive,需要在編譯時新增-Phive -Phive-thriftserver
選項。由於用到Hive的序列化和反序列化需要保證Hive包在各個worker中都存在。
將hive-site.xml、core-site.xml和hdfs-site.xml放入conf目錄下配置Hive環境。在Yarn叢集上面執行時,需要確定datanucleus jar包和hive-site.xml在driver和所有executor上面都存在。可以通過spark-submit的--jars和--file引數檢查是否存在。
若通過Spark SQL操作Hive需要建立HiveContext,增加元資料功能及HiveQL支援。若沒有部署Hive環境同樣可以建立HiveContext。若沒有在hive-site.xml中配置,會自動在當前目錄建立metastore_db並在/user/hive/warehouse建立倉儲目錄,需要給hive對/user/hive/warehouse的寫許可權。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
與不同版本Hive Metastore互動
由於Spark SQL可以與不同版本的Hive Metastor(而不是Hive的版本)進行互動,只需要修改部分的配置資訊,相關配置如下:
屬性 | 預設值 | 描述 |
---|---|---|
spark.sql.hive.metastore.version | 1.2.1 | Hive metastore的版本資訊,從0.12.0到1.2.1 |
spark.sql.hive.metastore.jars | builtin | 指定metastore的Jar包位置,builtin:該jar被打包到spark應用程式中;maven:使用maven遠端倉儲下載;類路徑:需要包含hive所有的依賴包 |
spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | 一個逗號分隔的類名字首列表,這些類使用classloader載入,且可以在Spark SQL和特定版本的Hive間共享。例如,用來訪問hive metastore 的JDBC的driver就需要這種共享。其他需要共享的類,是與某些已經共享的類有互動的類。例如,自定義的log4j appender。 |
spark.sql.hive.metastore.barrierPrefixes | (empty) | 使用逗號分隔的類名字首列表,Spark SQL所訪問的每個Hive版本都會被顯式的reload這些類。 |
JDBC連線其他資料庫
SparkSQL通過JdbcRDD實現對支援jdbc的資料庫進行資料載入,將其作為DataFrame進行操作。JDBC載入的資料來源不需要提供classTag。使用前需要將JDBC Driver包含在spark的classpath中。例如連線postgres需要如下設定
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
資料庫中的表可以作為DataFrame或SparkSQL的臨時表載入,支援以下的選項:
屬性 | 描述 |
---|---|
url | JDBC連線URL |
dbtable | 需要讀取的JDBC表。任何在From子句中的元素都可以,例如表或者子查詢等。 |
partitionColumn, lowerBound, upperBound, numPartitions | 這些選項需要同時制定,他們制定瞭如何併發讀取資料的同時進行分割槽。lowerBound, upperBound僅用於確定分割槽邊界不用於過濾資料,所有資料都會被分割槽 |
fetchSize | 決定了每次資料取多少行 |
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:postgresql:dbserver",
"dbtable" -> "schema.tablename")).load()
疑難問題
- JDBC的driver類需要在所有executor可見,因為Java的DriverManager會進行安全檢查,忽略所有不可見的類。可以通過修改每個worker節點的compute_classpath.sh以便包含Jar包
- 有些資料庫例如H2的名稱是大寫,需要在SparkSQL中同樣使用大寫
效能調優
對於一些負載可以通過記憶體快取資料或者調整引數提高效能。
記憶體快取資料
Spark SQL可以通過sqlContext.cacheTable("tableName") 或 dataFrame.cache()介面將RDD資料快取到記憶體中。SparkSql可以近掃描需要的列並自動壓縮、進行垃圾回收等。可以通過sqlContext.uncacheTable("Tablename")從記憶體中移除表。
屬性 | 預設值 | 描述 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | 若設為true,Spark SQL會基於列的統計資料自動選擇壓縮器進行資料壓縮 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制列快取的每批次的資料大小,資料越大則記憶體利用率及壓縮比例越大,但OOM風險也越大 |
其他配置資訊
可以通過修改以下配置提高查詢執行的效能,以後可能會棄用以下設定,而變為自動進行最優化配置。
屬性 | 預設值 | 描述 |
---|---|---|
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | 配置做join操作時被廣播變數的表的大小。當設為-1時禁用廣播。目前只有Hive元資料支援統計資訊,可以通過ANALYZE TABLE <tablename>
COMPUTE STATISTICS 進行資訊統計 |
spark.sql.tungsten.enabled | true | 若為true,或使用tungsten物理優化執行,顯式地管理記憶體並動態生成表示式計算的位元組碼 |
spark.sql.shuffle.partitions | 200 | 配置shuffle操作時的分割槽數量 |
分散式SQL引擎
當使用JDBC/ODBC或者命令列進行互動時,SparkSQL可以作為分散式查詢引擎執行。在這種模式下,Spark SQL的應用能夠不寫程式碼便執行查詢。
執行Thrift JDBC/ODBC驅動
這裡的實現與HiveServer2類似,可以通過beeline測試Spakr或者Hive1.2.1的JDBC驅動。通過以下命令啟動jdbc驅動
./sbin/start-thriftserver.sh
相關推薦
Spark-SQL程式設計總結
概覽 Spark SQL用於處理結構化資料,與Spark RDD API不同,它提供更多關於資料結構資訊和計算任務執行資訊的介面,Spark SQL內部使用這些額外的資訊完成特殊優化。可以通過SQL、DataFrames API、Datasets API與Spark S
Spark2.x學習筆記:14、Spark SQL程式設計
Spark2.x學習筆記:14、 Spark SQL程式設計 14.1 RDD的侷限性 RDD僅表示資料集,RDD沒有元資料,也就是說沒有欄位語義定義。 RDD需要使用者自己優化程式,對程式設計師要求較高。 從不同資料來源讀取資料相對困難。 合併多個數
SparkSQL(5)——Spark SQL程式設計方式執行查詢
編寫Spark SQL程式實現RDD轉換成DataFrame Spark官網提供了兩種方法來實現從RDD轉換得到DataFrame,第一種方法是利用反射機制,推導包含某種型別的RDD,通過反射將其轉換為指定型別的DataFrame,適用於提前知道RDD的sche
Spark Sql 程式設計式結構DataType轉換 程式碼類小結
基本型別資料轉DataType public DataType attrToDataType(String attrstr){ DataType returnDataType =
Spark學習(柒)- Spark SQL擴充套件和總結
文章目錄 Spark SQL使用場景 Spark SQL載入資料 1) RDD DataFrame/Dataset 2) Local Cloud(HDFS/S3) DataFrame與SQL的對比
Spark-SQL學習筆記_總結和拓展
一、Spark-SQL應用場景 1.資料檔案即席查詢 Ad-hoc 普通查詢:定製化查詢 2.對流資料檔案採用SQL分析 Spark-Streaming+Spark-SQL 3.使用SQL完成ETL開發
Spark SQL簡介及以程式設計方式實現SQL查詢
1.什麼是SparkSQL? Spark SQL是Spark用來處理結構化資料的一個模組,它提供了一個程式設計抽象叫做DataFrame並且作為分散式SQL查詢引擎的作用。 2.SparkSQL的特點: 我們已經學習了Hive,它是將Hive SQL轉換成M
第10章 Spark SQL擴充套件和總結
10-1 -課程目錄 10-2 -Spark SQL使用場景 10-3 -Spark SQL載入資料 10-4 -DataFrame與SQL的對比 10
十四.Spark SQL總結之spark日誌檔案資料形式的轉換
第一步.資料來源 找到spark的日誌資料來源,在/root/spark/spark-2.0.2-bin-hadoop2.7/logs目錄下: 通過對檔案的讀取,統計其中資料的條數: val masterLog = sc.textFile("file:///r
Spark SQL 筆記(18)——spark SQL 總結(1)
1 Spark SQl 使用場景 Ad-hoc querying of data in files Live SQL analytics over streaming data ETL capabilities alongside familiar SQL I
Spark SQL 筆記(19)——spark SQL 總結(2) DataFrame VS SQL
1 DataFrame DataFrame = RDD + Schema DataFrame is just a type alias for Dataset of Row DataFrame ov
《Spark 官方文件》Spark SQL, DataFrames 以及 Datasets 程式設計指南
spark-1.6.0 [原文地址] Spark SQL, DataFrames 以及 Datasets 程式設計指南 概要 Spark SQL是Spark中處理結構化資料的模組。與基礎的Spark RDD API不同,Spark SQL的介面提供了更多關於資料的結構資訊和計算任務的執
程式設計的兩種方式執行Spark SQL查詢(方式一)
現在我們來實現在自定義程式中編寫Spark SQL查詢程式。 實現查詢的方式有兩種: 方式一:通過反射推斷schema。 方式二:通過structtype直接指定schema。 我們先用方式一來實現自定義查詢。 首先建立一個team.txt檔案,內容有5列,分別是id,球隊
程式設計的兩種方式執行Spark SQL查詢(方式二)
現在我們來實現在自定義程式中編寫Spark SQL查詢程式。 實現查詢的方式有兩種: 方式一:通過反射推斷schema。 方式二:通過structtype直接指定schema。 這次我們用方式二來實現自定義查詢。 具體程式如下: package cn.allengao.s
Spark 2.4.0程式設計指南--Spark SQL UDF和UDAF
Spark 2.4.0程式設計指南–Spark SQL UDF和UDAF 更多資源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 視訊 Spark 2.4.0程
Spark SQL將rdd轉換為資料集-以程式設計方式指定模式(Programmatically Specifying the Schema)
一:解釋 官網:https://spark.apache.org/docs/latest/sql-getting-started.html 這種場景是生活中的常態 When case classes cannot be defined ahead of time (for example
spark-sql的概述以及程式設計模型的介紹
1、spark sql的概述 (1)spark sql的介紹: Spark SQL 是 Spark 用來處理結構化資料(結構化資料可以來自外部結構化資料來源也可以通 過 RDD 獲取)的一個模組,它提供了一個程式設計抽象叫做 DataFrame 並且作為分散式 SQL 查 詢引擎的作用。 外部的結構
Spark-SQL的具體程式設計場景
入門案例: object SparkSqlTest { def main(args: Array[String]): Unit = { //遮蔽多餘的日誌 Logger.getLogger("org.apache.hadoop").setLevel(Level.WAR
基於Spark的Hive程式設計中,“Error:(8, 37) java: 程式包org.apache.spark.sql.api.java不存在”的解決辦法
依賴Spark 1.2.0中的jar包程式設計時會出現這個問題,雖然這個Spark版本已較舊,但一些在舊的平臺上開發的人,可能還會遇到這個問題,因此將問題的解決辦法寫在這裡。 報的錯誤如下圖所示:
Spark-Sql之DataFrame實戰詳解
集合 case 編程方式 優化 所表 register 操作數 print ava 1、DataFrame簡介: 在Spark中,DataFrame是一種以RDD為基礎的分布式數據據集,類似於傳統數據庫聽二維表格,DataFrame帶有Schema元信息,即DataFram