1. 程式人生 > >TIGK技術棧:kapacitor(二)tickscript指令碼基本概念和語法

TIGK技術棧:kapacitor(二)tickscript指令碼基本概念和語法

kapacitor指令碼一些核心概念

  • task: task是一個tickscript的最大粒度單位,可以說,一個tickscript就是一個task,task標識在kapacitor當中處理一段資料(a set of data)的任務集,一段指令碼要起作用,首先需要經過kapacitor對當前的task檔案進行編譯(define)然後enable,同事還可以使用record命令來檢測某一個task最近20分鐘進入的資料流和節點處理的結果

  • pipeline 和名稱類似,pipeline就是指管道,標識資料的流向 kapaciotr當中的資料是由influxdb處監聽獲取,從管道 自上而下進行處理,可以有分支,可以有交匯,但是不可以回溯,管道根據資料來源來源方式不同,可以分為兩種,即stream和batch

  • stream 資料流 kapacitor關鍵字之一,用來標識資料流來源的入口,即它會監聽每一個插入influxdb的資料,然後通過管道當中 的各個節點進行處理,最終流向管道底部 格式:

 var varible_name = stream
|from()
...
  • batch 批處理 kapacitor關鍵字之一,用來標識批處理資料來源的入口,可以理解為每隔一段時間執行一條influxQL去influxdb當中查詢資料,查詢到的結果集會隨當前batch所在的管道流向底部 格式:
var varible_name = batch
|query('''select usage_idle from cpu where host = 'xxx'''')
...

stream是監聽當前influxdb每一個時間點插入的資料流,而batch則是獲取的InfluxQL查詢的結果集,此結果集應該是單行的,stream下面可以不用直接跟from()方法指定資料庫和表(即會監聽當前所有插入influxdb的資料) 但是batch方法下必須指定query()作為批處理對資料進行篩選

  • node 用來組成管道的節點,可以理解為我們常見的水管上用來拼接成管道的連線頭,也可以理解為processor point,即資料處理器節點,一個pipeline是通過多個node組成的

  • filed和tag 是influxdb資料庫的欄位型別 field表示當前插入資料值,它帶有資料型別,包括double int等 tag其實就是標籤的意思,通常用於資料篩選,它的資料型別一定是字串型別 eg:

host       cpu_usage_idle
----------------------
'1138'         58.5

表示1138主機上的cpu使用空閒率為58.5% 在kapacitor當中是會將filed和tag進行區分的,一個明顯的區別就是filed在node的處理過程當中可能會被銷燬,但是tag不會

  • syntax-subspace 語法子空間 kapacitor借鑑了很多種語言做出了tickscript,而tickscript當中包含語法子空間的 簡單的說就是在一個task當中,不止包含tickscript語法 舉個栗子 在使用query方法時
batch
query('''select count(usage_idle) from cpu where time > now() - 10s limit 1''')

這裡包含了influxQL查詢語句,influxQL語法,就是一個語法子空間 還有比如使用eval模組時

|eval(lambda : if("usage_idle" > 80)).as('res')

這裡使用lambda表示式,則使用了tickscript 和 lambda語法 通常在kapacitor當中也只會包含這兩種另外的語法

  • chaining method和property method 連線方法和連線方法的屬性方法(子方法) 之前有說過node 定義一個node的方式則是使用一個管道符號(“|”)然後跟一個chaining method,概念上可以把chaining method直接認為是node 比如 var data = stream |from() 上面的from就是一個chaining method,和管道符號一起組成了一個node 而屬性方法則是定義在當前連線方法下的子方法 比如as() 連線方法join()和連線方法eval()都有as()作為properties method 但是是兩個完全不同的方法(join()下的as是為交匯的pipeline的結果集欄位賦予字首,eval()下的as是建立一個新的field欄位供後面的節點使用) eg:
var data=stream
|from()
  .database('aomp_monitor_db')
  .retentionPolicy('one_day')

這裡的database和retentionPolicy都是定義在chaining method下的property method,用來指定資料庫和資料儲存方法 需要注意的是,property method的執行順序是在chaining method執行之前按順序執行,property method的同一個chaining method下的property metohd的執行順序也會影響執行結果(親測)

就像不同的水管只能裝不同的聯結器一樣,不同的node後面只能跟指定的node,有的node作為pipeline的終端後面也不可以跟node,具體可以上官網看node之間的關聯關係

kapacitor指令碼語法

1.指令碼結構:

  • 通常一個task被拆分為兩個部分 declaration和expression 即宣告和操作 宣告即是宣告變數 操作則是對資料進行處理評測併發出警告等,即是上面寫的pipeline定義和處理 比如:
var v_alarmRuleId=1894653
var v_alarmPolicyId=1894654
var v_level='warn'
var v_host='{{ index .Tags "host" }}'
var v_target_id='{{ index .Tags "host" }}'
var v_db='aomp_monitor_db'
var v_rp='autogen'
var v_measurement='cpu'
var v_groupBy='host'
var v_whereFilter=lambda: ("host"=='1889911' OR "host"=='1874471' OR "host"=='1855534')
var v_name='測試'
var v_field='usage_idle'
var v_idVar=v_name +':{{.Group}}'
var v_message='id:{{ .ID }} ,name: {{ .Name}},taskname:{{.TaskName}},Group:{{.Group}}, host: {{ index .Tags "host" }} ,Level :warn, value:{{ index .Fields "value" }},time:{{.Time}}'
var v_idTag='alertID'
var v_levelTag='level'
var v_every=10s
var v_messageField='message'
var v_durationField='duration'
var v_value='{{ index .Fields "value" }}'
var data = stream 
|from() 
   .database(v_db) 
   .retentionPolicy(v_rp) 
   .measurement(v_measurement) 
   .where(v_whereFilter) 
   .groupBy(v_groupBy) 
   |eval(lambda: if(("usage_idle"<=10), 1, 0)).as('res').keep() 
var trigger = data 
|alert() 
   .info(lambda: "res" == 1)
    .id(v_idVar) 
    .message(v_message) 
    .messageField(v_messageField) 
    .durationField(v_durationField) 
    .stateChangesOnly() 
    .details('{"v_alarmRuleId":"1894653","v_alarmPolicyId":"1894654","v_level":"warn","v_host":"{{ index .Tags "host" }}","v_target_id":"{{ index .Tags "host" }}","v_db":"aomp_monitor_db","v_rp":"autogen","v_measurement":"cpu","v_groupBy":"host","v_name":"測試多條件告警規則策略","v_field":"usage_idle","v_idVar":"v_name +:{{.Group}}","v_message":"id:{{ .ID }} ,name: {{ .Name}},taskname:{{.TaskName}},Group:{{.Group}}, host: {{ index .Tags "host" }} ,Level :warn, value:{{ index .Fields "value" }},time:{{.Time}}","v_idTag":"alertID","v_levelTag":"level","v_every":"10s","v_messageField":"message","v_durationField":"duration","v_value":"{{ index .Fields "value" }}"}')
    .post('http://192.168.183.81:7001/open_api/alarm_notify/receive') 
var solve_trigger = data 
| window() 
  .period(60s) 
  .every(v_every) 
|alert() 
   .info(lambda: "res" == 0)
   .warn(lambda: "res" == 1)
    .id(v_idVar) 
    .message(v_message) 
    .messageField(v_messageField) 
    .durationField(v_durationField) 
    .stateChangesOnly() 
    .details('{"v_alarmRuleId":"1894653","v_alarmPolicyId":"1894654","v_level":"warn","v_host":"{{ index .Tags "host" }}","v_target_id":"{{ index .Tags "host" }}","v_db":"aomp_monitor_db","v_rp":"autogen","v_measurement":"cpu","v_groupBy":"host","v_name":"測試多條件告警規則策略","v_field":"usage_idle","v_idVar":"v_name +:{{.Group}}","v_message":"id:{{ .ID }} ,name: {{ .Name}},taskname:{{.TaskName}},Group:{{.Group}}, host: {{ index .Tags "host" }} ,Level :warn, value:{{ index .Fields "value" }},time:{{.Time}}","v_idTag":"alertID","v_levelTag":"level","v_every":"10s","v_messageField":"message","v_durationField":"duration","v_value":"{{ index .Fields "value" }}"}')
    .post('http://192.168.183.81:7001/open_api/alarm_notify/solve') 

2.變數 kapacitor指令碼中所有宣告的變數都是常量,每一次評估時變數的內容不可以修改 類似kapacitor這樣的指令碼有一個概念叫evaluate,即評估,每一次評估則意味著一次資料進入指令碼,在資料在pipeline當中流動的時候,當前指令碼定義的變數值和流動的資料本身,是不可修改的 變數宣告格式類似go和javascript

var v_db = 'monitor_linux_db'

變數名必須以英文開頭,區分大小寫,英文開頭後可以跟數字,下劃線等 變數宣告不可以用kapacitor關鍵字,比如stream和batch

3.空格 tickscript中,在expression部分,空格是可以忽略的,但是在宣告變數的declaration部分要求必須使用空格將var標識和變數名隔開

4.單雙引號 單引號在任何時候都表示字串, 使用\ 反斜槓進行轉義 同時kapacitor也支援類似python的語法,用三個連續的單引號表示長字串(避免過多的轉義符)

在lambda表示式和influxql語句當中,使用雙引號來表示某一個field或者tag,尤其是在influxql語句當中,強烈建議欄位名加上雙引號,因為influxql有特別多的內建關鍵字,使用雙引號可以將欄位名和關鍵字進行區分

5.資料結構 常用的資料結構包含 string duration int float lamdba list

  • string 使用單引號表示字串 當需要在字串當中引用變數時: 引用當前script當中定義的變數直接使用{{ .valrible_name }} 比如{{ .ID }} 引用結果集或者stream當中定義的變數使用{{ index .Fileds "usage_idle" }}或者{{ index.Tags "host" }}
  • duration 時間資料型別 這裡列舉一下時間資料型別常見字面量 1d 1w 1h 1m 1s 1ms 1u 天 周 小時 分 秒 毫秒 微秒
  • regular expression 正則表示式型別 字面量:
var cz_reg = /^cz\d+/
  • lambda表示式 通常用來做資料處理並返回一個處理後的值 常見字面量 var my_lambda = lambda:1>0 返回一個值為true的lambda表示式型別變數
  • int 和 float 如果沒有在使用lineprotocol插入資料到influxdb的時候指定某個filed欄位為int型別,那麼預設都是float型別,需要注意的是,有的lambda表示式方法需要的引數限制了資料型別為int,這個時候可以用int()方法強轉一下

6.comparison operator 比較器 lamdba比較器 使用 == 和 !=,以及AND 和 OR 進行表示式比較 influxql比較器 除了普遍的sql語法外,influxQL還支援使用=~和!~ 來進行正則表示式的匹配

7.關於node的分類 關於node node分類為 datasource definition nodes 比如 batch和stream data definition node 比如 from() query() data manipulation nodes 比如 sample() default() processing nodes processing nodes下分 資料構造處理節點和傳輸處理節點 比如alert()是資料傳輸處理節點 join()是資料構造處理節點

下一篇講kapacitor指令碼監控實戰和使用此元件監控的心得體會