1. 程式人生 > >Spark-SQL程式設計總結

Spark-SQL程式設計總結

概覽

Spark SQL用於處理結構化資料,與Spark RDD API不同,它提供更多關於資料結構資訊和計算任務執行資訊的介面,Spark SQL內部使用這些額外的資訊完成特殊優化。可以通過SQL、DataFrames API、Datasets API與Spark SQL進行互動,無論使用何種方式,SparkSQL使用統一的執行引擎記性處理。使用者可以根據自己喜好,在不同API中選擇合適的進行處理。本章中所有用例均可以在spark-shell、pyspark shell、sparkR中執行。

SQL

執行SQL語句的方法有多種:

  • 可以使用基礎SQL語法或HiveQL語法在Spark SQL上執行查詢,SparkSQL可以從已安裝的Hive中讀取資料。當使用其他程式語言時,結果集以DataFrame型別返回
  • 通過SQL命令列進行互動(spark-sql)
  • 可以通過JDBC/ODBC驅動進行互動

DataFrames

DataFrame是由分散式資料集合組成的一系列命名列,它與關係資料庫的表類似,但有很多優化的地方。DataFrame支援多種資料來源,包括結構化資料、Hive的表、外部資料庫、RDDs等。DataFrame API支援scala 、java、Python和R語言。

Datasets

資料集介面在Spark1.6才加入,它可以使用Spark SQL的優化器對RDD操作進行優化。Dataset有JVM物件構建,並可以進行map、flatMap、filter等操作。Dataset API統一介面支援java和scala語言。

開始

程式入口: SQLContext

SQLContext是Spark SQL所有功能的入口,通過SparkContext可以建立該物件的例項:

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

除了SQLContext,還可以建立HiveContext物件,它包含更多的功能,例如HiveQL解析器支援更完善的語法、使用Hive使用者自定義函式UDFs、從Hive表中讀取資料等。HiveContext不依賴Hive是否安裝,Spark預設支援HiveContext。從Spark1.3以後,推薦使用HiveContext,未來SQLContext會包含HiveContext中的功能。
可以通過spark.sql.dialect選項更改SQL解析器,這個引數可以再SQLContext的setConf方法設定,也可以通過SQL的ky=value語法設計。在SQLContext中dialect只支援一種簡單的SQL解析器“sql”。HiveContext預設解析器是“hiveql”,同時支援“sql”,但一般推薦hiveql,因為它語法更全。

建立DataFrames

DataFrames的資料來源多種多樣,例如RDD、Hive table或者其他資料來源。 下面程式碼從JSON檔案建立了一個DataFrame

JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();

DataFrame 操作

DataFrame支援結構化資料領域常用的資料操作,支援Scala、Java、Python和R語言,下面是一些基本操作示例:

JavaSparkContext sc // An existing SparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1

對於DataFrame的所有操作型別可以參考API文件。除了簡單的列操作,DataFrame還支援字串操作、日期演算法、資料操作等等,可以參考DataFrame函式文件

編碼實現SQL查詢

SQLContext的sql方法支援執行sql語法的查詢,並返回DataFrame型別的結果集:

SQLContext sqlContext = ... // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table")

建立Datasets

Dataset與RDD類似,但它不適用java序列化也不適用Kryo,而是使用特定的Encoder作為序列化工具。Encoder可以對Spark物件進行序列化和反序列化,同時不需要反序列化在位元組級別就能支援filtering、sorting和hashing等操作。

// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

RDD互動操作

在Spark SQL中有兩種方式可以在DataFrame和RDD進行轉換,第一種方法是利用反射機制,推導包含某種型別的RDD,通過反射將其轉換為指定型別的DataFrame,適用於提前知道RDD的schema。
第二種方法通過程式設計介面與RDD進行互動獲取schema,並動態建立DataFrame,在執行時決定列及其型別。

使用反射推斷Schema

Scala支援使用case class型別匯入RDD轉換為DataFrame,通過case class建立schema,case class的引數名稱會被利用反射機制作為列名。case class可以巢狀組合成Sequences或者Array。這種RDD可以高效的轉換為DataFrame並註冊為表。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

程式設計指定schema

當case class不能提前定義好時,可以通過以下三步通過程式碼建立DataFrame

  • 將RDD轉為包含row物件的RDD
  • 基於structType型別建立schema,與第一步建立的RDD相匹配
  • 通過SQLContext的createDataFrame方法對第一步的RDD應用schema
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)

資料來源

DataFrame介面支援一系列的資料來源,它可以按照普通RDD進行操作,也能被註冊為臨時表進行操作。註冊臨時表後可以使用SQL查詢操作資料集,本章節介紹了常用載入儲存資料的方法,同時給出了內部資料來源的特殊操作。

常規Load/Save函式

未配置spark.sql.sources.default情況下,預設使用parquet資料來源處理所有操作。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手動指定選項

使用者可以手動指定資料來源載入的選項,對於資料來源型別需要使用完整名稱指定例如(org.apache.spark.sql.parquet),但對於內部型別可以使用簡稱,例如(json parquet jdbc等)。可以通過以上方法在不同DataFrame之間進行轉換。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

在檔案上直接執行SQL

除了需要將檔案載入到DataFrame再執行sql以外,還可以直接執行sql

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

儲存模式

Save通過SaveMode指定如何維護現有的資料。需要注意的是savemode未對資料加鎖,因而不是源自操作。若使用overwrite模式時,原有資料會先被清空。

Scala/Java Any Language 含義
SaveMode.ErrorIfExists (default) "error" (default) 當資料輸出的位置已存在時,丟擲此異常
SaveMode.Append "append" 當資料輸出的位置已存在時,在檔案後面追加
SaveMode.Overwrite "overwrite" 當資料輸出的位置已存在時,重寫
SaveMode.Ignore "ignore" 當資料輸出的位置已存在時,不執行任何操作,與 CREATE IF NOT EXISTS類似

儲存到持久化表中

使用HiveContext時,DataFrame可以使用saveAsTable方法儲存到持久化表中。與registerTempTable不同,saveASTable會為其真正建立資料區並建立指向該區域的指標放入HiveMetaStore中。在持有同一個metastore的連線期間,持久化的資料會一直存在,即使spark程式重啟也不影響。可以通過SQLContext的table方法建立用於持久化表的DataFrame。
預設的saveASTable會建立“managed table”,其資料位置會被metastore維護,被管理的表資料會在表被刪除時清空。

Parquet檔案

parquet是一種流行的列式儲存格式。SparkSQL支援對parquet的讀寫以及schema和資料的維護。在寫parquet檔案時,為了相容,所有列都會轉換為nullable格式。

程式設計實現資料載入

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分割槽推斷

表分割槽是Hive等系統的常用優化手段。在一個分割槽表中,資料經常分佈在不同目錄下,分割槽列的值相同的資料分佈在同一目錄中。目前支援對parquet檔案進行自動推斷分割槽。例如我們可以將之前的資料增加兩列gender和country,並將兩列作為分割槽列進行資料分割槽。

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

將資料路徑傳給SQLContext後,可以自動推斷DataFrame資料的分割槽資訊。注意,資料的分割槽列是自動推斷出來你的,目前分割槽列支援數值型別和string型別。若使用者不希望自動推斷分割槽列時,可以通過spark.sql.sources.partitionColumnTypeInference.enabled配置禁止自動推斷,此時會使用string型別列進行分割槽。 分割槽型別會根據傳入的路徑進行推斷,但使用者可以配置資料來源的basePath屬性設定分析的路徑。

Schema合併

parquet支援列增加等操作,當出現多個互相相容的schemas時,parquet可以自動檢測併合並這些檔案的schema。由於schema 合併會消耗大量的資源,預設關閉該操作,可以通過以下方法開啟:

  • 設定資料來源mergeSchema屬性為true
  • 設定SQL的選項spark.sql.parquet.mergeSchema為true
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

Hive metasotre Parquet錶轉化

SparkSQL使用內部庫而不是Hive SerDe,對Hive metasotre Parquet表進行讀寫,效能很好,可以通過spark.sql.hive.convertMetastoreParquet配置。

Hive/Parquet Schema Reconciliation

由於Hive和Parquet的元資料處理方式不同,如下所示

  • Hive忽略大小寫,而Parquet沒有
  • Hive所有欄位都是nullable,而parquet中null是有意義的值(避免理解錯誤,貼上原文:Hive considers all columns nullable, while nullability in Parquet is significant)

將Hive metastore Parquet table轉換為Spark SQL parquet表時,遵從以下規則:

  • 相同名稱的欄位的資料型別必須相同,nullable型別被忽略。由於融合的資料型別需要在parquet中有對應的型別,所以nullability型別需要處理。

  • 融合後schema中包含了Hive元資料中定義的值

    • 任何只在Parquet schema中出現的欄位被拋棄
    • 任何旨在Hive元資料中出現的欄位作為nullable增加到融合後元資料中

元資料重新整理

Spark SQL會快取parquet元資料以便提高效能。若Hive metastore Parquet table轉換被啟用,則轉換的表元資料也會被cache。若這些元資料被外部工具修改,則需要手動更新快取元資料保持一致性。

// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")

配置

與parquet相關的配置引數如下所示

引數 預設值 描述
spark.sql.parquet.binaryAsString false 該選項讓SparkSQL將string安裝二進位制資料按照字串處理,以便相容老系統
spark.sql.parquet.int96AsTimestamp true Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
spark.sql.parquet.cacheMetadata true 快取Parquet的Schema元資料,提高查詢靜態資料效率
spark.sql.parquet.compression.codec gzip 設定Parquet檔案的壓縮編碼方式,支援 uncompressed, snappy, gzip, lzo.
spark.sql.parquet.filterPushdown true 啟用過濾謂詞下推優化,將過濾下推到抽取資料時,取得效能的提升
spark.sql.hive.convertMetastoreParquet true 若設為false,Spark SQL使用Hive SerDe支援對Parquet tables的操作.
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter The output committer class used by Parquet. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter.
spark.sql.parquet.mergeSchema false 是否開啟Schema合併

JSON資料集

SQLContext.read.josn()介面可以自動推斷JSON檔案的schema。SparkSQL支援的JSON檔案中每一行需要是一個完整的JSON物件,不支援跨行的json物件。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
//  |-- age: integer (nullable = true)
//  |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Hive 表

Spark SQL支援從Hive中讀取資料,但由於Hive依賴過多,預設不支援Hive,需要在編譯時新增-Phive -Phive-thriftserver選項。由於用到Hive的序列化和反序列化需要保證Hive包在各個worker中都存在。

將hive-site.xml、core-site.xml和hdfs-site.xml放入conf目錄下配置Hive環境。在Yarn叢集上面執行時,需要確定datanucleus jar包和hive-site.xml在driver和所有executor上面都存在。可以通過spark-submit的--jars和--file引數檢查是否存在。
若通過Spark SQL操作Hive需要建立HiveContext,增加元資料功能及HiveQL支援。若沒有部署Hive環境同樣可以建立HiveContext。若沒有在hive-site.xml中配置,會自動在當前目錄建立metastore_db並在/user/hive/warehouse建立倉儲目錄,需要給hive對/user/hive/warehouse的寫許可權。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

與不同版本Hive Metastore互動

由於Spark SQL可以與不同版本的Hive Metastor(而不是Hive的版本)進行互動,只需要修改部分的配置資訊,相關配置如下:

屬性 預設值 描述
spark.sql.hive.metastore.version 1.2.1 Hive metastore的版本資訊,從0.12.0到1.2.1
spark.sql.hive.metastore.jars builtin 指定metastore的Jar包位置,builtin:該jar被打包到spark應用程式中;maven:使用maven遠端倉儲下載;類路徑:需要包含hive所有的依賴包
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc 一個逗號分隔的類名字首列表,這些類使用classloader載入,且可以在Spark SQL和特定版本的Hive間共享。例如,用來訪問hive metastore 的JDBC的driver就需要這種共享。其他需要共享的類,是與某些已經共享的類有互動的類。例如,自定義的log4j appender。
spark.sql.hive.metastore.barrierPrefixes (empty) 使用逗號分隔的類名字首列表,Spark SQL所訪問的每個Hive版本都會被顯式的reload這些類。

JDBC連線其他資料庫

SparkSQL通過JdbcRDD實現對支援jdbc的資料庫進行資料載入,將其作為DataFrame進行操作。JDBC載入的資料來源不需要提供classTag。使用前需要將JDBC Driver包含在spark的classpath中。例如連線postgres需要如下設定

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

資料庫中的表可以作為DataFrame或SparkSQL的臨時表載入,支援以下的選項:

屬性 描述
url JDBC連線URL
dbtable 需要讀取的JDBC表。任何在From子句中的元素都可以,例如表或者子查詢等。
partitionColumn, lowerBound, upperBound, numPartitions 這些選項需要同時制定,他們制定瞭如何併發讀取資料的同時進行分割槽。lowerBound, upperBound僅用於確定分割槽邊界不用於過濾資料,所有資料都會被分割槽
fetchSize 決定了每次資料取多少行
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()

疑難問題

  • JDBC的driver類需要在所有executor可見,因為Java的DriverManager會進行安全檢查,忽略所有不可見的類。可以通過修改每個worker節點的compute_classpath.sh以便包含Jar包
  • 有些資料庫例如H2的名稱是大寫,需要在SparkSQL中同樣使用大寫

效能調優

對於一些負載可以通過記憶體快取資料或者調整引數提高效能。

記憶體快取資料

Spark SQL可以通過sqlContext.cacheTable("tableName") 或 dataFrame.cache()介面將RDD資料快取到記憶體中。SparkSql可以近掃描需要的列並自動壓縮、進行垃圾回收等。可以通過sqlContext.uncacheTable("Tablename")從記憶體中移除表。

屬性 預設值 描述
spark.sql.inMemoryColumnarStorage.compressed true 若設為true,Spark SQL會基於列的統計資料自動選擇壓縮器進行資料壓縮
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列快取的每批次的資料大小,資料越大則記憶體利用率及壓縮比例越大,但OOM風險也越大

其他配置資訊

可以通過修改以下配置提高查詢執行的效能,以後可能會棄用以下設定,而變為自動進行最優化配置。

屬性 預設值 描述
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置做join操作時被廣播變數的表的大小。當設為-1時禁用廣播。目前只有Hive元資料支援統計資訊,可以通過ANALYZE TABLE <tablename> COMPUTE STATISTICS進行資訊統計
spark.sql.tungsten.enabled true 若為true,或使用tungsten物理優化執行,顯式地管理記憶體並動態生成表示式計算的位元組碼
spark.sql.shuffle.partitions 200 配置shuffle操作時的分割槽數量

分散式SQL引擎

當使用JDBC/ODBC或者命令列進行互動時,SparkSQL可以作為分散式查詢引擎執行。在這種模式下,Spark SQL的應用能夠不寫程式碼便執行查詢。

執行Thrift JDBC/ODBC驅動

這裡的實現與HiveServer2類似,可以通過beeline測試Spakr或者Hive1.2.1的JDBC驅動。通過以下命令啟動jdbc驅動

./sbin/start-thriftserver.sh

相關推薦

Spark-SQL程式設計總結

概覽 Spark SQL用於處理結構化資料,與Spark RDD API不同,它提供更多關於資料結構資訊和計算任務執行資訊的介面,Spark SQL內部使用這些額外的資訊完成特殊優化。可以通過SQL、DataFrames API、Datasets API與Spark S

Spark2.x學習筆記:14、Spark SQL程式設計

Spark2.x學習筆記:14、 Spark SQL程式設計 14.1 RDD的侷限性 RDD僅表示資料集,RDD沒有元資料,也就是說沒有欄位語義定義。 RDD需要使用者自己優化程式,對程式設計師要求較高。 從不同資料來源讀取資料相對困難。 合併多個數

SparkSQL(5)——Spark SQL程式設計方式執行查詢

編寫Spark SQL程式實現RDD轉換成DataFrame Spark官網提供了兩種方法來實現從RDD轉換得到DataFrame,第一種方法是利用反射機制,推導包含某種型別的RDD,通過反射將其轉換為指定型別的DataFrame,適用於提前知道RDD的sche

Spark Sql 程式設計式結構DataType轉換 程式碼類小結

基本型別資料轉DataType public DataType attrToDataType(String attrstr){ DataType returnDataType =

Spark學習(柒)- Spark SQL擴充套件和總結

文章目錄 Spark SQL使用場景 Spark SQL載入資料 1) RDD DataFrame/Dataset 2) Local Cloud(HDFS/S3) DataFrame與SQL的對比

Spark-SQL學習筆記_總結和拓展

一、Spark-SQL應用場景 1.資料檔案即席查詢 Ad-hoc     普通查詢:定製化查詢 2.對流資料檔案採用SQL分析   Spark-Streaming+Spark-SQL 3.使用SQL完成ETL開發  

Spark SQL簡介及以程式設計方式實現SQL查詢

1.什麼是SparkSQL? Spark SQL是Spark用來處理結構化資料的一個模組,它提供了一個程式設計抽象叫做DataFrame並且作為分散式SQL查詢引擎的作用。 2.SparkSQL的特點: 我們已經學習了Hive,它是將Hive SQL轉換成M

第10章 Spark SQL擴充套件和總結

10-1 -課程目錄   10-2 -Spark SQL使用場景   10-3 -Spark SQL載入資料   10-4 -DataFrame與SQL的對比 10

十四.Spark SQL總結spark日誌檔案資料形式的轉換

第一步.資料來源 找到spark的日誌資料來源,在/root/spark/spark-2.0.2-bin-hadoop2.7/logs目錄下: 通過對檔案的讀取,統計其中資料的條數: val masterLog = sc.textFile("file:///r

Spark SQL 筆記(18)——spark SQL 總結(1)

1 Spark SQl 使用場景 Ad-hoc querying of data in files Live SQL analytics over streaming data ETL capabilities alongside familiar SQL I

Spark SQL 筆記(19)——spark SQL 總結(2) DataFrame VS SQL

1 DataFrame DataFrame = RDD + Schema DataFrame is just a type alias for Dataset of Row DataFrame ov

Spark 官方文件》Spark SQL, DataFrames 以及 Datasets 程式設計指南

spark-1.6.0 [原文地址] Spark SQL, DataFrames 以及 Datasets 程式設計指南 概要 Spark SQL是Spark中處理結構化資料的模組。與基礎的Spark RDD API不同,Spark SQL的介面提供了更多關於資料的結構資訊和計算任務的執

程式設計的兩種方式執行Spark SQL查詢(方式一)

現在我們來實現在自定義程式中編寫Spark SQL查詢程式。 實現查詢的方式有兩種: 方式一:通過反射推斷schema。 方式二:通過structtype直接指定schema。 我們先用方式一來實現自定義查詢。 首先建立一個team.txt檔案,內容有5列,分別是id,球隊

程式設計的兩種方式執行Spark SQL查詢(方式二)

現在我們來實現在自定義程式中編寫Spark SQL查詢程式。 實現查詢的方式有兩種: 方式一:通過反射推斷schema。 方式二:通過structtype直接指定schema。 這次我們用方式二來實現自定義查詢。 具體程式如下: package cn.allengao.s

Spark 2.4.0程式設計指南--Spark SQL UDF和UDAF

Spark 2.4.0程式設計指南–Spark SQL UDF和UDAF 更多資源 github: https://github.com/opensourceteams/spark-scala-maven-2.4.0 視訊 Spark 2.4.0程

Spark SQL將rdd轉換為資料集-以程式設計方式指定模式(Programmatically Specifying the Schema)

一:解釋 官網:https://spark.apache.org/docs/latest/sql-getting-started.html 這種場景是生活中的常態 When case classes cannot be defined ahead of time (for example

spark-sql的概述以及程式設計模型的介紹

1、spark sql的概述 (1)spark sql的介紹:   Spark SQL 是 Spark 用來處理結構化資料(結構化資料可以來自外部結構化資料來源也可以通 過 RDD 獲取)的一個模組,它提供了一個程式設計抽象叫做 DataFrame 並且作為分散式 SQL 查 詢引擎的作用。  外部的結構

Spark-SQL的具體程式設計場景

入門案例: object SparkSqlTest { def main(args: Array[String]): Unit = { //遮蔽多餘的日誌 Logger.getLogger("org.apache.hadoop").setLevel(Level.WAR

基於Spark的Hive程式設計中,“Error:(8, 37) java: 程式包org.apache.spark.sql.api.java不存在”的解決辦法

依賴Spark 1.2.0中的jar包程式設計時會出現這個問題,雖然這個Spark版本已較舊,但一些在舊的平臺上開發的人,可能還會遇到這個問題,因此將問題的解決辦法寫在這裡。 報的錯誤如下圖所示:

Spark-Sql之DataFrame實戰詳解

集合 case 編程方式 優化 所表 register 操作數 print ava 1、DataFrame簡介: 在Spark中,DataFrame是一種以RDD為基礎的分布式數據據集,類似於傳統數據庫聽二維表格,DataFrame帶有Schema元信息,即DataFram