
Adding a Kafka Message Queue to the ELK Log Analysis Platform

In the ELK environment built earlier, the log pipeline was: filebeat --> logstash --> elasticsearch. As business volume grows, the architecture needs to be scaled out further by introducing a Kafka cluster. The log pipeline then becomes: filebeat --> kafka --> logstash --> elasticsearch. The architecture is shown below:
[Architecture diagram: filebeat --> kafka --> logstash --> elasticsearch]

Kafka: a data buffering queue. As a message queue it decouples the processing stages and improves scalability. It also provides peak-load handling: with a message queue in place, key components can absorb sudden bursts of traffic instead of collapsing under unexpected overload.

Versions used:
Kafka: kafka_2.12-1.1.0
Zookeeper: zookeeper-3.4.12
ELK components: 6.2.3
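
Zookeeper and Kafka are assumed to be already deployed and running on every node. As a reminder, a minimal startup sketch (the install directories are assumptions; adjust to your layout):

# On every node, from the zookeeper-3.4.12 directory:
./bin/zkServer.sh start

# On every node, from the kafka_2.12-1.1.0 directory:
./bin/kafka-server-start.sh -daemon config/server.properties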

Configure filebeat to output to the Kafka cluster:

Modify the filebeat configuration file to output to Kafka:

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /tmp/test.log

output.kafka:
  enabled: true
  hosts: ["192.168.20.201:9092", "192.168.20.203:9092", "192.168.20.56:9092"]
  topic: 'test-log'
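
With the broker setting auto.create.topics.enable left at its default (true), the topic is created automatically the first time filebeat publishes to it; alternatively, it can be pre-created with explicit settings. A sketch (the partition and replication counts here are arbitrary assumptions):

# Run on any Kafka node; adjust partitions/replication to your cluster
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --partitions 3 --replication-factor 2 --topic test-log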

Restart the filebeat service, then try appending some test data to /tmp/test.log. On any node in the Kafka cluster, list the topics and consume the data.
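
How the restart is done depends on the install; if filebeat runs as a systemd service, for example (an assumption, not shown in the original setup):

systemctl restart filebeat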

# Simulate log generation on the filebeat client:
echo "111" >>/tmp/test.log

# On any Kafka cluster node, check whether the corresponding topic was created:
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test-log

# Try consuming the data under this topic
# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.20.203:9092 --topic test-log --from-beginning
{"@timestamp":"2018-06-03T06:15:35.800Z"
,"@metadata":{"beat":"filebeat","type":"doc","version":"6.2.3","topic":"test-log"},"message":"111","prospector":{"type":"log"},"beat":{"hostname":"ELK","version":"6.2.3","name":"192.168.20.60"},"source":"/tmp/test.log","offset":379} #如果不指定codec,預設為json格式

This completes the filebeat-to-Kafka output configuration; for detailed usage and parameters, see the official documentation.

Logstash reads data from Kafka and outputs it to ES:

First configure the output to stdout:

# cat kafka.conf 
input {
    kafka {
        codec => "json"
        bootstrap_servers => "192.168.10.201:9092,192.168.20.203:9092,192.168.20.56:9092"
        topics => "test-log"
        consumer_threads => 2
    }
}
output {
    stdout { codec => rubydebug }
}
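
If several logstash instances should share the consumption work, they can join the same consumer group, and Kafka will split the topic's partitions among them. A sketch of the extra setting (the group name is an assumption; the plugin's group_id defaults to "logstash"):

input {
    kafka {
        codec => "json"
        bootstrap_servers => "192.168.20.201:9092,192.168.20.203:9092,192.168.20.56:9092"
        topics => "test-log"
        group_id => "elk-logstash"    # consumers sharing a group_id split the partitions
        consumer_threads => 2
    }
}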

Check the configuration syntax; if it passes, start logstash:

# Check the syntax
/usr/local/elk/logstash-6.2.3/bin/logstash -f kafka.conf -t

# Start logstash
/usr/local/elk/logstash-6.2.3/bin/logstash -f kafka.conf --config.reload.automatic

Again, simulate appending a line to the log file and check that the logstash output looks correct:

# Simulate log generation on the filebeat client:
echo "222" >>/tmp/test.log

# Output on the logstash side:
...
{
        "offset" => 383,
      "@version" => "1",
    "prospector" => {
        "type" => "log"
    },
          "beat" => {
            "name" => "192.168.20.60",
        "hostname" => "ELK",
         "version" => "6.2.3"
    },
       "message" => "222",
    "@timestamp" => 2018-06-03T06:27:55.820Z,
        "source" => "/tmp/test.log"
}

At this point the whole pipeline works end to end, and the Kafka cluster has been successfully integrated into the ELK platform. For more on the kafka input plugin, see the official documentation.
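
The stdout output above is only for verification; to finish the pipeline, point the output at Elasticsearch instead. A minimal sketch (the ES address and index naming scheme are assumptions):

output {
    elasticsearch {
        hosts => ["192.168.20.60:9200"]
        index => "test-log-%{+YYYY.MM.dd}"    # one index per day
    }
}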

Have filebeat send different logs to different Kafka topics:

Digging a little deeper: how do we route different logs to different Kafka topics?
For versions 6.0 and later, you can attach custom fields to each prospector and then reference the corresponding value with the %{[fields][...]} syntax.

The filebeat configuration file looks like this:

filebeat.prospectors:
- type: log
  enabled: true
  paths:
    - /tmp/access.log
  fields:
    log_topics: apache-access

- type: log
  enabled: true
  paths:
    - /tmp/error.log
  fields:
    log_topics: apache-error

output.kafka:
  enabled: true
  hosts: ["192.168.20.201:9092", "192.168.20.203:9092", "192.168.20.56:9092"]
  #topic: 'test-log'
  topic: '%{[fields][log_topics]}'
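
To verify the routing, append a line to each file on the filebeat host and confirm that both topics appear in Kafka:

echo "access test" >> /tmp/access.log
echo "error test" >> /tmp/error.log

# On any Kafka node:
./bin/kafka-topics.sh --list --zookeeper localhost:2181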

Correspondingly, on the logstash side, to parse each topic separately:

input {
    kafka {
        codec => "json"
        bootstrap_servers => "192.168.10.201:9092,192.168.20.203:9092,192.168.20.56:9092"
        topics => ["apache-access","apache-error"]
        consumer_threads => 2
    }
}
filter {
    if[fields][log_topics] == "apache-access" {
        grok {...}
    }
    if[fields][log_topics] == "apache-error" {
        grok {...}
    }
}
output {
    stdout { codec => rubydebug }
}
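
The grok bodies are elided above; as one possible way to fill them in for Apache logs, the stock grok patterns could be used, and the topic name can also be reused to route documents into per-topic indices. A sketch (the patterns and index naming are assumptions, not the author's actual filters):

filter {
    if [fields][log_topics] == "apache-access" {
        # COMBINEDAPACHELOG is a stock pattern for Apache's combined log format
        grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }
    }
    if [fields][log_topics] == "apache-error" {
        # HTTPD_ERRORLOG also ships with the standard pattern set
        grok { match => { "message" => "%{HTTPD_ERRORLOG}" } }
    }
}
output {
    elasticsearch {
        hosts => ["192.168.20.60:9200"]
        index => "%{[fields][log_topics]}-%{+YYYY.MM.dd}"    # e.g. apache-access-2018.06.03
    }
}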