原 薦 如何構建一個flink sql平臺
我們都知道,離線計算有Hive,使用過的知道,需要先定義一個schema,比如針對HDFS這種儲存對標mysql定義一個schema,schema的本質是什麼?主要描述下面這些資訊
1)當前儲存的物理位置的描述
2)資料格式的組成形式
然後Hive可以讓使用者定義一段sql,針對上面定義的schema進行,sql的本質是什麼,是業務邏輯的描述。然後Hive內部會將這段sql進行編譯轉化為原生的底層MapReduce操作,通過這種方式,遮蔽底層技術原理,讓業務開發人員集中精力在schema和sql業務邏輯上,flink sql平臺也正是做同樣的事情。
一開始經過跟上海同事的討論,選擇Uber的Athenax作為技術選型,通過翻閱原始碼,發現還是有很多不完善的地方,比如配置檔案採用yaml,如果做多叢集排程,平臺程式碼優化,多儲存擴充套件機制,都沒有考慮得很清楚,所以程式碼拿過來之後基本上可以說按照對yarn和flink的理解重新寫了一遍。
下面是平臺程式碼的思路
1)通過springboot提供HTTP API,提供多叢集定義,儲存在mysql裡
一個叢集需要定義的資訊點如下:
2)提供HTTP API讓業務進行Job定義
這裡的Job定義包含3個方面:job的輸出輸出的schema定義,job的業務邏輯定義(sql),job需要的yarn資源定義,具體來說如下所示:
Job定義
文中的sql定義
SELECT SUM(nested.number) as nestedNumber, hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`, COUNT(`value`) as `count`, AVG(CAST(`value` AS DOUBLE)) as `avg`, MAX(CAST(`value` AS DOUBLE)) as `max`, MIN(CAST(`value` AS DOUBLE)) as `min`, TUMBLE_END(`time`, INTERVAL '3' SECOND)as `time` FROM input.`ymm-appmetric-dev-self1` WHERE metric IS NOT NULL AND `value` IS NOT NULL and `time` IS NOT NULL GROUP BY metric,TUMBLE(`time`, INTERVAL '3' SECOND)
輸入/輸出schema定義,以kafka為例,輸入和輸出格式差不多
{ "brokerAddress":"略", "topic":"dev-metric", "schemas":[ {"key":"sum","type":"double"}, {"key":"count","type":"int"}, {"key":"avg","type":"double"}, {"key":"max","type":"double"}, {"key":"min","type":"double"}, {"key":"time","type":"timestamp"}, {"key":"nestedNumber","type":"int"} ] }
對於業務來說,“開啟IDE-> 瞭解flink語法 寫java程式碼->打包成jar->提交到yarn叢集”這一環節省去了,直接開啟介面,點選按鈕定義sql,寫一段業務邏輯sql,提交此業務到mysql,關閉瀏覽器即可.由平臺進行排程(秒級),永遠不用擔心這個任務某一天掛了怎麼辦,平臺會自動發現自動拉起.提交一次永遠不需要再人工干預,除非邏輯發生變化,在邏輯發生變化時也簡單,開啟任務修改再提交,關閉瀏覽器,結束,平臺會發現job變化殺死老任務拉起新任務.
下面講一下平臺內部是如何實現的
3)叢集自動發現
如果平臺維護方想增加一個叢集,通過介面直接定義一個存在mysql即可,後臺執行緒會自動發現,為每個叢集建立一個執行緒,多節點情況下,整個環境中某個特定叢集的多個執行緒通過ZK進行搶佔決定哪個執行緒當前為這個叢集服務.
增加JVM關閉鉤子,在JVM退出時,主動關閉ZK客戶端,釋放ZK上的臨時節點.
4)UDF的支援&自動發現
平臺支援平臺級UDF的定義,由平臺人員進行維護,平臺人員編寫指令碼,通過base64編碼存在mysql裡,歸屬到某個叢集,這個叢集的掃描執行緒發現有必要進行編譯時,實時編譯成class常駐記憶體,同時,打包成jar包上傳到遠端HDFS,後面會將此路徑放入到具體job的classpath路徑下. job就可以正確發現UDF.
當UDF沒有發生變化時,執行緒不會編譯,而是複用上一次的編譯結果.
5)程式可以任意部署,不依賴大資料環境
程式本身不依賴大資料環境的配置,具體是指不需要依賴當前宿主機.../etc/hadoop/*.xml檔案
通過讀取cluster的配置,動態生成XML配置,再生成HDFS/YARN的客戶端client,這樣,平臺程式碼可以任意部署到物理機/容器中,只要環境可以通過TCP連線到對應域名/ip即可.
6)如何做任務排程-任務的自動發現
這裡的任務排程是指:哪些任務需要下線,哪些任務需要第一次上線,哪些任務需要重新上線,
這裡的業務邏輯就是比較mysql裡job的時間戳和yarn叢集裡任務的時間戳
yarn叢集裡任務的時間戳是通過提交時打上Tag標記,就是為了下一次比較用。
這裡有一個細節,就是Athenax的做法是先算出所有要殺死的任務,殺死,再拉起所有要拉起的任務,個人認為這裡不妥,優化之後的做法是:按照任務級別,算出(killaction,startaction),對於單個job來說,二者至少存在1個action,然後以任務為級別進行排程,不再是之前的大一統提交方式,這樣就算單個任務排程異常,也不影響其它任務,做到了任務之間做隔離.
通過時間戳的方式,就不難理解業務一旦修改任務,平臺發現時間戳有變化,就可以自動殺死老任務,拉起新任務,不需要人工操作.
7)拉起任務中的編譯工作
一個job需要拉起時,會實時結合(輸入schema,SQL業務邏輯,輸出schema)進行編譯,
正如hive會翻譯成原生的mapreduce操作,flink sql編譯工作會翻譯成原生的flink jobgraph
這部分是抽取了athenax裡的編譯工作做2開
程式碼如下:
private JobCompilerResult compile(Map inputs, String originSql, ExternalCatalog output, ResourceDTO resourceDTO, ClusterDTO athenaxCluster, Configuration flinkConf) throws Exception { // 解析sql LoggerUtil.info("to be compiled sql : [{}]", originSql); SqlNodeList stmts = (SqlNodeList) new CalciteSqlParser().parse(originSql); Validator validator = new Validator(); validator.validateQuery(stmts); HashMap udfMap = validator.getUserDefinedFunctions(); String selectSql = validator.getStatement().toString(); List additionalResources = validator.getAdditionalResources(); LoggerUtil.info("succeed to parse sql,result is : [{}]", stmts); LoggerUtil.info("udf {}", udfMap); LoggerUtil.info("statement {}", selectSql); LoggerUtil.info("additionalResources {}", additionalResources); // 準備編譯,輸出Flink的JobGraph LoggerUtil.info("begin to create execution environment"); StreamExecutionEnvironment localExecEnv = StreamExecutionEnvironment .createLocalEnvironment(); //非常重要 setFeature(localExecEnv, resourceDTO.getTaskManagerCount() * resourceDTO.getSlotPerTaskManager(), flinkConf); StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(localExecEnv); LoggerUtil.info("tableEnv : {} ", tableEnv); // 註冊UDF,收歸到平臺了,也就是說,只支援平臺開發人員預定義,暫時不支援業務自定義 for (Map.Entry e : udfMap.entrySet()) { final String name = e.getKey(); String clazzName = e.getValue(); LoggerUtil.info("used udf specified by business : {}", name); } registerSDF(athenaxCluster, tableEnv); LoggerUtil.info("all udf registerd , bingo"); // 開始註冊所有的input相關的schema for (Map.Entry e : inputs.entrySet()) { LoggerUtil.info("Registering input catalog {}", e.getKey()); tableEnv.registerExternalCatalog(e.getKey(), e.getValue()); } LoggerUtil.info("all input catalog registerd , bingo"); Table table = tableEnv.sqlQuery(selectSql); LoggerUtil.info("succeed to execute tableEnv.sqlQuery(...)"); LoggerUtil.info("table {}", table); LoggerUtil.info("bingo! input work done completely,let us handle output work now!!!"); // 開始註冊output List outputTables = output.listTables(); for (String t : outputTables) { table.writeToSink(getOutputTable(output.getTable(t))); } LoggerUtil.info("handle output ok"); // 生成JobGraph StreamGraph streamGraph = localExecEnv.getStreamGraph(); JobGraph jobGraph = streamGraph.getJobGraph(); // this is required because the slots are allocated lazily //如果為true就會報錯,然後flink內部就是一直重啟,所以設定為false jobGraph.setAllowQueuedScheduling(false); LoggerUtil.info("create flink job ok {}", jobGraph); JobGraphTool.analyze(jobGraph); // 生成返回結果 JobCompilerResult jobCompilerResult = new JobCompilerResult(); jobCompilerResult.setJobGraph(jobGraph); ArrayList paths = new ArrayList(); Collection values = udfMap.values(); for (String value : values) { paths.add(value); } jobCompilerResult.setAdditionalJars(paths); return jobCompilerResult; }
這部分工作要理解,需要對Calcite有基礎
8)多儲存的支援
平臺在一開始編寫的時候,就考慮到了多儲存支援,雖然很多工是從kafka->計算->Kafka
但是平臺並不只滿足於這一點,因為寫到kafka之後,可能還需要業務再去維護一段程式碼取讀取kafka的訊息進行消費,如果有的業務希望直接能把結果寫到mysql,這個時候就是需要對多儲存進行擴充套件
通過設計和擴充套件機制,平臺開發人員只需要定義儲存相關的類,針對schema定義的解析工作已經再父類中完成,所有儲存類共用,這樣可以靈活支援多儲存,平臺開發人員只需要把重點放在特定儲存性質的支撐即可.
PS:編寫此類儲存類需要對fink job內部的執行機制,否則會造成資源洩露和浪費.
平臺內部已經針對每種型別進行了定義
// 儲存型別 //排名不分先後 public static int STORAGE_REDIS= 1 << 0; //1 public static int STORAGE_MYSQL= 1 << 1; //2 public static int STORAGE_ROCKETMQ = 1 << 2; //4 public static int STORAGE_KAFKA= 1 << 3; //8 public static int STORAGE_PULSAR= 1 << 4; //16 public static int STORAGE_OTHER0= 1 << 5; //32 public static int STORAGE_OTHER1= 1 << 6; //64 public static int STORAGE_OTHER2= 1 << 7; //128 public static int STORAGE_RABBITMQ = 1 << 8; //256 public static int STORAGE_HBASE= 1 << 9; //512 public static int STORAGE_ES= 1 << 10;//1024 public static int STORAGE_HDFS= 1 << 11;//2048
目前支援的情況如下:
輸入:Kafka
輸出:Kafka/Mysql
PS:輸出mysql是基於flink官方的提供類實現的第一版,經過分析原始碼,mysql sink官方這部分程式碼寫得太隨意,差評.
後續當業務有需求時,需要結合zebra做2次開發.畢竟運維不會提供生產環境的ip和埠等資訊,只會提供一個數據源字串標識.這樣更貼合公司內部的執行環境
9)任務提交
一旦生成flink原生的job,就可以準備提交工作
這部分需要對yarn的執行機制比較清楚,比如任務提交到RM上經過哪些狀態變化,ApplicationMaster如何申請資源啟動TaskManager, 具體的job是如何提交給JobManager的,平臺開發人員需要對此有基本的原理掌握,當初也是0基礎開始學習,通過快速翻閱原始碼掌握一些執行機制,方可安心進行平臺開發.
10)其它優化
針對yarn client的引數優化,保證可在一定時間內返回,否則可能一直卡死
針對flink job的平臺級優化,比如禁止快取,讓資訊立刻傳輸到下一個環節(預設100毫秒延遲)
定義flink job的重啟次數,當發生異常時可自行恢復等
11)壓測結果
輸入:本地啟動7個執行緒,傳送速度
每秒傳送到kafka 十幾萬條
接收topic描述
ymm-appmetric-dev-self1 開發環境 partitions 6 replication 1
flink任務描述
2個TaskManager程序 每個程序800M記憶體 每個程序3個執行緒,
並行度 2*3=6
flink計算任務所用sql
SELECT SUM(nested.number) as nestedNumber, hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`, COUNT(`value`) as `count`, AVG(CAST(`value` AS DOUBLE)) as `avg`, MAX(CAST(`value` AS DOUBLE)) as `max`, MIN(CAST(`value` AS DOUBLE)) as `min`, TUMBLE_END(`time`, INTERVAL '3' SECOND)as `time` FROM input.`ymm-appmetric-dev-self1` WHERE metric IS NOT NULL AND `value` IS NOT NULL and `time` IS NOT NULL GROUP BY metric,TUMBLE(`time`, INTERVAL '3' SECOND)
輸出topic
ymm-appmetric-dev-result partitions 3
觀察flink consumer端的消費速度
每個執行緒的消費速度在24000上下浮動,併發度6,每秒可消費kafka訊息14萬+,應該說目前不會碰到效能瓶頸.
其它
本次測試傳送資料條數:4.3 億條
耗時:56分鐘
對於業務開發人員來說,我覺得好處就是
1)不需要懂flink語法(你真的想知道flink的玩法?好吧我承認你很好學)
2)不需要開啟IDE寫java程式碼(你真的想寫Java程式碼?好吧我承認你對Java是真愛)
3)提交一次,不再需要人工介入(你真的想在假期/晚上/過節/過年 擔心任務掛掉?好吧我承認你很敬業)
只需要
1)介面點選操作,定義你的schema
2)寫一段你所擅長的sql
3)點選提交按鈕
4)關閉瀏覽器
5)關閉電腦
其它的就交給平臺吧!
後續:針對平臺來說,後續的主要工作是根據業務需求擴充套件多儲存
如果再長遠,那就是要深度閱讀flink原始碼對平臺進行二次優化