1. 程式人生 > >Spark SQL訪問PostgreSQL

Spark SQL訪問PostgreSQL

post bsp taf parquet user con ons new bat

隨著Spark SQL的正式發布,以及它對DataFrame的支持,它可能會取代HIVE成為越來越重要的針對結構型數據進行分析的平臺。在博客文章What’s new for Spark SQL in Spark 1.3中,Databricks的工程師Michael Armbrust著重介紹了改進了的Data Source API。
技術分享圖片

我們在對結構型數據進行分析時,總不可避免會遭遇多種數據源的情況。這些數據源包括Json、CSV、Parquet、關系型數據庫以及NoSQL數據庫。我們自然希望能夠以統一的接口來訪問這些多姿多態的數據源。

在我們產品的應用場景中,需要訪問PostgreSQL的數據以進行數據分析。我們可以通過Spark SQL提供的JDBC來訪問,前提是需要PostgreSQL的driver。方法是在build.sbt中添加對應版本的driver依賴。例如:

libraryDependencies ++= {
val sparkVersion = "1.3.0"
Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.postgresql" % "postgresql" % "9.4-1201-jdbc41"
)
}

根據Spark SQL的官方文檔,在調用Data Sources API時,可以通過SQLContext加載遠程數據庫為Data Frame或Spark SQL臨時表。加載時,可以傳入的參數(屬性)包括:url、dbtable、driver、partitionColumn、lowerBound、upperBound與numPartitions。

PostgreSQL Driver的類名為org.postgresql.Driver。由於屬性沒有user和password,因此要將它們作為url的一部分。假設我們要連接的數據庫服務器IP為192.168.1.110,端口為5432,用戶名和密碼均為test,數據庫為demo,要查詢的數據表為tab_users,則訪問PostgreSQL的代碼如下所示:

object PostgreSqlApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("FromPostgreSql").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

val query = "(SELECT * FROM tab_users) as USERS"
val url = "jdbc:postgresql://192.168.1.110:5432/demo?user=test&password=test"
val users = sqlContext.load("jdbc", Map(
"url" -> url,
"driver" -> "org.postgresql.Driver",
"dbtable" -> query
))

users.foreach(println)
}
}

上面的代碼將查詢語句直接放在query變量中,並傳遞給SQLContext用以加載。另一種方式是直接傳遞表名,然後通過調用registerTempTable()方法來註冊臨時表,並調用sql()方法執行查詢:

object PostgreSqlApp {
def main(args: Array[String]): Unit = {
//val sparkConf = new SparkConf().setAppName("FromPostgreSql").setMaster("local[2]")

val sparkConf = new SparkConf().setAppName("SparkSQL_Select_Table")

.set("spark.driver.allowMultipleContexts", "true")

.set("spark.sql.shuffle.partitions","12")

//本地啟動

.setMaster("local[2]");


val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)

val url = "jdbc:postgresql://192.168.1.110:5432/demo?user=test&password=test"
val dataFrame = sqlContext.load("jdbc", Map(
"url" -> url,
"driver" -> "org.postgresql.Driver",
"dbtable" -> "tab_users"
))

dataFrame.registerTempTable("USERS")
val users = sqlContext.sql("select * from USERS")
users.foreach(println)
}
}

從性能角度考慮,還可以在創建SQLContext時,設置一些配置項,例如:

val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "10000")

Spark SQL訪問PostgreSQL