1. 程式人生 > >年薪50萬前阿里工程師分享如何構建flink sql平臺

年薪50萬前阿里工程師分享如何構建flink sql平臺

我們都知道,離線計算有Hive,使用過的知道,需要先定義一個schema,比如針對HDFS這種儲存對標mysql定義一個schema,schema的本質是什麼?主要描述下面這些資訊

1)當前儲存的物理位置的描述

2)資料格式的組成形式

然後Hive可以讓使用者定義一段sql,針對上面定義的schema進行,sql的本質是什麼,是業務邏輯的描述。然後Hive內部會將這段sql進行編譯轉化為原生的底層MapReduce操作,通過這種方式,遮蔽底層技術原理,讓業務開發人員集中精力在schema和sql業務邏輯上,flink sql平臺也正是做同樣的事情。

一開始經過跟上海同事的討論,選擇Uber的Athenax作為技術選型,通過翻閱原始碼,發現還是有很多不完善的地方,比如配置檔案採用yaml,如果做多叢集排程,平臺程式碼優化,多儲存擴充套件機制,都沒有考慮得很清楚,所以程式碼拿過來之後基本上可以說按照對yarn和flink的理解重新寫了一遍。

大致的工作流程如圖所示:

 

 

 

簡單解釋一下:

1)業務定義job

2)提交到web伺服器,存到mysql中

3)flink平臺程序定時掃描mysql,探測到udf變化,按需實時編譯class,class常駐記憶體

4)同時打包推送到hdfs

5)flink平臺程序定時掃描mysql,探測到job定義,並從yarn叢集獲取當前執行狀態的job的report

比較時間戳,決定哪些任務要殺死,啟動

6)flink提交到yarn叢集的任務,yarn會從hdfs拉取job描述裡的jar包,啟動這個flink job

然後步驟3,4,5,6 重複執行

下面是平臺程式碼的思路

1)通過springboot提供HTTP API,提供多叢集定義,儲存在mysql裡

一個叢集需要定義的資訊點如下:

 

 

 

2)提供HTTP API讓業務進行Job定義

這裡的Job定義包含3個方面:job的輸出輸出的schema定義,job的業務邏輯定義(sql),job需要的yarn資源定義,具體來說如下所示:

Job定義

 

 

 

文中的sql定義

SELECT SUM(nested.number) as nestedNumber,
hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`,
COUNT(`value`) as `count`,
AVG(CAST(`value` AS DOUBLE)) as `avg`,
MAX(CAST(`value` AS DOUBLE)) as `max`,
MIN(CAST(`value` AS DOUBLE)) as `min`,
TUMBLE_END(`time`, INTERVAL '3' SECOND) as `time` 
FROM input.`ymm-appmetric-dev-self1` 
WHERE metric IS NOT NULL AND `value` IS NOT NULL 
and `time` IS NOT NULL
GROUP BY metric,TUMBLE(`time`, INTERVAL '3' SECOND)

輸入/輸出schema定義,以kafka為例,輸入和輸出格式差不多

{
"brokerAddress":"略",
"topic":"dev-metric",
"schemas":[
 {"key":"sum","type":"double"},
 {"key":"count","type":"int"},
 {"key":"avg","type":"double"},
 {"key":"max","type":"double"},
 {"key":"min","type":"double"},
 {"key":"time","type":"timestamp"},
 {"key":"nestedNumber","type":"int"}
 ]
}

對於業務來說,“開啟IDE->瞭解flink語法寫java程式碼->打包成jar->提交到yarn叢集”這一環節省去了,直接開啟介面,點選按鈕定義sql,寫一段業務邏輯sql,提交此業務到mysql,關閉瀏覽器即可.由平臺進行排程(秒級),永遠不用擔心這個任務某一天掛了怎麼辦,平臺會自動發現自動拉起.提交一次永遠不需要再人工干預,除非邏輯發生變化,在邏輯發生變化時也簡單,開啟任務修改再提交,關閉瀏覽器,結束,平臺會發現job變化殺死老任務拉起新任務.

寫到這裡給大資料推薦一個大資料學習群:774 666 256 ,裡面有大資料學習資料,學習錄屏,視訊教程

下面講一下平臺內部是如何實現的

3)叢集自動發現

如果平臺維護方想增加一個叢集,通過介面直接定義一個存在mysql即可,後臺執行緒會自動發現,為每個叢集建立一個執行緒,多節點情況下,整個環境中某個特定叢集的多個執行緒通過ZK進行搶佔決定哪個執行緒當前為這個叢集服務.

增加JVM關閉鉤子,在JVM退出時,主動關閉ZK客戶端,釋放ZK上的臨時節點.

4)UDF的支援&自動發現

平臺支援平臺級UDF的定義,由平臺人員進行維護,平臺人員編寫指令碼,通過base64編碼存在mysql裡,歸屬到某個叢集,這個叢集的掃描執行緒發現有必要進行編譯時,實時編譯成class常駐記憶體,同時,打包成jar包上傳到遠端HDFS,後面會將此路徑放入到具體job的classpath路徑下. job就可以正確發現UDF.

當UDF沒有發生變化時,執行緒不會編譯,而是複用上一次的編譯結果.

5)程式可以任意部署,不依賴大資料環境

程式本身不依賴大資料環境的配置,具體是指不需要依賴當前宿主機.../etc/hadoop/*.xml檔案

通過讀取cluster的配置,動態生成XML配置,再生成HDFS/YARN的客戶端client,這樣,平臺程式碼可以任意部署到物理機/容器中,只要環境可以通過TCP連線到對應域名/ip即可.

6)如何做任務排程-任務的自動發現

這裡的任務排程是指:哪些任務需要下線,哪些任務需要第一次上線,哪些任務需要重新上線,

這裡的業務邏輯就是比較mysql裡job的時間戳和yarn叢集裡任務的時間戳

yarn叢集裡任務的時間戳是通過提交時打上Tag標記,就是為了下一次比較用。

這裡有一個細節,就是Athenax的做法是先算出所有要殺死的任務,殺死,再拉起所有要拉起的任務,個人認為這裡不妥,優化之後的做法是:按照任務級別,算出(killaction,startaction),對於單個job來說,二者至少存在1個action,然後以任務為級別進行排程,不再是之前的大一統提交方式,這樣就算單個任務排程異常,也不影響其它任務,做到了任務之間做隔離.

通過時間戳的方式,就不難理解業務一旦修改任務,平臺發現時間戳有變化,就可以自動殺死老任務,拉起新任務,不需要人工操作.

7)拉起任務中的編譯工作

一個job需要拉起時,會實時結合(輸入schema,SQL業務邏輯,輸出schema)進行編譯,

正如hive會翻譯成原生的mapreduce操作,flink sql編譯工作會翻譯成原生的flink jobgraph

這部分是抽取了athenax裡的編譯工作做2開

程式碼如下:

private JobCompilerResult compile(Map inputs, String originSql,
 ExternalCatalog output, ResourceDTO resourceDTO,
 ClusterDTO athenaxCluster,
 Configuration flinkConf) throws Exception {
 // 解析sql
 LoggerUtil.info("to be compiled sql : [{}]", originSql);
 SqlNodeList stmts = (SqlNodeList) new CalciteSqlParser().parse(originSql);
 Validator validator = new Validator();
 validator.validateQuery(stmts);
 HashMap udfMap = validator.getUserDefinedFunctions();
 String selectSql = validator.getStatement().toString();
 List additionalResources = validator.getAdditionalResources();
 LoggerUtil.info("succeed to parse sql,result is : [{}]", stmts);
 LoggerUtil.info("udf {}", udfMap);
 LoggerUtil.info("statement {}", selectSql);
 LoggerUtil.info("additionalResources {}", additionalResources);
 // 準備編譯,輸出Flink的JobGraph
 LoggerUtil.info("begin to create execution environment");
 StreamExecutionEnvironment localExecEnv = StreamExecutionEnvironment
 .createLocalEnvironment();
 //非常重要
 setFeature(localExecEnv,
 resourceDTO.getTaskManagerCount() * resourceDTO.getSlotPerTaskManager(), flinkConf);
 StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(localExecEnv);
 LoggerUtil.info("tableEnv : {} ", tableEnv);
 // 註冊UDF,收歸到平臺了,也就是說,只支援平臺開發人員預定義,暫時不支援業務自定義
 for (Map.Entry e : udfMap.entrySet()) {
 final String name = e.getKey();
 String clazzName = e.getValue();
 LoggerUtil.info("used udf specified by business : {}", name);
 }
 registerSDF(athenaxCluster, tableEnv);
 LoggerUtil.info("all udf registerd , bingo");
 // 開始註冊所有的input相關的schema
 for (Map.Entry e : inputs.entrySet()) {
 LoggerUtil.info("Registering input catalog {}", e.getKey());
 tableEnv.registerExternalCatalog(e.getKey(), e.getValue());
 }
 LoggerUtil.info("all input catalog registerd , bingo");
 Table table = tableEnv.sqlQuery(selectSql);
 LoggerUtil.info("succeed to execute tableEnv.sqlQuery(...)");
 LoggerUtil.info("table {}", table);
 LoggerUtil.info("bingo! input work done completely,let us handle output work now!!!");
 // 開始註冊output
 List outputTables = output.listTables();
 for (String t : outputTables) {
 table.writeToSink(getOutputTable(output.getTable(t)));
 }
 LoggerUtil.info("handle output ok");
 // 生成JobGraph
 StreamGraph streamGraph = localExecEnv.getStreamGraph();
 JobGraph jobGraph = streamGraph.getJobGraph();
 // this is required because the slots are allocated lazily
 //如果為true就會報錯,然後flink內部就是一直重啟,所以設定為false
 jobGraph.setAllowQueuedScheduling(false);
 LoggerUtil.info("create flink job ok {}", jobGraph);
 JobGraphTool.analyze(jobGraph);
 // 生成返回結果
 JobCompilerResult jobCompilerResult = new JobCompilerResult();
 jobCompilerResult.setJobGraph(jobGraph);
 ArrayList paths = new ArrayList();
 Collection values = udfMap.values();
 for (String value : values) {
 paths.add(value);
 }
 jobCompilerResult.setAdditionalJars(paths);
 return jobCompilerResult;
 }

這部分工作要理解,需要對Calcite有基礎

8)多儲存的支援

平臺在一開始編寫的時候,就考慮到了多儲存支援,雖然很多工是從kafka->計算->Kafka

但是平臺並不只滿足於這一點,因為寫到kafka之後,可能還需要業務再去維護一段程式碼取讀取kafka的訊息進行消費,如果有的業務希望直接能把結果寫到mysql,這個時候就是需要對多儲存進行擴充套件

通過設計和擴充套件機制,平臺開發人員只需要定義儲存相關的類,針對schema定義的解析工作已經再父類中完成,所有儲存類共用,這樣可以靈活支援多儲存,平臺開發人員只需要把重點放在特定儲存性質的支撐即可.

PS:編寫此類儲存類需要對fink job內部的執行機制,否則會造成資源洩露和浪費.

平臺內部已經針對每種型別進行了定義

 // 儲存型別
 //排名不分先後 
 public static int STORAGE_REDIS = 1 << 0; //1
 public static int STORAGE_MYSQL = 1 << 1; //2
 public static int STORAGE_ROCKETMQ = 1 << 2; //4
 public static int STORAGE_KAFKA = 1 << 3; //8
 public static int STORAGE_PULSAR = 1 << 4; //16
 public static int STORAGE_OTHER0 = 1 << 5; //32
 public static int STORAGE_OTHER1 = 1 << 6; //64
 public static int STORAGE_OTHER2 = 1 << 7; //128
 public static int STORAGE_RABBITMQ = 1 << 8; //256
 public static int STORAGE_HBASE = 1 << 9; //512
 public static int STORAGE_ES = 1 << 10;//1024
 public static int STORAGE_HDFS = 1 << 11;//2048

目前支援的情況如下:

輸入:Kafka

輸出:Kafka/Mysql

PS:輸出mysql是基於flink官方的提供類實現的第一版,經過分析原始碼,mysql sink官方這部分程式碼寫得太隨意,差評.

後續當業務有需求時,需要結合zebra做2次開發.畢竟運維不會提供生產環境的ip和埠等資訊,只會提供一個數據源字串標識.這樣更貼合公司內部的執行環境

9)任務提交

一旦生成flink原生的job,就可以準備提交工作

這部分需要對yarn的執行機制比較清楚,比如任務提交到RM上經過哪些狀態變化,ApplicationMaster如何申請資源啟動TaskManager, 具體的job是如何提交給JobManager的,平臺開發人員需要對此有基本的原理掌握,當初也是0基礎開始學習,通過快速翻閱原始碼掌握一些執行機制,方可安心進行平臺開發.

10)其它優化

針對yarn client的引數優化,保證可在一定時間內返回,否則可能一直卡死

針對flink job的平臺級優化,比如禁止快取,讓資訊立刻傳輸到下一個環節(預設100毫秒延遲)

定義flink job的重啟次數,當發生異常時可自行恢復等

11)壓測結果

輸入:本地啟動7個執行緒,傳送速度

 

 

 

每秒傳送到kafka 十幾萬條

接收topic描述

ymm-appmetric-dev-self1 開發環境 partitions 6 replication 1

flink任務描述

2個TaskManager程序 每個程序800M記憶體 每個程序3個執行緒,

並行度 2*3=6

flink計算任務所用sql

SELECT SUM(nested.number) as nestedNumber,
hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`,
COUNT(`value`) as `count`,
AVG(CAST(`value` AS DOUBLE)) as `avg`,
MAX(CAST(`value` AS DOUBLE)) as `max`,
MIN(CAST(`value` AS DOUBLE)) as `min`,
TUMBLE_END(`time`, INTERVAL '3' SECOND) as `time` 
FROM input.`ymm-appmetric-dev-self1` 
WHERE metric IS NOT NULL AND `value` IS NOT NULL and `time` IS NOT NULL 
GROUP BY metric, TUMBLE(`time`, INTERVAL '3' SECOND)

輸出topic

ymm-appmetric-dev-result partitions 3

觀察flink consumer端的消費速度

 

 

 

每個執行緒的消費速度在24000上下浮動,併發度6,每秒可消費kafka訊息14萬+,應該說目前不會碰到效能瓶頸.

其它

本次測試傳送資料條數:4.3 億條

耗時:56分鐘

對於業務開發人員來說,我覺得好處就是

1)不需要懂flink語法(你真的想知道flink的玩法?好吧我承認你很好學)

2)不需要開啟IDE寫java程式碼(你真的想寫Java程式碼?好吧我承認你對Java是真愛)

3)提交一次,不再需要人工介入(你真的想在假期/晚上/過節/過年 擔心任務掛掉?好吧我承認你很敬業)

只需要

1)介面點選操作,定義你的schema

2)寫一段你所擅長的sql

3)點選提交按鈕

4)關閉瀏覽器

5)關閉電腦

其它的就交給平臺吧!