1. 程式人生 > >Flink 中LatencyMarks延遲監控(原始碼分析)

Flink 中LatencyMarks延遲監控(原始碼分析)

流式計算中處理延遲是一個非常重要的監控metric

flink中通過開啟配置   metrics.latency.interval  來開啟latency後就可以在metric中看到askManagerJobMetricGroup/operator_id/operator_subtask_index/latency指標了

如果每一條資料都打上時間監控 輸出時間- 輸入時間,會大量的消耗效能

來看一下flink自帶的延遲監控是怎麼做的

其實也可以想到原理很簡單,就是在source週期性的插入一條特殊的資料LatencyMarker

LatencyMarker初始化的時候會帶上它產生時的時間

每次當task接收到的資料是LatencyMarker的時候他就用 當前時間 - LatencyMarker時間 = lateTime 併發送到指標收集系統

接著繼續把這個LatencyMarker往下游emit

來看一下原始碼是如何實現的

因為是從source加入LatencyMarker先看StreamSource.java

在StreamSource的run 方法中

 初始化了一個LatencyMarksEmitter

 其實就是在processTimeServera中週期性(我們設定的metrics.latency.interval 時長)去向下游emit  當前時間的LatencyMarker

接著來到task接收資料的地方

StreamInputProcessor的processInput方法中

可以看到就是用當前時間 - LatencyMarker,然後就往report傳送了,然後emit

而sink運算元的唯一區別就是

區別就是sink沒有emit  LatencyMarker 因為是最後一個運算元了嘛

這裡就講完了

 

注意的點是:

   其實可以看到flink中的LatencyMarker是沒有走使用者程式碼邏輯的,也就是說統計出來的延遲時間並不是端到端的,而是除了使用者邏輯處理外的延遲,

   因為LatencyMarker和資料的處理是同步處理的,雖然監控延遲中沒有過使用者邏輯程式碼(正常資料接收以後使用者程式碼處理然後emit,LatencyMarker接收後直接emit)

           但是就像馬路一樣,整個馬路擁塞了延遲高了,那還是會使這個指標值越來越大

   可能這樣的設計是考慮到LatencyMarker如果也走使用者處理邏輯的話會消耗過多的效能吧,特別是採集頻繁的時候