1. 程式人生 > >ELK 之數據收集傳輸過濾 Filebeat+Logstash 部署

ELK 之數據收集傳輸過濾 Filebeat+Logstash 部署

filebeat logstash elk

本文與前文是有關聯的,之前的兩篇文章客官可以擡腿出門右轉 導讀,ELK 之前端,ELK 之分布式發
#前端和消息隊列搞定之後,我們需要安裝數據采集工具filebeats和數據過濾機運輸工具Logstash,一般情況我們都使用filebeats 用來收集日誌文件,我自定義了一個log文件,文件內容如下:
55.3.244.1 GET /index.html 15824 0.043
55.3.244.1 GET /index.html 15824 0.043
文件位置放到/tmp/test.log

cd /opt/elk/filebeat-6.2.2-linux-x86_64
#filebeat 的配置文件大體上分為兩部分,輸入部分和輸出部分,輸入指定輸入源即可,輸出可以選擇直接到elasticsearch,logstash或者是消息隊列(redis&kafka)等,本文采用kafka 作為消息隊列;

vi filebeat.yml
filebeat.prospectors:
#日誌類型

  • type: log
    enabled: True

    日誌路徑可以

    寫多個,支持通配符
    paths:

    • /tmp/test.log
      #設置字符集編碼
      encoding: utf-8
      #文檔類型
      document_type: my-nginx-log
      #每十秒掃描一次
      scan_frequency: 10s

      實際讀取文件時,每次讀取 16384 字節

      harverster_buffer_size: 16384
      #The maximum number of bytes that a single log message can have. All bytes after max_bytes are discarded and not sent. This setting is especially useful for multiline log messages, which can get large. The default is 10MB (10485760). 一次發生的log大小值;

      max_bytes: 10485760

      是否從文件末尾開始讀取

      tail_files: true
      #給日誌加上tags方便在logstash過濾日誌的時候做判斷
      tags: ["nginx-access"]
      #剔除以.gz 結尾的文件
      exclude_files: [".gz$"]
      #output 部分的配置文檔
      #配置輸出為kafka,無論你是tar包還是rpm安裝,目錄裏邊會看到官方提供的filebeat.full.yml OR filebeat.reference.yml 這裏邊有filebeat 所有的input 方法和output 方法供你參考;
      output.kafka:

      enabled: true
      #kafka 的server,可以配置集群,例子如下:
      hosts:["ip:9092","ip2:9092","ip3:9092"]
      #這個非常重要,filebeat作為provider,把數據輸入到kafka裏邊,logstash 作為消費者去消費這些信息,logstash的input 中需要這個topic,不然logstash沒有辦法取到數據。
      topic: elk-%{[type]}

      The number of concurrent load-balanced Kafka output workers. kafka 的並發運行進程

      worker: 2
      #當傳輸給kafka 有問題的時候,重試的次數;
      max_retries: 3
      #單個kafka請求裏面的最大事件數,默認2048
      bulk_max_size: 2048
      #等待kafka broker響應的時間,默認30s
      timeout: 30s
      #kafka broker等待請求的最大時長,默認10s
      broker_timeout: 10s
      #每個kafka broker在輸出管道中的消息緩存數,默認256
      channel_buffer_size: 256
      #網絡連接的保活時間,默認為0,不開啟保活機制
      keep_alive: 60
      #輸出壓縮碼,可選項有none, snappy, lz4 and gzip,默認為gzip (kafka支持的壓縮,數據會先被壓縮,然後被生產者發送,並且在服務端也是保持壓縮狀態,只有在最終的消費者端才會被解壓縮)
      compression: gzip
      #允許的最大json消息大小,默認為1000000,超出的會被丟棄,應該小於broker的??message.max.bytes(broker能接收消息的最大字節數)
      max_message_bytes: 1000000
      #kafka的響應返回值,0位無等待響應返回,繼續發送下一條消息;1表示等待本地提交(leader broker已經成功寫入,但follower未寫入),-1表示等待所有副本的提交,默認為1
      required_acks: 0
      #The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats"。客戶端ID 用於日誌怕錯,審計等,默認是beats。
      client_id: beats
      #測試配置文件:/opt/elk/filebeat/filebeat -c /opt/elk/filebeat/filebeat.yml test config
      #如果配置文件沒有問題的話,會出現config ok ,如果有問題會提示具體問題在哪裏。
      #啟動filebeat
      可以先通過 /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c -e /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml 查看一下輸入filebeat是否工作正常,會有很多信息打印到屏幕上;
      nohup /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml >>/dev/null 2>&1&

logstash 安裝配置:
input {
#數據來源
kafka {
#這個對應filebeat的output 的index
topics_pattern => "elk-.*"
#kafka 的配置
bootstrap_servers => "IP1:9092,IP2:9092,IP3:9092"
kafka 中的group ID
group_id => "logstash-g1"
}
}
#過濾器,如果你想要日誌按照你的需求輸出的話,需要在logstash中過濾並且給與相應的key,比如以下的是nginx 日誌,用的是logstash內置好的過濾器,%{IP:client} 這個裏邊包含了兩個信息:ip地址,client 是在kibana 中顯示的自定義鍵值,最終顯示成這樣,http://grokdebug.herokuapp.com/ 是在線調試logstash filter的工具,你可以在線調試你的過濾規則。

filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
#因為我們取出了字段,所以不需要原來這個message字段,這個字段裏邊包含之前beat 輸入的所有字段。
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => ["IP:9200"]
#這個是在kibana上的index Patterns 的索引,建議什麽服務就用幹什麽名字,因為beat 提供了些kibana的模板可以導入,導入的模板匹配的時候用的索引是對應服務的名稱開頭 。
index => "nginx-%{+YYYY.MM.dd}"
document_type => "nginx"
#每次20000 發送一次數據到elasticsearch
flush_size => 20000
如果不夠20000,沒10秒會發送一次數據;、
idle_flush_time =>10
}
}

日誌從進入到輸出的流程圖:
技術分享圖片

Kibana 上邊的日誌通過logstash 過濾之後取出來在kibana上顯示如下:
技術分享圖片

ELK 之數據收集傳輸過濾 Filebeat+Logstash 部署