年薪50萬前阿里工程師分享如何構建flink sql平臺
我們都知道,離線計算有Hive,使用過的知道,需要先定義一個schema,比如針對HDFS這種儲存對標mysql定義一個schema,schema的本質是什麼?主要描述下面這些資訊
1)當前儲存的物理位置的描述
2)資料格式的組成形式
然後Hive可以讓使用者定義一段sql,針對上面定義的schema進行,sql的本質是什麼,是業務邏輯的描述。然後Hive內部會將這段sql進行編譯轉化為原生的底層MapReduce操作,通過這種方式,遮蔽底層技術原理,讓業務開發人員集中精力在schema和sql業務邏輯上,flink sql平臺也正是做同樣的事情。
一開始經過跟上海同事的討論,選擇Uber的Athenax作為技術選型,通過翻閱原始碼,發現還是有很多不完善的地方,比如配置檔案採用yaml,如果做多叢集排程,平臺程式碼優化,多儲存擴充套件機制,都沒有考慮得很清楚,所以程式碼拿過來之後基本上可以說按照對yarn和flink的理解重新寫了一遍。
大致的工作流程如圖所示:
簡單解釋一下:
1)業務定義job
2)提交到web伺服器,存到mysql中
3)flink平臺程序定時掃描mysql,探測到udf變化,按需實時編譯class,class常駐記憶體
4)同時打包推送到hdfs
5)flink平臺程序定時掃描mysql,探測到job定義,並從yarn叢集獲取當前執行狀態的job的report
比較時間戳,決定哪些任務要殺死,啟動
6)flink提交到yarn叢集的任務,yarn會從hdfs拉取job描述裡的jar包,啟動這個flink job
然後步驟3,4,5,6 重複執行
下面是平臺程式碼的思路
1)通過springboot提供HTTP API,提供多叢集定義,儲存在mysql裡
一個叢集需要定義的資訊點如下:
2)提供HTTP API讓業務進行Job定義
這裡的Job定義包含3個方面:job的輸出輸出的schema定義,job的業務邏輯定義(sql),job需要的yarn資源定義,具體來說如下所示:
Job定義
文中的sql定義
SELECT SUM(nested.number) as nestedNumber, hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`, COUNT(`value`) as `count`, AVG(CAST(`value` AS DOUBLE)) as `avg`, MAX(CAST(`value` AS DOUBLE)) as `max`, MIN(CAST(`value` AS DOUBLE)) as `min`, TUMBLE_END(`time`, INTERVAL '3' SECOND) as `time` FROM input.`ymm-appmetric-dev-self1` WHERE metric IS NOT NULL AND `value` IS NOT NULL and `time` IS NOT NULL GROUP BY metric,TUMBLE(`time`, INTERVAL '3' SECOND)
輸入/輸出schema定義,以kafka為例,輸入和輸出格式差不多
{
"brokerAddress":"略",
"topic":"dev-metric",
"schemas":[
{"key":"sum","type":"double"},
{"key":"count","type":"int"},
{"key":"avg","type":"double"},
{"key":"max","type":"double"},
{"key":"min","type":"double"},
{"key":"time","type":"timestamp"},
{"key":"nestedNumber","type":"int"}
]
}
對於業務來說,“開啟IDE->瞭解flink語法寫java程式碼->打包成jar->提交到yarn叢集”這一環節省去了,直接開啟介面,點選按鈕定義sql,寫一段業務邏輯sql,提交此業務到mysql,關閉瀏覽器即可.由平臺進行排程(秒級),永遠不用擔心這個任務某一天掛了怎麼辦,平臺會自動發現自動拉起.提交一次永遠不需要再人工干預,除非邏輯發生變化,在邏輯發生變化時也簡單,開啟任務修改再提交,關閉瀏覽器,結束,平臺會發現job變化殺死老任務拉起新任務.
寫到這裡給大資料推薦一個大資料學習群:774 666 256 ,裡面有大資料學習資料,學習錄屏,視訊教程
下面講一下平臺內部是如何實現的
3)叢集自動發現
如果平臺維護方想增加一個叢集,通過介面直接定義一個存在mysql即可,後臺執行緒會自動發現,為每個叢集建立一個執行緒,多節點情況下,整個環境中某個特定叢集的多個執行緒通過ZK進行搶佔決定哪個執行緒當前為這個叢集服務.
增加JVM關閉鉤子,在JVM退出時,主動關閉ZK客戶端,釋放ZK上的臨時節點.
4)UDF的支援&自動發現
平臺支援平臺級UDF的定義,由平臺人員進行維護,平臺人員編寫指令碼,通過base64編碼存在mysql裡,歸屬到某個叢集,這個叢集的掃描執行緒發現有必要進行編譯時,實時編譯成class常駐記憶體,同時,打包成jar包上傳到遠端HDFS,後面會將此路徑放入到具體job的classpath路徑下. job就可以正確發現UDF.
當UDF沒有發生變化時,執行緒不會編譯,而是複用上一次的編譯結果.
5)程式可以任意部署,不依賴大資料環境
程式本身不依賴大資料環境的配置,具體是指不需要依賴當前宿主機.../etc/hadoop/*.xml檔案
通過讀取cluster的配置,動態生成XML配置,再生成HDFS/YARN的客戶端client,這樣,平臺程式碼可以任意部署到物理機/容器中,只要環境可以通過TCP連線到對應域名/ip即可.
6)如何做任務排程-任務的自動發現
這裡的任務排程是指:哪些任務需要下線,哪些任務需要第一次上線,哪些任務需要重新上線,
這裡的業務邏輯就是比較mysql裡job的時間戳和yarn叢集裡任務的時間戳
yarn叢集裡任務的時間戳是通過提交時打上Tag標記,就是為了下一次比較用。
這裡有一個細節,就是Athenax的做法是先算出所有要殺死的任務,殺死,再拉起所有要拉起的任務,個人認為這裡不妥,優化之後的做法是:按照任務級別,算出(killaction,startaction),對於單個job來說,二者至少存在1個action,然後以任務為級別進行排程,不再是之前的大一統提交方式,這樣就算單個任務排程異常,也不影響其它任務,做到了任務之間做隔離.
通過時間戳的方式,就不難理解業務一旦修改任務,平臺發現時間戳有變化,就可以自動殺死老任務,拉起新任務,不需要人工操作.
7)拉起任務中的編譯工作
一個job需要拉起時,會實時結合(輸入schema,SQL業務邏輯,輸出schema)進行編譯,
正如hive會翻譯成原生的mapreduce操作,flink sql編譯工作會翻譯成原生的flink jobgraph
這部分是抽取了athenax裡的編譯工作做2開
程式碼如下:
private JobCompilerResult compile(Map inputs, String originSql,
ExternalCatalog output, ResourceDTO resourceDTO,
ClusterDTO athenaxCluster,
Configuration flinkConf) throws Exception {
// 解析sql
LoggerUtil.info("to be compiled sql : [{}]", originSql);
SqlNodeList stmts = (SqlNodeList) new CalciteSqlParser().parse(originSql);
Validator validator = new Validator();
validator.validateQuery(stmts);
HashMap udfMap = validator.getUserDefinedFunctions();
String selectSql = validator.getStatement().toString();
List additionalResources = validator.getAdditionalResources();
LoggerUtil.info("succeed to parse sql,result is : [{}]", stmts);
LoggerUtil.info("udf {}", udfMap);
LoggerUtil.info("statement {}", selectSql);
LoggerUtil.info("additionalResources {}", additionalResources);
// 準備編譯,輸出Flink的JobGraph
LoggerUtil.info("begin to create execution environment");
StreamExecutionEnvironment localExecEnv = StreamExecutionEnvironment
.createLocalEnvironment();
//非常重要
setFeature(localExecEnv,
resourceDTO.getTaskManagerCount() * resourceDTO.getSlotPerTaskManager(), flinkConf);
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(localExecEnv);
LoggerUtil.info("tableEnv : {} ", tableEnv);
// 註冊UDF,收歸到平臺了,也就是說,只支援平臺開發人員預定義,暫時不支援業務自定義
for (Map.Entry e : udfMap.entrySet()) {
final String name = e.getKey();
String clazzName = e.getValue();
LoggerUtil.info("used udf specified by business : {}", name);
}
registerSDF(athenaxCluster, tableEnv);
LoggerUtil.info("all udf registerd , bingo");
// 開始註冊所有的input相關的schema
for (Map.Entry e : inputs.entrySet()) {
LoggerUtil.info("Registering input catalog {}", e.getKey());
tableEnv.registerExternalCatalog(e.getKey(), e.getValue());
}
LoggerUtil.info("all input catalog registerd , bingo");
Table table = tableEnv.sqlQuery(selectSql);
LoggerUtil.info("succeed to execute tableEnv.sqlQuery(...)");
LoggerUtil.info("table {}", table);
LoggerUtil.info("bingo! input work done completely,let us handle output work now!!!");
// 開始註冊output
List outputTables = output.listTables();
for (String t : outputTables) {
table.writeToSink(getOutputTable(output.getTable(t)));
}
LoggerUtil.info("handle output ok");
// 生成JobGraph
StreamGraph streamGraph = localExecEnv.getStreamGraph();
JobGraph jobGraph = streamGraph.getJobGraph();
// this is required because the slots are allocated lazily
//如果為true就會報錯,然後flink內部就是一直重啟,所以設定為false
jobGraph.setAllowQueuedScheduling(false);
LoggerUtil.info("create flink job ok {}", jobGraph);
JobGraphTool.analyze(jobGraph);
// 生成返回結果
JobCompilerResult jobCompilerResult = new JobCompilerResult();
jobCompilerResult.setJobGraph(jobGraph);
ArrayList paths = new ArrayList();
Collection values = udfMap.values();
for (String value : values) {
paths.add(value);
}
jobCompilerResult.setAdditionalJars(paths);
return jobCompilerResult;
}
這部分工作要理解,需要對Calcite有基礎
8)多儲存的支援
平臺在一開始編寫的時候,就考慮到了多儲存支援,雖然很多工是從kafka->計算->Kafka
但是平臺並不只滿足於這一點,因為寫到kafka之後,可能還需要業務再去維護一段程式碼取讀取kafka的訊息進行消費,如果有的業務希望直接能把結果寫到mysql,這個時候就是需要對多儲存進行擴充套件
通過設計和擴充套件機制,平臺開發人員只需要定義儲存相關的類,針對schema定義的解析工作已經再父類中完成,所有儲存類共用,這樣可以靈活支援多儲存,平臺開發人員只需要把重點放在特定儲存性質的支撐即可.
PS:編寫此類儲存類需要對fink job內部的執行機制,否則會造成資源洩露和浪費.
平臺內部已經針對每種型別進行了定義
// 儲存型別
//排名不分先後
public static int STORAGE_REDIS = 1 << 0; //1
public static int STORAGE_MYSQL = 1 << 1; //2
public static int STORAGE_ROCKETMQ = 1 << 2; //4
public static int STORAGE_KAFKA = 1 << 3; //8
public static int STORAGE_PULSAR = 1 << 4; //16
public static int STORAGE_OTHER0 = 1 << 5; //32
public static int STORAGE_OTHER1 = 1 << 6; //64
public static int STORAGE_OTHER2 = 1 << 7; //128
public static int STORAGE_RABBITMQ = 1 << 8; //256
public static int STORAGE_HBASE = 1 << 9; //512
public static int STORAGE_ES = 1 << 10;//1024
public static int STORAGE_HDFS = 1 << 11;//2048
目前支援的情況如下:
輸入:Kafka
輸出:Kafka/Mysql
PS:輸出mysql是基於flink官方的提供類實現的第一版,經過分析原始碼,mysql sink官方這部分程式碼寫得太隨意,差評.
後續當業務有需求時,需要結合zebra做2次開發.畢竟運維不會提供生產環境的ip和埠等資訊,只會提供一個數據源字串標識.這樣更貼合公司內部的執行環境
9)任務提交
一旦生成flink原生的job,就可以準備提交工作
這部分需要對yarn的執行機制比較清楚,比如任務提交到RM上經過哪些狀態變化,ApplicationMaster如何申請資源啟動TaskManager, 具體的job是如何提交給JobManager的,平臺開發人員需要對此有基本的原理掌握,當初也是0基礎開始學習,通過快速翻閱原始碼掌握一些執行機制,方可安心進行平臺開發.
10)其它優化
針對yarn client的引數優化,保證可在一定時間內返回,否則可能一直卡死
針對flink job的平臺級優化,比如禁止快取,讓資訊立刻傳輸到下一個環節(預設100毫秒延遲)
定義flink job的重啟次數,當發生異常時可自行恢復等
11)壓測結果
輸入:本地啟動7個執行緒,傳送速度
每秒傳送到kafka 十幾萬條
接收topic描述
ymm-appmetric-dev-self1 開發環境 partitions 6 replication 1
flink任務描述
2個TaskManager程序 每個程序800M記憶體 每個程序3個執行緒,
並行度 2*3=6
flink計算任務所用sql
SELECT SUM(nested.number) as nestedNumber,
hundredFunction(SUM(CAST(`value` AS DOUBLE))) as `sum`,
COUNT(`value`) as `count`,
AVG(CAST(`value` AS DOUBLE)) as `avg`,
MAX(CAST(`value` AS DOUBLE)) as `max`,
MIN(CAST(`value` AS DOUBLE)) as `min`,
TUMBLE_END(`time`, INTERVAL '3' SECOND) as `time`
FROM input.`ymm-appmetric-dev-self1`
WHERE metric IS NOT NULL AND `value` IS NOT NULL and `time` IS NOT NULL
GROUP BY metric, TUMBLE(`time`, INTERVAL '3' SECOND)
輸出topic
ymm-appmetric-dev-result partitions 3
觀察flink consumer端的消費速度
每個執行緒的消費速度在24000上下浮動,併發度6,每秒可消費kafka訊息14萬+,應該說目前不會碰到效能瓶頸.
其它
本次測試傳送資料條數:4.3 億條
耗時:56分鐘
對於業務開發人員來說,我覺得好處就是
1)不需要懂flink語法(你真的想知道flink的玩法?好吧我承認你很好學)
2)不需要開啟IDE寫java程式碼(你真的想寫Java程式碼?好吧我承認你對Java是真愛)
3)提交一次,不再需要人工介入(你真的想在假期/晚上/過節/過年 擔心任務掛掉?好吧我承認你很敬業)
只需要
1)介面點選操作,定義你的schema
2)寫一段你所擅長的sql
3)點選提交按鈕
4)關閉瀏覽器
5)關閉電腦
其它的就交給平臺吧!