1. 程式人生 > >Flink 1.4.2 版本踩過的坑

Flink 1.4.2 版本踩過的坑

0x1 摘要

最近業務要實時統計半小時維度的UV、PV資料,經過調研準備用Flink時間窗來實現,主要是Flink對eventTime的支援,可以做到更精準的統計,由於第一次嘗試使用Flink,所以過程中遇到不少問題,記錄下來方便後續查閱。

0x2 執行計劃輸出JSON問題

Flink對執行計劃分析提供了支援,可以通過程式碼將執行計劃打出來,並利用官網提供的圖生成工具可以方便分析,通過env.getExecutionPlan()方法可以獲取JSON格式的執行計劃,將JSON字串拷貝到http://flink.apache.org/visualizer/網站文字框就可以檢視。
但我們在專案中呼叫env.getExecutionPlan()

方法後報以下異常資訊:

Caused by: java.lang.IllegalArgumentException: Comparison method violates its general contract!
    at java.util.TimSort.mergeLo(TimSort.java:777)
    at java.util.TimSort.mergeAt(TimSort.java:514)
    at java.util.TimSort.mergeForceCollapse(TimSort.java:457)
    at java.util.TimSort.sort(TimSort.java:254)
    at java.util.Arrays.sort(Arrays.java:1512)
    at java.util.ArrayList.sort(ArrayList.java:1454)
    at java.util.Collections.sort(Collections.java:175)
    at org.apache.flink.streaming.api.graph.JSONGenerator.getJSON(JSONGenerator.java:61)
    at org.apache.flink.streaming.api.graph.StreamGraph.getStreamingPlanAsJSON(StreamGraph.java:663)
    ... 2 more

通過異常資訊可以知道問題發生在TimSort排序演算法,意思就是比較方法違反約束,具體約束規範大家可以自行網上查閱:自反性、傳遞性、對稱性。
我們來看一下Flink的原始碼,只要看JSONGenerator61行就可以:

public String getJSON() throws JSONException {
    JSONObject json = new JSONObject();
    JSONArray nodes = new JSONArray();
    json.put("nodes", nodes);
    List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
    Collections.sort(operatorIDs, new Comparator<Integer>() {
        @Override
        public int compare(Integer o1, Integer o2) {
            // put sinks at the back
            if (streamGraph.getSinkIDs().contains(o1)) {
                return 1;
            } else if (streamGraph.getSinkIDs().contains(o2)) {
                return -1;
            } else {
                return o1 - o2;
            }
        }
    });
    visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
    return json.toString();
}

從原始碼可以看在對操作ID做排序時Flink自己實現compare方法,具體這個方法的實際意義不是很明白,有明白的賜教一下。後來通過網上查閱已經有人提過此issues,地址:https://issues.apache.org/jira/browse/FLINK-8498,但狀態是關閉的,也沒有回覆什麼時候解決,但我們通過檢視Flink GitHub原始碼發現,此處實現已經發生變更,原始碼地址https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
新實現原始碼:

原文連結