incubator-dolphinscheduler 如何在不寫任何新程式碼的情況下,能快速接入到prometheus和grafana中進行監控
一、prometheus和grafana 簡介
prometheus是由谷歌研發的一款開源的監控軟體,目前已經貢獻給了apache 基金會託管。
監控通常分為白盒監控和黑盒監控之分。
- 白盒監控:通過監控內部的執行狀態及指標判斷可能會發生的問題,從而做出預判或對其進行優化。
- 黑盒監控:監控系統或服務,在發生異常時做出相應措施。
prometheus的優勢:
- 易於管理,通俗易懂
- 能夠輕易獲取服務內部狀態,比如jvm等。
- 高效靈活的查詢語句
- 支援本地和遠端儲存,支援時序資料庫
- 採用http協議,預設pull模式拉取資料,也可以通過中間閘道器push資料(需要安裝push gateway)
- 支援自動發現(通過服務的方式進行自動發現待監控的目標,可以通過Consul實現服務發現)
- 可擴充套件,支援使用者自定義開發
- 易整合,可以和grafana 快速整合。
備註:此架構圖摘自prometheus官方網站
prometheus根據配置定時可以去拉取各個節點的資料,預設使用的拉取方式是pull,也可以使用pushgateway提供的push方式獲取各個監控節點的資料。將獲取到的資料存入TSDB(時序型資料庫),pushgateway 就是 外部的應用可以將監控的資料主動推送給pushgateway,然後prometheus 自動從pushgateway進行拉取。此時prometheus已經獲取到了監控資料,可以使用內建的PromQL進行查詢。它的報警功能使用Alertmanager提供,Alertmanager是prometheus的告警管理和傳送報警的一個元件。prometheus原生的圖示功能由於過於簡單,因此建議將prometheus資料接入grafana,由grafana進行統一管理。
Grafana是開源的視覺化監控、分析利器,支援多種資料庫型別和豐富的套件,目前已支援超過50多個數據源,50多個面板,17個應用程式和1700多個不同的儀表圖。(本文作者:張永清,轉載請註明來源部落格園:https://www.cnblogs.com/laoqing/p/14538635.html)
- 擁有快速靈活的客戶端圖表,面板外掛有許多不同方式的視覺化指標和日誌,官方提供的庫中具有豐富的儀表盤外掛,比如甘特圖、熱圖、折線圖、圖表等多種展示方式。
- 支援許多不同的時間序列資料(資料來源)儲存後端。每個資料來源都有一個特定查詢編輯器。官方支援資料來源:Graphite、infloxdb、opensdb、prometheus、elasticsearch、cloudwatch,mysql 等。每個資料來源的查詢語言和功能有較大差異。可以將來自多個數據源的資料組合到一個儀表板上,但每個面板都要繫結到屬於特定組織的特定資料來源中。
- 告警允許將規則附加到儀表板面板上。儲存儀表板時會將警報規則提取到單獨的警報規則儲存中,並安排它們進行評估。報警訊息還能支援釘釘、郵箱等推送至移動端。
二、incubator-dolphinscheduler 簡介
incubator-dolphinscheduler是一個由國內公司發起的大資料領域的開源排程專案,incubator-dolphinscheduler 能夠支撐非常多的應用場景,包括:
- 以DAG圖的方式將Task按照任務的依賴關係關聯起來,可實時視覺化監控任務的執行狀態
- 支援豐富的任務型別:Shell、MR、Spark、SQL(mysql、postgresql、hive、sparksql),Python,Sub_Process、Procedure,flink,datax,sqoop,http等
- 支援工作流定時排程、依賴排程、手動排程、手動暫停/停止/恢復,同時支援失敗重試/告警、從指定節點恢復失敗、Kill任務等操作
- 支援工作流優先順序、任務優先順序及任務的故障轉移及任務超時告警/失敗
- 支援工作流全域性引數及節點自定義引數設定
- 支援資原始檔的在線上傳/下載,管理等,支援線上檔案建立、編輯
- 支援任務日誌線上檢視及滾動、線上下載日誌等
- 實現叢集HA,通過Zookeeper實現Master叢集和Worker叢集去中心化
- 支援對
Master/Worker
cpu load,memory,cpu線上檢視 - 支援工作流執行歷史樹形/甘特圖展示、支援任務狀態統計、流程狀態統計
- 支援補數
- 支援多租戶
- 支援國際化
備註:此架構圖摘自社群官方網站
三、incubator-dolphinscheduler 如何快速接入到prometheus和grafana 中進行監控
1、通過prometheus中push gateway的方式採集監控指標資料。
需要藉助push gateway一起,然後將資料傳送到push gateway 地址中,比如地址為http://10.25x.xx.xx:8085,那麼就可以寫一個shell 指令碼,通過crontab排程或者incubator-dolphinscheduler排程,定期執行shell指令碼,來發送指標資料到prometheus中。
#!/bin/bash failedTaskCounts=`mysql -h 10.25x.xx.xx -u username -ppassword -e "select 'failed' as failTotal ,count(distinct(process_definition_id)) as failCounts from dolphinscheduler.t_ds_process_instance where state=6 and start_time>='${datetimestr} 00:00:00'" |grep "failed"|awk -F " " '{print $2}'` echo "failedTaskCounts:${failedTaskCounts}" job_name="Scheduling_system" instance_name="dolphinscheduler" cat <<EOF | curl --data-binary @- http://10.25x.xx.xx:8085/metrics/job/$job_name/instance/$instance_name failedSchedulingTaskCounts $failedTaskCounts EOF
這段指令碼中failedSchedulingTaskCounts 就是定義的prometheus中的一個指標。指令碼通過sql語句查詢出失敗的任務數,然後傳送到prometheus中。
然後在grafana 中就可以選擇資料來源為prometheus,並且選擇對應的指標。
比如可以通過如下shell 指令碼採集正在執行的任務數,然後通過push gateway 傳送到prometheus中。(本文作者:張永清,轉載請註明來源部落格園:https://www.cnblogs.com/laoqing/p/14538635.html)
#!/bin/bash runningTaskCounts=`mysql -h 10.25x.xx.xx -u username -ppassword -e "select 'running' as runTotal ,count(distinct(process_definition_id)) as runCounts from dolphinscheduler.t_ds_process_instance where state=1" |grep "failed"|awk -F " " '{print $2}'` echo "runningTaskCounts:${runningTaskCounts}"
job_name="Scheduling_system"
instance_name="dolphinscheduler" if [ "${runningTaskCounts}yy" == "yy" ];then runningTaskCounts=0 fi cat <<EOF | curl --data-binary @- http://10.25x.xx.xx:8085/metrics/job/$job_name/instance/$instance_name runningSchedulingTaskCounts $runningTaskCounts EOF
採集好了後,就可以達到如下的效果了
2、通過grafana 直接查詢dolphinscheduler自身 的Mysql資料庫(也可以是別的資料庫)
首先需要在grafana 中定義一個數據源,這個資料來源就是dolphinscheduler自身 的Mysql資料庫。
然後在grafana 中選擇這個資料來源,Format as 格式選擇table,輸入對應的sql語句。(本文作者:張永清,轉載請註明來源部落格園:https://www.cnblogs.com/laoqing/p/14538635.html)
比如如下sql,統計本週以及當日正在執行的排程任務的情況。
select d.*,ifnull(f.today_runCount,0) as today_runCount,ifnull(e.today_faildCount,0) as today_faildCount,ifnull(f.today_avg_timeCosts,0) as today_avg_timeCosts,ifnull(f.today_max_timeCosts,0) as today_max_timeCosts, ifnull(g.week_runCount,0) as week_runCount,ifnull(h.week_faildCount,0) as week_faildCount,ifnull(g.week_avg_timeCosts,0) as week_avg_timeCosts,ifnull(g.week_max_timeCosts,0) as week_max_timeCosts from (select a.id,c.name as project_name,a.name as process_name,b.user_name,a.create_time,a.update_time from t_ds_process_definition a,t_ds_user b, t_ds_project c where a.user_id=b.id and c.id=a.project_id and a.release_state=$status) d left join (select count(1) as today_faildCount,process_definition_id from t_ds_process_instance where state=6 and start_time>=DATE_FORMAT(NOW(),'%Y-%m-%d 00:00:00') and start_time<=DATE_FORMAT(NOW(),'%Y-%m-%d 23:59:59') group by process_definition_id ) e on d.id=e.process_definition_id left join (select count(1) as today_runCount,avg(UNIX_TIMESTAMP(end_time)-UNIX_TIMESTAMP(start_time)) as today_avg_timeCosts,max(UNIX_TIMESTAMP(end_time)-UNIX_TIMESTAMP(start_time)) as today_max_timeCosts,process_definition_id from t_ds_process_instance where start_time>=DATE_FORMAT(NOW(),'%Y-%m-%d 00:00:00') and start_time<=DATE_FORMAT(NOW(),'%Y-%m-%d 23:59:59') group by process_definition_id ) f on d.id=f.process_definition_id left join (select count(1) as week_runCount,avg(UNIX_TIMESTAMP(end_time)-UNIX_TIMESTAMP(start_time)) as week_avg_timeCosts,max(UNIX_TIMESTAMP(end_time)-UNIX_TIMESTAMP(start_time)) as week_max_timeCosts,process_definition_id from t_ds_process_instance where start_time>=DATE_FORMAT(SUBDATE(CURDATE(),DATE_FORMAT(CURDATE(),'%w')-1), '%Y-%m-%d 00:00:00') and start_time<=DATE_FORMAT(SUBDATE(CURDATE(),DATE_FORMAT(CURDATE(),'%w')-7), '%Y-%m-%d 23:59:59') group by process_definition_id ) g on d.id=g.process_definition_id left join (select count(1) as week_faildCount,process_definition_id from t_ds_process_instance where state=6 and start_time>=DATE_FORMAT(SUBDATE(CURDATE(),DATE_FORMAT(CURDATE(),'%w')-1), '%Y-%m-%d 00:00:00') and start_time<=DATE_FORMAT( SUBDATE(CURDATE(),DATE_FORMAT(CURDATE(),'%w')-7), '%Y-%m-%d 23:59:59') group by process_definition_id ) h on d.id=h.process_definition_id
這些配置完後,儲存就可以得到如下的表格:(本文作者:張永清,轉載請註明來源部落格園:https://www.cnblogs.com/laoqing/p/14538635.html)
還可以支援甘特圖等多種圖,此處不再一一介紹了。