Flink 中LatencyMarks延遲監控(原始碼分析)
流式計算中處理延遲是一個非常重要的監控metric
flink中通過開啟配置 metrics.latency.interval 來開啟latency後就可以在metric中看到askManagerJobMetricGroup/operator_id/operator_subtask_index/latency指標了
如果每一條資料都打上時間監控 輸出時間- 輸入時間,會大量的消耗效能
來看一下flink自帶的延遲監控是怎麼做的
其實也可以想到原理很簡單,就是在source週期性的插入一條特殊的資料LatencyMarker
LatencyMarker初始化的時候會帶上它產生時的時間
每次當task接收到的資料是LatencyMarker的時候他就用 當前時間 - LatencyMarker時間 = lateTime 併發送到指標收集系統
接著繼續把這個LatencyMarker往下游emit
來看一下原始碼是如何實現的
因為是從source加入LatencyMarker先看StreamSource.java
在StreamSource的run 方法中
初始化了一個LatencyMarksEmitter
其實就是在processTimeServera中週期性(我們設定的metrics.latency.interval 時長)去向下游emit 當前時間的LatencyMarker
接著來到task接收資料的地方
StreamInputProcessor的processInput方法中
可以看到就是用當前時間 - LatencyMarker,然後就往report傳送了,然後emit
而sink運算元的唯一區別就是
區別就是sink沒有emit LatencyMarker 因為是最後一個運算元了嘛
這裡就講完了
注意的點是:
其實可以看到flink中的LatencyMarker是沒有走使用者程式碼邏輯的,也就是說統計出來的延遲時間並不是端到端的,而是除了使用者邏輯處理外的延遲,
因為LatencyMarker和資料的處理是同步處理的,雖然監控延遲中沒有過使用者邏輯程式碼(正常資料接收以後使用者程式碼處理然後emit,LatencyMarker接收後直接emit)
但是就像馬路一樣,整個馬路擁塞了延遲高了,那還是會使這個指標值越來越大
可能這樣的設計是考慮到LatencyMarker如果也走使用者處理邏輯的話會消耗過多的效能吧,特別是採集頻繁的時候