1. 程式人生 > >技本功丨用短平快的方式告訴你:Flink-SQL的擴展實現

技本功丨用短平快的方式告訴你:Flink-SQL的擴展實現

mps flink 加載 ast 正則表達 快的 網絡 叠代 參數

2019年1月28日,阿裏雲宣布開源“計算王牌”實時計算平臺Blink回饋給ApacheFlink社區。官方稱,計算延遲已經降到毫秒級,也就是你在瀏覽網頁的時候,眨了一下眼睛,淘寶、天貓處理的信息已經刷新了17億次。

技術分享圖片

作為一家對技術有追求、有渴望的公司,怎麽少得了為Flink社區做些貢獻呢?

技術分享圖片

夫子說

首先,本文所述均基於flink 1.5.4。

我們為什麽擴展Flink-SQL?

由於Flink 本身SQL語法並不提供在對接輸入源和輸出目的的SQL語法。數據開發在使用的過程中需要根據其提供的Api接口編寫Source和 Sink, 異常繁瑣,不僅需要了解FLink 各類Operator的API,還需要對各個組件的相關調用方式有了解(比如kafka,redis,mongo,hbase等),並且在需要關聯到外部數據源的時候沒有提供SQL相關的實現方式,因此數據開發直接使用Flink編寫SQL作為實時的數據分析時需要較大的額外工作量。

我們的目的是在使用Flink-SQL的時候只需要關心做什麽,而不需要關心怎麽做。不需要過多的關心程序的實現,專註於業務邏輯。

接下來,我們一起來看下Flink-SQL的擴展實現吧!

01擴展了哪些flink相關sql

(1)創建源表語句

技術分享圖片

(2)創建輸出表語句

技術分享圖片

(3)創建自定義函數

技術分享圖片

(4)維表關聯

技術分享圖片

02各個模塊是如何翻譯到flink的實現

( 1 ) 如何將創建源表的sql語句轉換為flink的operator;

Flink中表的都會映射到Table這個類。然後調用註冊方法將Table註冊到environment。

StreamTableEnvironment.registerTable(tableName, table);

當前我們只支持kafka數據源。Flink本身有讀取kafka 的實現類, FlinkKafkaConsumer09,所以只需要根據指定參數實例化出該對象。並調用註冊方法註冊即可。

另外需要註意在flink sql經常會需要用到rowtime, proctime, 所以我們在註冊表結構的時候額外添加rowtime,proctime。

當需要用到rowtime的使用需要額外指定DataStream.watermarks(assignTimestampsAndWatermarks),自定義watermark主要做兩個事情:1:如何從Row中獲取時間字段。 2:設定最大延遲時間。

( 2 ) 如何將創建的輸出表sql語句轉換為flink的operator;

Flink輸出Operator的基類是OutputFormat, 我們這裏繼承的是RichOutputFormat, 該抽象類繼承OutputFormat,額外實現了獲取運行環境的方法getRuntimeContext(), 方便於我們之後自定義metric等操作。

我們以輸出到mysql插件mysql-sink為例,分兩部分:

  • 將create table 解析出表名稱,字段信息,mysql連接信息。

該部分使用正則表達式的方式將create table 語句轉換為內部的一個實現類。該類存儲了表名稱,字段信息,插件類型,插件連接信息。

  • 繼承RichOutputFormat將數據寫到對應的外部數據源。

主要是實現writeRecord方法,在mysql插件中其實就是調用jdbc 實現插入或者更新方法。

( 3) 如何將自定義函數語句轉換為flink的operator;

Flink對udf提供兩種類型的實現方式:

(1)繼承ScalarFunction

(2)繼承TableFunction

需要做的將用戶提供的jar添加到URLClassLoader, 並加載指定的class (實現上述接口的類路徑),然後調用TableEnvironment.registerFunction(funcName, udfFunc);即完成了udf的註冊。之後即可使用改定義的udf;

( 4 ) 維表功能是如何實現的?

流計算中一個常見的需求就是為數據流補齊字段。因為數據采集端采集到的數據往往比較有限,在做數據分析之前,就要先將所需的維度信息補全,但是當前flink並未提供join外部數據源的SQL功能。

實現該功能需要註意的幾個問題:

(1)維表的數據是不斷變化的

在實現的時候需要支持定時更新內存中的緩存的外部數據源,比如使用LRU等策略。

(2)IO吞吐問題

如果每接收到一條數據就串行到外部數據源去獲取對應的關聯記錄的話,網絡延遲將會是系統最大的瓶頸。這裏我們選擇阿裏貢獻給flink社區的算子RichAsyncFunction。該算子使用異步的方式從外部數據源獲取數據,大大減少了花費在網絡請求上的時間。

(3)如何將sql 中包含的維表解析到flink operator

為了從sql中解析出指定的維表和過濾條件, 使用正則明顯不是一個合適的辦法。需要匹配各種可能性。將是一個無窮無盡的過程。查看flink本身對sql的解析。它使用了calcite做為sql解析的工作。將sql解析出一個語法樹,通過叠代的方式,搜索到對應的維表;然後將維表和非維表結構分開。

技術分享圖片

通過上述步驟可以通過SQL完成常用的從kafka源表,join外部數據源,寫入到指定的外部目的結構中。

技本功丨用短平快的方式告訴你:Flink-SQL的擴展實現