Flink 1.4.2 版本踩過的坑
阿新 • • 發佈:2018-11-11
0x1 摘要
最近業務要實時統計半小時維度的UV、PV資料,經過調研準備用Flink時間窗來實現,主要是Flink對eventTime
的支援,可以做到更精準的統計,由於第一次嘗試使用Flink,所以過程中遇到不少問題,記錄下來方便後續查閱。
0x2 執行計劃輸出JSON問題
Flink對執行計劃分析提供了支援,可以通過程式碼將執行計劃打出來,並利用官網提供的圖生成工具可以方便分析,通過env.getExecutionPlan()
方法可以獲取JSON格式的執行計劃,將JSON字串拷貝到http://flink.apache.org/visualizer/
網站文字框就可以檢視。
但我們在專案中呼叫env.getExecutionPlan()
Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract! at java.util.TimSort.mergeLo(TimSort.java:777) at java.util.TimSort.mergeAt(TimSort.java:514) at java.util.TimSort.mergeForceCollapse(TimSort.java:457) at java.util.TimSort.sort(TimSort.java:254) at java.util.Arrays.sort(Arrays.java:1512) at java.util.ArrayList.sort(ArrayList.java:1454) at java.util.Collections.sort(Collections.java:175) at org.apache.flink.streaming.api.graph.JSONGenerator.getJSON(JSONGenerator.java:61) at org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:663) ... 2 more
通過異常資訊可以知道問題發生在TimSort
排序演算法,意思就是比較方法違反約束,具體約束規範大家可以自行網上查閱:自反性、傳遞性、對稱性。
我們來看一下Flink的原始碼,只要看JSONGenerator
類61
行就可以:
public String getJSON() throws JSONException { JSONObject json = new JSONObject(); JSONArray nodes = new JSONArray(); json.put("nodes", nodes); List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs()); Collections.sort(operatorIDs, new Comparator<Integer>() { @Override public int compare(Integer o1, Integer o2) { // put sinks at the back if (streamGraph.getSinkIDs().contains(o1)) { return 1; } else if (streamGraph.getSinkIDs().contains(o2)) { return -1; } else { return o1 - o2; } } }); visit(nodes, operatorIDs, new HashMap<Integer, Integer>()); return json.toString(); }
從原始碼可以看在對操作ID做排序時Flink自己實現compare
方法,具體這個方法的實際意義不是很明白,有明白的賜教一下。後來通過網上查閱已經有人提過此issues,地址:https://issues.apache.org/jira/browse/FLINK-8498
,但狀態是關閉的,也沒有回覆什麼時候解決,但我們通過檢視Flink GitHub原始碼發現,此處實現已經發生變更,原始碼地址https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
新實現原始碼: