Adding a Kafka Message Queue to the ELK Log Analysis Platform
In the ELK environment set up previously, the log pipeline was: filebeat --> logstash --> elasticsearch. As business volume grows, the architecture needs further scaling, so a Kafka cluster is introduced. The pipeline becomes: filebeat --> kafka --> logstash --> elasticsearch. The architecture is illustrated in the diagram below.
Kafka: a data buffering queue. As a message queue it decouples the processing stages and improves scalability. It also provides peak-load handling: with a queue in front, key components can absorb sudden bursts of traffic instead of collapsing under unexpected overload.
Versions used:
Kafka: kafka_2.12-1.1.0
Zookeeper: zookeeper-3.4.12
ELK components: 6.2.3
Configuring filebeat to output to the Kafka cluster
Edit the filebeat configuration file so that output goes to Kafka:
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /tmp/test.log

output.kafka:
  enabled: true
  hosts: ["192.168.20.201:9092", "192.168.20.203:9092", "192.168.20.56:9092"]
  topic: 'test-log'
Restart the filebeat service and append some test data to /tmp/test.log, then list the topics on any node of the Kafka cluster and consume the data.
# On the filebeat client, simulate a log entry:
echo "111" >> /tmp/test.log
# On any Kafka node, check whether the corresponding topic was created:
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test-log
# Try to consume data from the topic:
# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.20.203:9092 --topic test-log --from-beginning
{"@timestamp":"2018-06-03T06:15:35.800Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.2.3","topic":"test-log"},"message":"111","prospector":{"type":"log"},"beat":{"hostname":"ELK","version":"6.2.3","name":"192.168.20.60"},"source":"/tmp/test.log","offset":379}
# If no codec is specified, the output defaults to JSON
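To make the event structure concrete, here is a short Python sketch that parses the JSON event consumed above. The field set shown matches filebeat 6.2.3's default Kafka output; other versions may add or rename fields.

```python
import json

# Sample event as produced by filebeat to the Kafka topic (copied from the
# console-consumer output above). Filebeat wraps the raw log line in a
# "message" field and adds metadata about the beat, source file, and offset.
raw = ('{"@timestamp":"2018-06-03T06:15:35.800Z",'
       '"@metadata":{"beat":"filebeat","type":"doc","version":"6.2.3","topic":"test-log"},'
       '"message":"111",'
       '"prospector":{"type":"log"},'
       '"beat":{"hostname":"ELK","version":"6.2.3","name":"192.168.20.60"},'
       '"source":"/tmp/test.log","offset":379}')

event = json.loads(raw)
print(event["message"])             # the original log line: "111"
print(event["@metadata"]["topic"])  # Kafka topic the event was sent to
print(event["source"])              # file the line was read from
```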
This completes the filebeat-to-Kafka configuration. Detailed usage and parameters can be found in the official documentation.
Logstash reads from Kafka and outputs to Elasticsearch
First, configure output to stdout to verify the input:
# cat kafka.conf
input {
  kafka {
    codec => "json"
    bootstrap_servers => "192.168.20.201:9092,192.168.20.203:9092,192.168.20.56:9092"
    topics => "test-log"
    consumer_threads => 2
  }
}

output {
  stdout { codec => rubydebug }
}
Check the configuration syntax; if it passes, start logstash:
# Check syntax
/usr/local/elk/logstash-6.2.3/bin/logstash -f kafka.conf -t
# Start
/usr/local/elk/logstash-6.2.3/bin/logstash -f kafka.conf --config.reload.automatic
As before, simulate inserting a line into the log file and check that the logstash output looks correct:
# On the filebeat client, simulate a log entry:
echo "222" >> /tmp/test.log
# Output on the logstash side:
...
{
        "offset" => 383,
      "@version" => "1",
    "prospector" => {
        "type" => "log"
    },
          "beat" => {
            "name" => "192.168.20.60",
        "hostname" => "ELK",
         "version" => "6.2.3"
    },
       "message" => "222",
    "@timestamp" => 2018-06-03T06:27:55.820Z,
        "source" => "/tmp/test.log"
}
At this point the full pipeline works end to end, and the Kafka cluster has been successfully added to the ELK platform. More details on the kafka input plugin are available in the official documentation.
Collecting different logs into different Kafka topics
Going a step deeper: how do we route different log files to different Kafka topics?
For versions 6.0 and later, you can attach custom fields with `fields` and then reference their values using the %{[...]} syntax.
The filebeat configuration looks like this:
filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /tmp/access.log
  fields:
    log_topics: apache-access
- type: log
  enabled: true
  paths:
    - /tmp/error.log
  fields:
    log_topics: apache-error

output.kafka:
  enabled: true
  hosts: ["192.168.20.201:9092", "192.168.20.203:9092", "192.168.20.56:9092"]
  #topic: 'test-log'
  topic: '%{[fields][log_topics]}'
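The `%{[fields][log_topics]}` template is resolved per event: filebeat looks up the referenced field in each event and uses its value as the Kafka topic. The following Python sketch (illustrative only, not filebeat's actual code) mimics that resolution to show how the two prospectors end up in different topics:

```python
import re

def resolve_topic(template, event):
    """Replace each %{[a][b]...} reference with the value found in the event dict."""
    def lookup(match):
        value = event
        # Walk the bracketed path, e.g. [fields][log_topics] -> event["fields"]["log_topics"]
        for key in re.findall(r'\[([^\]]+)\]', match.group(1)):
            value = value[key]
        return str(value)
    return re.sub(r'%\{((?:\[[^\]]+\])+)\}', lookup, template)

# Events as the two prospectors above would tag them:
access_event = {"fields": {"log_topics": "apache-access"}, "message": "GET /"}
error_event  = {"fields": {"log_topics": "apache-error"},  "message": "oops"}

print(resolve_topic('%{[fields][log_topics]}', access_event))  # apache-access
print(resolve_topic('%{[fields][log_topics]}', error_event))   # apache-error
```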
Correspondingly, on the logstash side, to parse each topic separately:
input {
  kafka {
    codec => "json"
    bootstrap_servers => "192.168.20.201:9092,192.168.20.203:9092,192.168.20.56:9092"
    topics => ["apache-access","apache-error"]
    consumer_threads => 2
  }
}

filter {
  if [fields][log_topics] == "apache-access" {
    grok {...}
  }
  if [fields][log_topics] == "apache-error" {
    grok {...}
  }
}

output {
  stdout { codec => rubydebug }
}
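In production the stdout output would be replaced with an elasticsearch output. A minimal sketch, where the Elasticsearch host and the index naming pattern are assumptions to adapt to your environment:

```conf
output {
  elasticsearch {
    # Assumed ES address; point this at your actual cluster.
    hosts => ["localhost:9200"]
    # One index per log type per day, reusing the log_topics field set by filebeat.
    index => "%{[fields][log_topics]}-%{+YYYY.MM.dd}"
  }
}
```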