1. 程式人生 > >ELK 之Filebeat 結合Logstash 過濾出來你想要的日誌

ELK 之Filebeat 結合Logstash 過濾出來你想要的日誌

filebeat logstash ELK

先扯點沒用的

收集日誌的目的是有效的利用日誌,有效利用日誌的前提是日誌經過格式化符合我們的要求,這樣才能真正的高效利用收集到elasticsearch平臺的日誌。默認的日誌到達elasticsearch 是原始格式,亂的讓人抓狂,這個時候你會發現Logstash filter的可愛之處,它很像一塊橡皮泥,如果我們手巧的話就會塑造出來讓自己舒舒服服的作品,but 如果你沒搞好的話那就另說了,本文的宗旨就是帶你一起飛,搞定這塊橡皮泥。當你搞定之後你會覺得kibana 的世界瞬間清爽了很多!
FIlebeat 的4大金剛
Filebeat 有4個非常重要的概念需要我們知道,
Prospector(礦工);

Harvest (收割者);
libeat (匯聚層);
registry(註冊計入者);
Prospector 負責探索日誌所在地,就如礦工一樣要找礦,而Harvest如礦主一樣的收割者礦工們的勞動成果,哎,世界無處不剝削啊!每個Prospector 都有一個對應的Harvest,然後他們有一個共同的老大叫做Libbeat,這個家夥會匯總所有的東西,然後把所有的日誌傳送給指定的客戶,這其中還有個非常重要的角色”registry“,這個家夥相當於一個會計,它會記錄Harvest 都收割了些啥,收割到哪裏了,這樣一但有問題了之後,harvest就會跑到會計哪裏問:“上次老子的活幹到那塊了”?Registry 會告訴Harvest 你Y的上次幹到哪裏了,去哪裏接著幹就行了。這樣就避免了數據重復收集的問題!

FIlebeat 詳細配置:

filebeat.prospectors:
- input_type: log
  enabled: True
  paths:
    - /var/log/mysql-slow-*
#這個地方是關鍵,我們給上邊日誌加上了tags,方便在logstash裏邊通過這個tags 過濾並格式化自己想要的內容;
  tags: ["mysql_slow_logs"]
#有的時候日誌不是一行輸出的,如果不用multiline的話,會導致一條日誌被分割成多條收集過來,形成不完整日誌,這樣的日誌對於我們來說是沒有用的!通過正則匹配語句開頭,這樣multiline 會在匹配開頭
之後,一直到下一個這樣開通的語句合並成一條語句。
#pattern:多行日誌開始的那一行匹配的pattern
#negate:是否需要對pattern條件轉置使用,不翻轉設為true,反轉設置為false
#match:匹配pattern後,與前面(before)還是後面(after)的內容合並為一條日誌
#max_lines:合並的最多行數(包含匹配pattern的那一行 默認值是500行
#timeout:到了timeout之後,即使沒有匹配一個新的pattern(發生一個新的事件),也把已經匹配的日誌事件發送出去
  multiline.pattern: ‘^\d{4}/\d{2}/\d{2}‘  (2018\05\01  我的匹配是已這樣的日期開頭的)
  multiline.negate: true
  multiline.match: after
  multiline.Max_lines:20
  multiline.timeout: 10s
- input_type: log
  paths:
    - /var/log/mysql-sql-*
  tags: ["mysql_sql_logs"]
  multiline.pattern: ‘^\d{4}/\d{2}/\d{2}‘
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 10s
  encoding: utf-8
  document_type: mysql-proxy
  scan_frequency: 20s
  harverster_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
 #tail_files:如果設置為true,Filebeat從文件尾開始監控文件新增內容,把新增的每一行文件作為一個事件依次發送,而不是從文件開始處重新發送所有內容。默認是false;

Logstash 的詳細配置

input {
  kafka {
    topics_pattern => "mysql.*"
    bootstrap_servers => "x.x.x.x:9092"
    #auto_offset_rest => "latest"
    codec => json
    group_id => "logstash-g1"
   }
}
#終於到了關鍵的地方了,logstash的filter ,使用filter 過濾出來我們想要的日誌,
filter {
     #if 還可以使用or 或者and 作為條件語句,舉個栗子: if “a” or “b”  or “c” in [tags],這樣就可以過濾多個tags 的標簽了,我們這個主要用在同型號的交換設備的日誌正規化,比如說你有5臺交換機,把日誌指定到了同一個syslog-ng 上,收集日誌的時候只能通過同一個filebeat,多個prospector加不同的tags。這個時候過濾就可以通過判斷相應的tags來完成了。
    if "mysql_slow_logs" in [tags]{ 
    grok {
     #grok 裏邊有定義好的現場的模板你可以用,但是更多的是自定義模板,規則是這樣的,小括號裏邊包含所有一個key和value,例子:(?<key>value),比如以下的信息,第一個我定義的key是data,表示方法為:?<key> 前邊一個問號,然後用<>把key包含在裏邊去。value就是純正則了,這個我就不舉例子了。這個有個在線的調試庫,可以供大家參考,http://grokdebug.herokuapp.com/ 
        match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+)\.\d+\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"}
#過濾完成之後把之前的message 移出掉,節約磁盤空間。
        remove_field => ["message"]
    }
  }
    else if "mysql_sql_logs" in [tags]{
    grok {
        match => { "message" => "(?<date>\d{4}/\d{2}/\d{2}\s(?<datetime>%{TIME}))\s-\s(?<status>\w{2})\s-\s(?<respond_time>\d+\.\d+)\w{2}\s-\s%{IP:client}:(?<client-port>\d+)\[\d+\]->%{IP:server}:(?<server-port>\d+).*:(?<databases><\w+>):(?<SQL>.*)"}
        remove_field => ["message"]}
  }
}

我要過濾的日誌樣本:

2018/05/01 16:16:01.892 - OK - 759.2ms - 172.29.1.7:35184[485388]->172.7.1.39:3306[1525162561129639717]:<DB>:select count(*) from test[];
過濾後的結果如下:
{
  "date": [
    [
      "2018/05/01 16:16:01.892"
    ]
  ],
  "datetime": [
    [
      "16:16:01.892"
    ]
  ],
  "TIME": [
    [
      "16:16:01.892"
    ]
  ],
  "HOUR": [
    [
      "16"
    ]
  ],
  "MINUTE": [
    [
      "16"
    ]
  ],
  "SECOND": [
    [
      "01.892"
    ]
  ],
  "status": [
    [
      "OK"
    ]
  ],
  "respond_time": [
    [
      "759"
    ]
  ],
  "client": [
    [
      "172.29.1.7"
    ]
  ],
  "IPV6": [
    [
      null,
      null
    ]
  ],
  "IPV4": [
    [
      "172.29.1.7",
      "172.7.1.39"
    ]
  ],
  "client-port": [
    [
      "35184"
    ]
  ],
  "server": [
    [
      "172.7.1.39"
    ]
  ],
  "server-port": [
    [
      "3306"
    ]
  ],
  "databases": [
    [
      "<DB>"
    ]
  ],
  "SQL": [
    [
      "select count(*) from test[];"
    ]
  ]
}

#有圖有真相:
技術分享圖片

ELK 之Filebeat 結合Logstash 過濾出來你想要的日誌