Filebeat+Kafka+Logstash+ElasticSearch+Kibana 日誌采集方案
前言
Elastic Stack 提供 Beats 和 Logstash 套件來采集任何來源、任何格式的數據。其實Beats 和 Logstash的功能差不多,都能夠與 Elasticsearch 產生協同作用,而且
logstash比filebeat功能更強大一點,2個都使用是因為:Beats 是一個輕量級的采集器,支持從邊緣機器向 Logstash 和 Elasticsearch 發送數據。考慮到 Logstash 占用系
統資源較多,我們采用 Filebeat 來作為我們的日誌采集器。並且這裏采用kafka作為傳輸方式是為了避免堵塞和丟失,以實現日誌的實時更新。
介紹
1.Filebeat
filebat
是一個用於轉發和集中日誌數據的輕量級shipper
。作為代理安裝在服務器上,filebeat
監視指定的日誌文件或位置,收集日誌事件,並將它們轉發給ElasticSearch
或logstash
進行索引。
2.Logstash:Logstash 是開源的服務器端數據處理管道,能夠同時從多個來源采集數據,轉換數據,然後將數據發送到存儲庫。
3.ElasticSearch:Elasticsearch 是基於 JSON 的分布式搜索和分析引擎,專為實現水平擴展、高可靠性和管理便捷性而設計。
4.Kibana:Kibana 能夠以圖表的形式呈現數據,並且具有可擴展的用戶界面,供您全方位配置和管理 Elastic Stack。
Elastic Stack架構
軟件環境
Ubuntu 18.04.1
Elastic Stack官網:https://www.elastic.co/
kafka、Zookeeper安裝
kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者規模的網站中的所有動作流數據。
ZooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,是Google的Chubby一個開源的實現,是Hadoop和Hbase的重要組件。它是一個為分布式應用
提供一致性服務的軟件,提供的功能包括:配置維護、域名服務、分布式同步、組服務等。
註:安裝kafka時會發現它需要安裝Zookeeper的。kafka
Zookeeper單機安裝配置:
Step 1:下載壓縮包並解壓
1 >wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz 2 >tar -zxvf zookeeper-3.4.13.tar.gz 3 >cd zookeeper-3.4.13
Step 2:修改配置文件
先復制模板配置文件,並重命名,然後裏面存放數據的路徑dataDir可以自己定義
1 >cp -rf conf/zoo_sample.cfg conf/zoo.cfg 2 >vim zoo.cfg
Step 3:啟動服務
1 >./bin/zkServer.sh start 2 3 >./bin/zkServer.sh status
先啟動,系統會默認加載zoo.cfg配置,然後使用 zkServer.sh status 查看服務狀態
kafka安裝配置:
根據官網教程開始安裝(復制的官網教程):
Step 1: Download the code
1 > wget http://mirror.apache-kr.org/kafka/2.1.0/kafka_2.11-2.1.0.tgz 2 > tar -xzf kafka_2.11-2.1.0.tgz 3 > cd kafka_2.11-2.1.0/
Step 2: Start the server
Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don‘t already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.(由於我自己安裝的Zookeeper,所以這一步可以省略)
1 > ./bin/zookeeper-server-start.sh config/zookeeper.properties 2 [2019-01-23 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) 3 ...
Now start the Kafka server:
首先可以先修改一下配置文件server.propertise
1 # master 2 broker.id=0 3 listeners=PLAINTEXT://9092 4 advertised.listeners=PLAINTEXT://localhost:9092 5 num.network.threads=3 6 num.io.threads=8 7 socket.send.buffer.bytes=102400 8 socket.receive.buffer.bytes=102400 9 socket.request.max.bytes=104857600 10 log.dirs=/tmp/kafka-logs 11 num.partitions=5 12 num.recovery.threads.per.data.dir=1 13 offsets.topic.replication.factor=1 14 transaction.state.log.replication.factor=1 15 transaction.state.log.min.isr=1 16 log.retention.hours=168 17 log.segment.bytes=1073741824 18 log.retention.check.interval.ms=300000 19 # 連接 20 zookeeper.connect=localhost:2181 21 zookeeper.connection.timeout.ms=6000 22 group.initial.rebalance.delay.ms=0 23 # 可刪除topic 24 delete.topic.enable=true
1 > ./bin/kafka-server-start.sh config/server.properties 2 [2019-01-23 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) 3 [2019-01-23 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) 4 ...
到這裏服務就啟動成功了。
查看java進程pid也可以發現kafka
Step 3: Create a topic
Let‘s create a topic named "test" with a single partition and only one replica:
1 > ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
if you want delete a topic:
1 >./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
We can now see that topic if we run the list topic command:
1 > ./bin/kafka-topics.sh --list --zookeeper localhost:2181 2 test
Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.
在上一次配置的日子目錄(server.properties裏配置的log.dirs)上可以看到,已經存在了該主題的日誌信息文件了,由於server.properties裏面配置的num.partitions=1(默認的分區數,一個topic默認1個分區數)。所以只有一個文件夾
要查看topic詳細信息的話:
1 >bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Step 4: Star a producer(創建發布者)
1 > ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Step 5: Start a consumer(創建訂閱者)
1 > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic test --from-beginning
創建好後,可以在發布者端發送消息進行測試,我們這裏的發布者是filebeat,訂閱者是logstash,可以不用配置。
Filebeat安裝配置
beat是一個輕量級的數據傳輸者,它有好幾種分類,這裏暫時只用到Filebeat
Filebeat的處理流程基本分為3部分:Input、Filter、Output。
開始安裝
1.官網下載 :filebeat-6.5.4-linux-x86_64.tar.gz,復制到LINUX服務器,然後解壓( tar -xzvf filebeat-6.5.4-linux-x86_64.tar.gz)
2.進入filebeat-6.5.4-linux-x86_64文件夾,修改filebeat.yml,filebeat默認是將數據傳輸到elasticsearch,需要修改為kafka
這裏放一個filebeat詳細配置參考:https://www.cnblogs.com/kuku0223/p/8316922.html
配置完成後啟動filebeat:
1 > sudo ./filebeat -e -c filebeat.yml
ELK(elasticsearch+logstash+kibana 6.5.4)配置
elasticsearch安裝配置
Elasticsearch 是一個分布式、RESTful 風格的搜索和數據分析引擎,能夠解決不斷湧現出的各種用例。
順帶提一下Elasticsearch的數據結構:FST(Finite StateTransducers),其特點高度重復利用數據前綴和後綴存儲數據,能很大層度上壓縮數據。
elasticsearch安裝配置:
從官網上下載軟件:https://www.elastic.co/cn/downloads/elasticsearch
解壓之後,修改config/elasticsearch.yml配置文件,配置一下ip和端口號:
1 skyworth@skyworth-ubuntu:~$ cd /home/tools/elk/elasticsearch-6.5.4/ 2 skyworth@skyworth-ubuntu:/home/tools/elk/elasticsearch-6.5.4$ vim config/elasticsearch.yml
然後啟動命令:
1 skyworth@skyworth-ubuntu:/home/tools/elk/elasticsearch-6.5.4$ ./bin/elasticsearch
然後瀏覽器打開端口(http://192.168.80.15:9200/),出現如下信息說明啟動成功
kibana安裝配置
通過 Kibana,能夠對 Elasticsearch 中的數據進行可視化並在 Elastic Stack 進行操作.
kibana安裝配置:
首先下載軟件,然後解壓,修改配置文件config/kibana.yml,再啟動就行,與elasticsearch類似。
1 skyworth@skyworth-ubuntu:/home/tools/elk/kibana-6.5.4-linux-x86_64$ vim config/kibana.yml
1 skyworth@skyworth-ubuntu:/home/tools/elk/kibana-6.5.4-linux-x86_64$ ./bin/kibana
logstash安裝配置
Logstash 是開源的服務器端數據處理管道,能夠同時從多個來源采集數據,轉換數據,然後將數據發送到存儲庫中。
安裝過程依然是下載壓縮包,解壓後修改配置文件,再啟動。
進入config文件夾,我們可以修改原有的logstash-sample.conf,或者自己新建一個(這裏選擇自己新建一個logstash-skyworth.conf)。
修改配置文件:
input{ kafka{ bootstrap_servers => ["192.168.80.15:9092"] client_id => "test" group_id => "test" auto_offset_reset => "latest" //從最新的偏移量開始消費 consumer_threads => 5 decorate_events => true //此屬性會將當前topic、offset、group、partition等信息也帶到message中 topics => ["test"] type => "skyworth" } } filter { if ([message]== "") { drop {} } } output { if [type] == "skyworth"{ elasticsearch { hosts => ["192.168.80.15:9200"] # ElasticSearch的地址加端口 index => "skyworth-log-%{+YYYYMMdd}" # ElasticSearch的保存文檔的index名稱, } } }
啟動命令:
1 >./bin/logstash -f config/logstash-skyworth.conf
總結
業務流程偷了張圖如下:
註:有的人會在filebeat和kafka之間再加一層前置的logstash用於格式化日誌,目前我也沒明白為什麽要那樣做。
Filebeat+Kafka+Logstash+ElasticSearch+Kibana 日誌采集方案