Kafka 流量監控的原理及實現
工程能力
作為一個優秀的開發人員,專案開發的過程中監控告警系統的可靠性是可以體現出一個人的工程管理能力的。優秀的監控告警系統可以免去很多精力消耗,比如維護,故障預判,故障及時準確通知 ,故障定位排查等 。
可以想像專案上線後,假如沒有監控告警系統,這麼一個暗箱是多麼可怕。
對於大資料專案, 資料一般需要先入訊息佇列,如kafka,然後分離線和實時將資料進行解耦分流,用於實時處理和離線處理。訊息佇列存在的好處:
-
訊息佇列的訂閱者可以根據需要隨時擴充套件,可以很好的擴充套件資料的使用者。
-
訊息佇列的橫向擴充套件,增加吞吐量,做起來還是很簡單的。這個用傳統資料庫,分庫分表還是很麻煩的。
-
由於訊息佇列的存在,也可以幫助我們抗高峰,避免高峰時期後端處理壓力過大導致整個業務處理宕機。
kafka在大資料專案中作用至關重要,那麼對其的監控告警就至關重要了,我們這裡主要是講針對kafka流量的監控告警,其目的也是很明顯的便於我們瞭解資料的整體情況及波動情況,以調整處理後端,如spark streaming,flume等。
kafka 監控工具很多,常見的有kafka manager, KafkaOffsetMonitor,kafka eagle,kafka tools等,浪尖最經常使用的是kafka manager,也建議大家使用該工具,其不僅有監控功能還有管理功能。具體使用方法可以參看:
ofollow,noindex">kafka管理神器-kafkamanager
監控指標
kafka 的指標伺服器和客戶端都有的。具體指標內容,可以參看 kafka 官網:
http://kafka.apache.org/0102/documentation.html#monitoring
檢視可用指標的最簡單方法是啟動 jconsole 並將其指向正在執行的 kafka 客戶端或伺服器 ; 這將允許使用 JMX 瀏覽所有指標。
對於熟悉 kafka manager 的朋友都應該看過 broker 相關資訊,比如每秒鐘的流入的訊息條數,每秒鐘的流入的訊息大小,流出的訊息大小等。
使用kafka manager 可以很方便的檢視。但是,這其實不能讓我們及時的發現數據流量波動,或者說我們想畫個曲線的詳細對比歷史流量,它是做不到的。所以,我們要想辦法去獲取出來這些指標,然後做我們自己的展示。還有一點就是,流量波動告警。
浪尖這裡只做了圖中幾個指標的介面:
def getBytesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = { getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesInPerSec", topicOption) } def getBytesOutPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = { getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesOutPerSec", topicOption) } def getBytesRejectedPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = { getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "BytesRejectedPerSec", topicOption) } def getFailedFetchRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = { getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedFetchRequestsPerSec", topicOption) } def getFailedProduceRequestsPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = { getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "FailedProduceRequestsPerSec", topicOption) } def getMessagesInPerSec(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, topicOption: Option[String] = None) = { getBrokerTopicMeterMetrics(kafkaVersion, mbsc, "MessagesInPerSec", topicOption) }
jmx 客戶端
連線 jmx 的 server 是可以使用 jconsole ,但是滿足不了我們的需求。所以,我們使用 JMXConnectorFactory 方式連線 jmx 。使用 JMXConnectorFactory 連結 jmx 時, JMXServiceURL 的引數 url 必須使用 service:jmx 方式進行連線,具體連結建立方式很簡單,幾行程式碼而已,如下:
"hostname"
val jmxPort =9999
val urlString = s"service:jmx:rmi:///jndi/rmi:// $ jmxHost : $ jmxPort/jmxrmi"
val url = newJMXServiceURL(urlString)
valjmxc = JMXConnectorFactory. connect (url )
val mbsc = jmxc.getMBeanServerConnection;
println (KafkaMetrics. getMessagesInPerSec (Kafka_0_10_2_1 , mbsc , Some ( "test")).fifteenMinuteRate)
jmxc.close()
開啟 kafka 的 jmx 埠
kafka 的 jmx 服務預設時關閉的,開啟的話很簡單,只需要在 kafka server 的啟動指令碼 kafka-server-start.sh 裡增加一行程式碼即可,內容 export JMX_PORT="9999" ,增加位置如下:
if [ "x$KAFKA_HEAP_OPTS" = "x"]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" export JMX_PORT="9999" fi
測試
我這裡測試就比較簡單了,主要是將訊息條數打出來,大家可以根據需要自行調整,比如均值大於閾值發簡訊告警等。
本文是自己實現kafka 監控系統的第二篇文章,前面有篇文章講到了從kafka broker獲取消費者已經提交的offset,具體可以閱讀:
一套完整的kafka監控,包括:
-
消費者監控,主要是存活告警,消費滯後告警。
-
生產者監控,主要是存活告警,生產者消費上游資料能力告警。
-
broker監控,主要是存活告警,流量告警,isr列表,topic異常告警,control變換告警。
內容頗多,後面陸續出文章實現,當然整個專案最終會放到星球裡的。
本文原始碼,關注本公眾號(bigdatatip),輸入 1027 獲取。
微信技術交流群,掃描微信二維碼,拉入群聊,也可以搜尋微信 158570986。
更多幹貨加入浪尖知識星球,179價格最後一天,明天開始199.
微信交流群,可以掃浪尖微信。