2019-03-09-Flink(6)——flink table & sql 介紹
本文轉自個人微信公眾號, 原文連結 。本部落格評論系統需要梯子,大家關注下公眾號方便交流。
本文基於 Flink 1.7。
隨著 Hadoop 的發展,有了Hive,使用HQL 即可完成原來繁瑣的Map Reduce 程式。
隨著 Spark的發展,引入了 Spark SQL。
隨著 Flink 版本的更迭,Flink 也提供了Flink SQL,以及 Table APIs。
注意:截止 Flink 1.7,Table API 和 SQL 還沒有完成,有些操作還不支援。
1. 基本概念
1.1 Why
那麼,為什麼要推出Table APIs和SQL?
首先,來看下Flink 的程式設計模型。如下圖所示(圖片來源於官網),DataStream API 和 DataSet API 是分開的,但是對於應用開發者來說,為什麼要關注這一點?對於相同的資料,批處理與流計算居然要寫兩套程式碼,簡直不可思議。Table APIs和SQL的推出,實現了流處理和批處理的統一。

flink-abstract.png
其次,降低了學習和使用門檻,基於 DataStream/DataSet APIs 的 Scala 或 Java 程式開發,對於BI/分析師來說,還是有一定門檻的,而SQL 則簡單太多了。
1.2 Dynamic Tables
Dynamic Tables 是 Flink Table API 和 SQL的核心概念,與大家熟知的Static Tables 相比,Dynamic Tables 隨著時間一直在變化。可以查詢Dynamic Table,查詢Dynamic Table 時會產生一個持續的查詢,持續的查詢不會終止,產生的結果也是Dynamic Table,根據輸入,輸出也會不斷變化。熟悉關係型資料庫的可以將Dynamic Tables的查詢跟關係型資料庫裡查詢物化檢視對比起來,需要注意的是,Dynamic Tables 是一個邏輯概念,不需要(全部)物化。
另外,需要注意,在Dynamic Table上的持續查詢的結果語義上是跟在Dynamic Table的快照上執行查詢相同。

flink_dynamic_table.png
如上圖所示:
- Stream 轉化為 Dynamic Table
- 在Dynamic Table 上執行查詢,得到的結果是一個新的Dynamic Table
- 最終的Dynamic Table 結果,被轉化為 Stream
1.3 Update Queries VS Append Queries
Append Queries:只會對查詢結果進行追加的查詢。
Update Queries:會更新查詢結果的查詢,一般需要維護更多的state。
1.4 查詢限制
有些 Stream 上的查詢需要花費巨大的代價:
- 需要維護的state 太大。持續查詢可能會執行非常長的時間,處理的資料量會非常大,對於一些需要更新原來結果的查詢,需要維護原來的結果,維護的state會很大。
- 更新計算代價高昂:輸入資料的一小點變化,可能有些查詢需要重新計算大量的資料,這種計算就不適合做持續查詢。
1.5 Table 到 Stream 的轉化
就像普通的資料庫Table 一樣,Dynamic Table也支援 insert
、 update
、 delete
等對它的更新。當需要將Dynamic Table 轉化為 Stream 或者輸出到外部系統時,需要對這些更新進行 encode
。
unique key
Append-only Stream 和 Retarct Stream 支援將Dynamic Table 轉化為DataStream。
2. 實戰
下面引入一個簡單的例子,從stream開始,轉化為 Table,然後查詢Table,最後將Table 轉化為Stream。
從例子可以很容易的看出,Stream 和 Table APIs / SQL 可以很容易的混用,這也給我們帶來了極大的便利性。
2.1 引入依賴
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>${flink.version}</version> </dependency> <dependency> <!-- for batch query --> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!-- 上線時用provided,避免build的jar包太大,更避免衝突 --> <!--<scope>provided</scope>--> <!-- IDEA 裡用compile,否則in-ide execution會失敗 --> <scope>compile</scope> </dependency> <dependency> <!-- for streaming query --> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <!-- 上線時用provided,避免build的jar包太大,更避免衝突 --> <!--<scope>provided</scope>--> <!-- IDEA 裡用compile,否則in-ide execution會失敗 --> <scope>compile</scope> </dependency>
2.2 隱式轉換
Flink 的 Scala Table APIs用了隱式轉換,所以,需要import 進來。
import org.apache.flink.table.api.scala._ import org.apache.flink.api.scala._
2.3 建立 TableEnvironment
TableEnvironment 是 Table APIs 和 SQL 的核心,可以用於:
- 註冊Table
- 執行SQL 查詢
- 註冊UDF
- 將DataStream / DataSet 轉化為 Table
- 維護一個到ExecutionEnvironment或StreamExecutionEnvironment的引用。
Table 總是繫結到一個 TableEnvironment的,在使用時,在同一個SQL中不能聯合使用不同TableEnvironment的表,比如 join
或 union
。
下面建立一個用於Stream的StreamTableEnvironment。另外,建立一個簡單的stream。
// 建立StreamTableEnvironment val senv = StreamExecutionEnvironment.getExecutionEnvironment val stableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(senv) // 建立一個用於實驗的 Stream[ObjPrice] case class ObjPrice (name: String, price: Long) val stream: DataStream[ObjPrice] = senv.fromCollection(List(ObjPrice("car", 100000), ObjPrice("house", 2000000), ObjPrice("book", 100), ObjPrice("car", 900210)))
2.4 將Stream 轉化為 Table
val sTable1Rename: Table = stableEnv.fromDataStream(stream, 'myName, 'myPrice)
將上面的stream 轉化為 Table,同時對欄位進行重新命名。
2.5 查詢 Table
// 採用Table API 的方式進行查詢 val sTableResult: Table = sTable1Rename .filter('myPrice > 1000) .groupBy('myName) .select('myName, 'myPrice.sum as 'mySumPrice)
2.6 將 Table 轉化為 Stream
val sResultDataStream: DataStream[(Boolean, ObjPrice)] = stableEnv.toRetractStream[ObjPrice](sTableResult)
3. 總結
本文僅涉及一些基礎知識和最常見的使用,其他的比如註冊 Table
/ TableSink
/ TableSource
/ External Catalog
、資料型別與Table Schema的對映、查詢優化等並不涉及,可以參考 官網 進行查閱。
為了方便交流,請掃描下方二維碼關注我。

wxqr.jpg