1. 程式人生 > >萬字長文:ELK(V7)部署與架構分析

萬字長文:ELK(V7)部署與架構分析

ELK(7版本)部署與架構分析

1、ELK的背景介紹與應用場景

  在專案應用執行的過程中,往往會產生大量的日誌,我們往往需要根據日誌來定位分析我們的伺服器專案執行情況與BUG產生位置。一般情況下直接在日誌檔案中tailf、 grep、awk 就可以獲得自己想要的資訊。但在規模較大的場景中,此方法效率低下,面臨問題包括日誌量過大、文字搜尋太慢、如何多維度查詢。這就需要對伺服器上的日誌收集彙總。常見解決思路是建立集中式日誌收集系統,將所有節點上的日誌統一收集,管理,訪問。

  一般大型系統往往是一種分散式部署的架構,不同的服務模組部署在不同的伺服器上,問題出現時,大部分情況需要根據問題暴露的關鍵資訊,定位到具體的伺服器和服務模組,所以構建一套集中式日誌系統,可以提高定位問題的效率。 一個完整的集中式日誌系統,需要包含以下幾個主要特點:

  • 收集-能夠採集多種來源的日誌資料,服務日誌與系統日誌。
  • 傳輸-能夠穩定的把日誌資料傳輸到中央系統
  • 儲存-如何儲存日誌資料,持久化資料。
  • 分析-可以支援 UI 分析,介面化定製檢視日誌操作。

 ELK提供了一整套解決方案,並且都是開源軟體,之間互相配合使用,完美銜接,高效的滿足了很多場合的應用。是目前主流的一種日誌系統。

 

2、ELK簡介:

ELK是三個開源軟體的縮寫,分別表示:Elasticsearch , Logstash, Kibana , 它們都是開源軟體。其中後來新增了一個FileBeat。

即ELK主要由Elasticsearch(搜尋)、Logstash(收集與分析)和Kibana(展示)三部分元件組成;

 

其中各元件說明如下:

Filebeat:輕量級資料收集引擎。早期的ELK架構中使用Logstash收集、解析日誌,但是Logstash對記憶體、cpu、io等資源消耗比較高。如果用它來對伺服器進行日誌收集,將加重伺服器的負載。相比 Logstash,Beats所佔系統的CPU和記憶體幾乎可以忽略不計,所以filebeat作為一個輕量級的日誌收集處理工具(Agent),它可以用來替代Logstash,由於其佔用資源少,所以更適合於在各個伺服器上搜集日誌後傳輸給Logstash,這也是官方推薦的一種做法。【收集日誌】


Logstash:資料收集處理引擎。支援動態的從各種資料來源蒐集資料,並對資料進行過濾、分析、豐富、統一格式等操作,然後儲存以供後續使用。【對日誌進行過濾、分析】


Elasticsearch:分散式搜尋引擎。是基於Lucene的開源分散式搜尋伺服器,具有高可伸縮、高可靠、易管理等特點。可以用於全文檢索、結構化檢索和分析,並能將這三者結合起來。【蒐集、分析、儲存資料】

Kibana:視覺化平臺。它能夠搜尋、展示儲存在 Elasticsearch 中索引資料。使用它可以很方便的用圖表、表格、地圖展示和分析資料。【圖形化展示日誌】

結合以上,常見的日誌系統架構圖如下:

 

如上圖所示,日誌檔案分別由filebeat在伺服器上進行收集,收集的日誌檔案彙總到logstash上並對檔案資料進行過濾、分析、豐富、統一格式等操作,然後傳送到Elasticsearch,進一步對日誌進行結構化檢索和分析,並存儲下來,最後由kibana進行展示。

這只是日誌系統的一種最初級結構,生產環境中需要對此結構進行進一步的優化。

 

3、ELK系統的部署(配置將在生產環境架構中進行說明)

環境:CentOS7.5部署ELK 7版本

準備工作: CentOS7.5    

elasticsearch7, logstash7,  kibana7

關閉防火牆和SELinux並更新yum源(非必須):yum -y update

本次分散式部署的ELK版本為2019年五月份左右最新發布的版本,更新了許多新特性,後面將詳細說明。在安裝上面本次極大簡化了安裝步驟,可以原始碼,元件安裝,本次採用YUM+外掛docker安裝,能達到相同的效果。

3.1 Filebeat安裝與配置

 

#!/bin/bash
# 安裝Filebeat
rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

cat >/etc/yum.repos.d/elk-elasticsearch.repo<<EOF
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF

yum -y install filebeat

systemctl enable filebeat
systemctl start filebeat

3.2 elasticsearch,Logstash,Kibana和Filebeat的安裝(分開部署請分別安裝三個元件)

1. java環境(7版本自帶Java)

yum -y install java

2. 匯入elasticsearch PGP key檔案
rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

3. 配置yum源
vim /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

4. yum -y install elasticsearch logstash kibana

##########關於安裝環境,也可以參照以下二種方式,也比較簡便#################

----即通過RPM包的方式,此處給出Filebeat的方式,其他元件雷同

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.0.0-x86_64.rpm 

sudo rpm -vi filebeat-7.0.0-x86_64.rpm

 

----Logstash例項另一種方式(優先採用)

wget https://artifacts.elastic.co/downloads/logstash/logstash-7.0.0.rpm
yum install -y java
yum install -y logstash-7.0.0.rpm

 

##服務啟動前必看,由於配置檔案在下一章節給出,此處是前提準備,等配置完成後再啟動##

3.3 關於elasticsearch需要重點關注的地方

1.elasticsearch在分詞方面,需要新增中文分詞的外掛

在其安裝程式碼的plugins目錄,即/usr/share/elasticsearch/plugins,需要增加中文分詞外掛。

[root@192-168-108-35 plugins]# ll
total 3192
[root@192-168-108-35 plugins]# pwd
/usr/share/elasticsearch/plugins
[root@192-168-108-35 plugins]# wget https://codeload.github.com/medcl/elasticsearch-analysis-ik/tar.gz/v7.0.0

下載後重啟時需要刪除外掛原始檔,否則報錯如下:

Caused by: java.nio.file.FileSystemException: /usr/share/elasticsearch/plugins/v7.0.0/plugin-descriptor.properties: Not a directory
所以安裝完成後必須刪除原始檔v7.0.0

2.elasticsearch服務的啟動問題與前提準備

-- 首先在安裝完畢後會生成很多檔案,包括配置檔案日誌檔案等等,下面幾個是最主要的配置檔案路徑
/etc/elasticsearch/elasticsearch.yml # els的配置檔案
/etc/elasticsearch/jvm.options # JVM相關的配置,記憶體大小等等
/etc/elasticsearch/log4j2.properties # 日誌系統定義

/usr/share/elasticsearch # elasticsearch 預設安裝目錄
/var/lib/elasticsearch # 資料的預設存放位置

 

-- 建立用於存放資料與日誌的目錄
資料檔案會隨著系統的執行飛速增長,所以預設的日誌檔案與資料檔案的路徑不能滿足我們的需求,那麼手動建立日誌與資料檔案路徑。

mkdir -p /data/elkdata
mkdir -p /data/elklogs

-- JVM配置 (7.0版本針對此做出優化,可以基本保障溢位問題,但最好設定一下)
由於Elasticsearch是Java開發的,所以可以通過/etc/elasticsearch/jvm.options配置檔案來設定JVM的相關設定。如果沒有特殊需求按預設即可。
不過其中還是有兩項最重要的-Xmx1g與-Xms1gJVM的最大最小記憶體。如果太小會導致Elasticsearch剛剛啟動就立刻停止。太大會拖慢系統本身。
vim /etc/elasticsearch/jvm.options      # JVM最大、最小使用記憶體

-Xms1g                        

-Xmx1g

 

-- 使用ROOT賬戶執行命令
elasticsearch的相關配置已經完成,下面需要啟動elasticsearch叢集。但是由於安全的考慮,elasticsearch不允許使用root使用者來啟動,所以需要建立一個新的使用者,併為這個賬戶賦予相應的許可權來啟動elasticsearch叢集。

建立ES執行使用者
# 建立使用者組        group add elk

# 建立使用者並新增至使用者組   useradd elk -g elk

# (可選)更改使用者密碼       

passwd elasticsearch

 

同時修改ES目錄許可權, 以下操作都是為了賦予es使用者操作許可權,

-- 安裝原始碼檔案目錄
[root@192-168-108-35 share]# chown -R elk :elk /usr/share/elasticsearch/
[root@192-168-108-35 share]# chown -R elk :elk /var/log/elasticsearch/
[root@192-168-108-35 share]# chown -R elk :elk /data         

-- 執行常見的報錯資訊

[1]檔案數目不足
# 修改系統配置檔案屬性

# vim /etc/security/limits.conf 新增 

elk soft memlock unlimited

elk hard memlock unlimited

elk soft nofile 65536

elk hard nofile 131072
退出使用者重新登入,使配置生效

 

[2]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
解決辦法:
在   /etc/sysctl.conf檔案最後新增一行
vm.max_map_count=262144
即可永久修改

 

3. 啟動elasticsearch服務

前臺啟動服務
# 需切換為es使用者

su elk

# 啟動服務(當前的路徑為:/usr/share/elasticsearch/)

./bin/elasticsearch

後臺執行ES
可以加入-p 命令 讓es在後臺執行, -p 引數 記錄程序ID為一個檔案
# 設定後臺啟動
./bin/elasticsearch -p ./elasticsearch-pid -d

一般情況下,直接執行一下命令即可

./bin/elasticsearch -d

 

結束程序
# 檢視執行的pid,並查殺 cat /tmp/elasticsearch-pid && echo

kill -SIGTERM {pid}

# 暴力結束程序(另一種方式)
kill -9 `ps -ef |grep elasticsearch|awk '{print $2}' `


驗證一下服務是否正常
curl -i "http://192.168.60.200:9200"

4.安裝elasticsearch-head外掛(支援前端介面檢視資料)
安裝docker映象或者通過github下載elasticsearch-head專案都是可以的,1或者2兩種方式選擇一種安裝使用即可

【1】使用docker的整合好的elasticsearch-head
   # docker run -p 9100:9100 mobz/elasticsearch-head:5
   docker容器下載成功並啟動以後,執行瀏覽器開啟http://localhost:9100/

 

 

【2】使用git安裝elasticsearch-head
   # yum install -y npm
   # git clone git://github.com/mobz/elasticsearch-head.git
   # cd elasticsearch-head
   # npm install
   # npm run start
   檢查埠是否起來
   netstat -antp |grep 9100
   瀏覽器訪問測試是否正常
   http://IP:9100/

【注意】由於elasticsearch-head:5映象對elasticsearch的7版本適配性未經檢測。這裡也推薦推薦另外一個映象lmenezes/cerebro,下載後執行

docker run -d -p 9000:9000 lmenezes/cerebro

就可以在9000埠查看了。

 

 

最後,filebeat logstash kibana可以在配置檔案後,正常啟動,如果需要切換使用者的話,也可以參照上面。本次這三個元件全部預設用root使用者啟動。由於分開部署,與ES互不影響。

附:kafka三節點叢集搭建

環境準備

1.zookeeper叢集環境,本次採用Kafka自帶的Zookeeper環境。
kafka是依賴於zookeeper註冊中心的一款分散式訊息對列,所以需要有zookeeper單機或者叢集環境。

2.三臺伺服器:
192.168.108.200 ELK-kafka-cluster 

192.168.108.165 ELK-kafka-salve1 

192.168.108.103 ELK-kafka-salve2

3.下載kafka安裝包

在 http://kafka.apache.org/downloads 中下載,目前最新版本的kafka已經到2.2.0,這裡下載的是kafka_2.11-2.2.0.tgz

安裝kafka叢集


1.上傳壓縮包到三臺伺服器解壓縮到/opt/目錄下

tar -zxvf kafka_2.11-2.2.0.tgz -C /opt/

ls -s kafka_2.11-2.2.0 kafka

 

2.修改 server.properties   (/opt/kafka/config目錄下)

############################# Server Basics #############################

broker.id=0

######################## Socket Server Settings ########################

listeners=PLAINTEXT://192.168.108.200:9092
advertised.listeners=PLAINTEXT://192.168.108.200:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

############################# Log Basics #############################

log.dirs=/var/log/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

######################## Internal Topic Settings #########################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

######################### Log Retention Policy ########################

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

zookeeper.connect=192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
delete.topic.enable=true

######################## Group Coordinator Settings ##########################

group.initial.rebalance.delay.ms=0

 

3.拷貝兩份到192.168.108.165和192.168.108.103 

[[email protected] config]# cat server.properties

broker.id=1

listeners=PLAINTEXT://192.168.108.165:9092 

advertised.listeners=PLAINTEXT://192.168.108.165:9092 

[root@k8s-n3 config]# cat server.properties

broker.id=2

listeners=PLAINTEXT://192.168.108.103:9092 

advertised.listeners=PLAINTEXT://192.168.108.103:9092

 

然後新增環境變數 在/etc/profile 中新增
export ZOOKEEPER_HOME=/opt/kafka

export PATH=$PATH:$ZOOKEEPER_HOME/bin
source /etc/profile 過載生效

 

4.修改zookeeper.properties:
1、設定連線引數,新增如下配置
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5

2、設定broker Id的服務地址
server.0=192.168.108.200:2888:3888
server.1=192.168.108.165:2888:3888
server.2=192.168.108.103:2888:3888

 

總結就在Kafka原始碼目錄的/kafka/config/zookeeper.properties檔案設定如下

  • dataDir=/opt/zookeeper                         # 這時需要在/opt/zookeeper資料夾下,新建myid檔案,把broker.id填寫進去,本次本節點為0。
  • # the port at which the clients will connect
  • clientPort=2181
  • # disable the per-ip limit on the number of connections since this is a non-production config
  • maxClientCnxns=100
  • tickTime=2000
  • initLimit=10
  • syncLimit=5
  • server.0=192.168.108.200:2888:3888
  • server.1=192.168.108.165:2888:3888
  • server.2=192.168.108.103:2888:3888

然後在zookeeper資料目錄新增id配置(dataDir=/opt/zookeeper)
在zookeeper.properties的資料目錄中建立myid檔案
在各臺服務的zookeeper資料目錄新增myid檔案,寫入服務broker.id屬性值,如這裡的目錄是/usr/local/zookeeper/data
第一臺broker.id為0的服務到該目錄下執行:echo 0 > myid

[root@192-168-108-165 kafka]# cd /opt/zookeeper/

[root@192-168-108-165 zookeeper]# ls

myid  version-2

[root@192-168-108-165 zookeeper]# cat myid

1

[root@192-168-108-165 zookeeper]#

 

叢集啟動

叢集啟動:cd /opt/kafka
先分別啟動zookeeper
kafka使用到了zookeeper,因此你要首先啟動一個zookeeper服務,如果你沒有zookeeper服務。kafka中打包好了一個簡潔版的單節點zookeeper例項。
kafka啟動時先啟動zookeeper,再啟動kafka;關閉時相反,先關閉kafka,再關閉zookeeper

前臺啟動:bin/zookeeper-server-start.sh config/zookeeper.properties

[root@192-168-108-200 ~]# lsof -i:2181

COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME

java 5197 root 98u IPv6 527171 0t0 TCP *:eforward (LISTEN)
可以看到已經啟動
後臺啟動:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

 

再分別啟動kafka

前臺啟動:bin/kafka-server-start.sh  config/server.properties
[root@192-168-108-200 ~]# lsof -i:9092
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 5519 root 157u IPv6 528511 0t0 TCP 192-168-108-200:XmlIpcRegSvc (LISTEN)
java 5519 root 167u IPv6 528515 0t0 TCP 192-168-108-200:XmlIpcRegSvc->192.168.108.191:56656 (ESTABLISHED)
java 5519 root 169u IPv6 528516 0t0 TCP 192-168-108-200:XmlIpcRegSvc->192.168.108.187:49368 (ESTABLISHED)
java 5519 root 176u IPv6 528096 0t0 TCP 192-168-108-200:48062->192-168-108-200:XmlIpcRegSvc (ESTABLISHED)
java 5519 root 177u IPv6 528518 0t0 TCP 192-168-108-200:XmlIpcRegSvc->192-168-108-200:48062 (ESTABLISHED)
java 5519 root 178u IPv6 528097 0t0 TCP 192-168-108-200:XmlIpcRegSvc->192.168.108.187:49370 (ESTABLISHED)

後臺啟動:bin/kafka-server-start.sh  -daemon config/server.properties

 

服務啟動指令碼

cd /opt/kafka
kill -9 `ps -ef |grep kafka|awk '{print $2}' `
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh  -daemon config/server.properties

 

Zookeeper+Kafka叢集測試


/opt/kafka/bin
1.建立topic:
kafka-topics.sh --create --zookeeper 192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181 --replication-factor 3 --partitions 3 --topic test

2.顯示topic
kafka-topics.sh --describe --zookeeper 192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181 --topic test
[root@192-168-108-200 bin]# kafka-topics.sh --describe --zookeeper 192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181 --topic tyun
Topic:tyun PartitionCount:3 ReplicationFactor:3 Configs:
Topic: tyun Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: tyun Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: tyun Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1


PartitionCount:partition個數
  ReplicationFactor:副本個數
  Partition:partition編號,從0開始遞增
  Leader:當前partition起作用的broker.id
  Replicas: 當前副本資料所在的broker.id,是一個列表,排在最前面的其作用
  Isr:當前kakfa叢集中可用的broker.id列表

 

 

3.列出topic
kafka-topics.sh --list --zookeeper 192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181
test

建立 producer(生產者);
kafka-console-producer.sh --broker-list 192.168.108.200:9092 --topic test
hello


建立 consumer(消費者)
kafka-console-consumer.sh --bootstrap-server 192.168.108.200:9092 --topic test --from-beginning
hello

4.檢視寫入kafka叢集中的訊息(重要命令,判斷日誌是否寫入Kafka的重要依據)

bin/kafka-console-consumer.sh --bootstrap-server 192.168.108.200:9092 --topic tiops --from-beginning

 

5.刪除 Topic----命令標記刪除後,再次刪除對應的資料目錄
bin/kafka-topics.sh --delete --zookeeper master:2181,slave1:2181,slave2:2181 --topic topic_name
若 delete.topic.enable=true
直接徹底刪除該 Topic。

若 delete.topic.enable=false
如果當前 Topic 沒有使用過即沒有傳輸過資訊:可以徹底刪除。

如果當前 Topic 有使用過即有過傳輸過資訊:並沒有真正刪除 Topic 只是把這個 Topic 標記為刪除(marked for deletion),重啟 Kafka Server 後刪除。
注:delete.topic.enable=true 配置資訊位於配置檔案 config/server.properties 中(較新的版本中無顯式配置,預設為 true)。

 

 

4. 日誌系統架構解析

 

 

整個系統一共含有10臺主機(filebeat部署在客戶端,不計算在內),其中Logstash有四臺,Elasticsearch有二臺,Kafka叢集三臺,kibana一臺並配置Nginx代理。

架構解釋:

(1)首先使用者通過nginx代理訪問ELK日誌統計平臺,這裡的Nginx可以設定介面密碼。
(2)Nginx將請求轉發到kibana
(3)kibana到Elasticsearch中去獲取資料,這裡的Elasticsearch是兩臺做的叢集,日誌資料會隨機儲存在任意一臺Elasticsearch伺服器。
(4)Logstash1從Kafka中取出資料併發送到Elasticsearch中。
(5)Kafka伺服器做日誌資料的持久化儲存,避免web伺服器日誌量過大的時候造成的資料收集與儲存不一致而導致日誌丟失,其中Kafka可以做叢集,然後再由Logstash伺服器從Kafka持續的取出資料。
(6)logstash2從Filebeat取出的日誌資訊,並放入Kafka中進行儲存。
(7)Filebeat在客戶端進行日誌的收集。

 

注1:【Kafka的加入原因與作用】

整個架構加入Kafka,是為了讓整個系統更好的分層,Kafka作為一個訊息流處理與持久化儲存軟體,能夠幫助我們在主節點上遮蔽掉多個從節點之間不同日誌檔案的差異,負責管理日誌端(從節點)的人可以專注於向 Kafka裡生產資料,而負責資料分析聚合端的人則可以專注於從 Kafka內消費資料。所以部署時要把Kafka加進去。

而且使用Kafka進行日誌傳輸的原因還在於其有資料快取的能力,並且它的資料可重複消費,Kafka本身具有高可用性,能夠很好的防止資料丟失,它的吞吐量相對來說比較好並且使用廣泛。可以有效防止日誌丟失和防止logsthash掛掉。綜合來說:它均衡了網路傳輸,從而降低了網路閉塞,尤其是丟失資料的可能性,

注2:【雙層的Logstash作用】

這裡為什麼要在Kafka前面增加二臺logstash呢?是因為在大量的日誌資料寫入時,容易導致資料的丟失和混亂,為了解決這一問題,增加二臺logstash可以通過型別進行彙總分類,降低資料傳輸的臃腫。

如果只有一層的Logstash,它將處理來自不同客戶端Filebeat收集的日誌資訊彙總,並且進行處理分析,在一定程度上會造成在大規模日誌資料下資訊的處理混亂,並嚴重加深負載,所以有二層的結構進行負載均衡處理,並且職責分工,一層匯聚簡單分流,一層分析過濾處理資訊,並且內層都有二臺Logstash來保障服務的高可用性,以此提升整個架構的穩定性。

接下來分別說明原理與各個元件之間的互動(配置檔案)。

1】Filebeat與Logstash-collect連線配置

這裡為了方便記憶,把此處的Logstash稱為Logstash-collect,首先看Filebeat的配置檔案:

其中,filebeat配置輸出到logstash:5044埠,在logstash上啟動5044埠作為logstash與filebeat的通訊agent,而logstash本身服務起在9600埠

[root@192-168-108-191 logstash]# ss -lnt
State Recv-Q Send-Q Local Address:Port Peer Address:Port
LISTEN 0 128 :::5044 :::*
LISTEN 0 50 ::ffff:192.168.108.191:9600 :::*

 

vim /etc/filebeat/filebeat.yml    # 對幾個重要配置進行設定如下

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
     - /var/log/*.log
     - /var/log/tiops/**/*.log      # tiops平臺的所有日誌位置, 指定資料的輸入路徑為/tiops/**/*.log結尾的所有檔案,注意/tiops/子目錄下的日誌不會被讀取,孫子目錄下的日誌可以
     #- c:\programdata\elasticsearch\logs\*
   fields:
     service: filebeat
   multiline: # 多行日誌合併為一行,適用於日誌中每一條日誌佔據多行的情況,比如各種語言的報錯資訊呼叫棧。
     pattern: ‘^\[‘
     negate: true
     match: after

# Exclude lines. A list of regular expressions to match. It drops the lines that are
# 刪除以DBG開頭的行:
   exclude_lines: ['^DBG']

# filebeat的日誌可傳送到logstashelasticsearchkibana,選擇一個即可,本次預設選擇logstash,其他二個可以註釋
  output.logstash:
   # The Logstash hosts
     hosts: ["192.168.108.191:5044", "192.168.108.87:5044"]       # 發往二臺Logstash-collect
     loadbalance: true
     worker: 2

  #output.elasticsearch:
     #hosts: ["http://192.168.108.35:9200"]
     #username: "elk"
     #password: ""

  #setup.kibana:
     #host: "192.168.108.182:5601"

 

注1:【FilebeatLogstash-collect的日誌訊息流轉】

注意事項:Logstash-collect怎麼接收到Filebeat傳送的訊息,並再次發向下一級呢?

不僅僅配置發往二臺Logstash-collect的ip地址加埠,還要注意fields欄位,可以理解為是一個Key,當Logstash-collect收到多個Key時,它可以選擇其中一個或者多個Key來進行下一級傳送,達到日誌訊息的轉發。

在使用了6.2.3版本的ELK以後,如果使用if [type]配置,則匹配不到在filebeat裡面使用document_type定義的字串。因為6.0版本以上已經取消了document_type的定義。如果要實現以上的配置只能使用如下配置:

fields:
     service: filebeat

service : filebeat都是自己定義的,定義完成後使用Logstash的if 判斷,條件為if [fields][service] == "filebeat".就可以了,具體可以看下面的轉發策略。

 

注2:【Filebeat負載平衡主機的輸出】

這裡還有一點需要說明一下,就是關於Filebeat與Logstash-collect連線的負載均衡設定。

logstash是一個無狀態的流處理軟體。logstash怎麼叢集配置只能橫向擴充套件,然後自己用配置管理工具分發,因為他們內部並沒有交流的。

Filebeat提供配置選項,可以使用它來調整負載平衡時傳送訊息到多個主機。要啟用負載均衡,您指定loadbalance的值為true。

output.logstash:
   # The Logstash hosts
     hosts: ["192.168.108.191:5044", "192.168.108.87:5044"]       # 發往二臺Logstash-collect
     loadbalance: true
     worker: 2

loadbalance: false  # 訊息只是往一個logstash裡發,如果這個logstash掛了,就會自動將資料發到另一個logstash中。(主備模式)

loadbalance: true       # 如果為true,則將資料均分到各個logstash中,掛了就不發了,往存活的logstash裡面傳送。

即logstash地址如果為一個列表,如果loadbalance開啟,則負載到裡表中的伺服器,當一個logstash伺服器不可達,事件將被分發到可到達的logstash伺服器(雙活模式)

loadbalance選項可供 Redis,Logstash, Elasticsearch輸出。Kafka的輸出可以在其自身內部處理負載平衡。
同時每個主機負載平衡器還支援多個workers。預設是1。如果你增加workers的數量, 將使用額外的網路連線。workers參與負載平衡的總數=主機數量*workers。

在這個小節,配置總體結構,與日誌資料流向如下圖:

 

2】Logstash-collect與Kafka連線配置

首先看/etc/logstash/logstash.yml檔案,作為Logstash-collect的公共配置檔案,我們需要做一下的更改:

vim /etc/logstash/logstash.yml

  • path.data: /var/lib/logstash         #資料存放路徑
  • path.config: /etc/logstash/conf.d       #配置檔案的讀取路徑
  • path.logs: /var/log/logstash        #日誌檔案的儲存路徑
  • pipeline.workers: 2             # 預設為CPU的核數
  • http.host: "192.168.108.186"
  • http.port: 9600-9700 # logstash will pick up the first available ports

注意上面配置檔案的讀取路徑 /etc/logstash/conf.d,在這個資料夾裡面,我們可以設定Logstash的輸入輸出。logstash支援把配置寫入檔案/etc/logstash/conf.d/xxx.conf,然後通過讀取配置檔案來採集資料。

logstash收集日誌基本流程: input–>codec–>filter–>codec–>output ,所以配置檔案可以設定輸入輸出與過濾的基本格式如下:(這裡暫時忽略filter,因為這一層的Logstash主要匯聚資料,暫不分析匹配)

關於codec => json    # json處理

logstash最終會把資料封裝成json型別,預設會新增@timestamp時間欄位、host主機欄位、type欄位。原訊息資料會整個封裝進message欄位。如果資料處理過程中,使用者解析添加了多個欄位,則最終結果又會多出多個欄位。也可以在資料處理過程中移除多個欄位,總之,logstash最終輸出的資料格式是json格式。所以資料經過Logstash會增加額外的欄位,可以選擇過濾。
Logstash主要由 input,filter,output三個元件去完成採集資料。

input {

    指定輸入

}

filter {

}

output {

    指定輸出

}

解釋說明如下:

input
input元件負責讀取資料,可以採用file外掛讀取本地文字檔案,stdin外掛讀取標準輸入資料,tcp外掛讀取網路資料,log4j外掛讀取log4j傳送過來的資料等等。本次用 beats外掛讀取Filebeat傳送過來的日誌訊息。

filter
filter外掛負責過濾解析input讀取的資料,可以用grok外掛正則解析資料,date外掛解析日期,json外掛解析json等等。

output
output外掛負責將filter處理過的資料輸出。可以用elasticsearch外掛輸出到es,redis外掛輸出到redis,stdout外掛標準輸出,kafka外掛輸出到kafka等等

在實際的Tiops平臺日誌處理流程中,配置如下:

二個Logstash-collect均採用此相同的配置

在/etc/logstash/conf.d目錄下,建立filebeat-kafka.conf檔案,內容如下:

input {

  beats {

    port => 5044    # logstash上啟動5044埠作為logstash與filebeat的通訊agent

    codec => json    # json處理

  }

}

#此處附加Redis作為快取訊息的配置,本次採用Kafka,所以此部分暫時註釋

#output {

#  if [fields][service] == "filebeat"{

#    redis {

#         data_type => "list"

#         host => "192.168.108.200"

#         db => "0"

#         port => "6379"

#         key => "tiops-tcmp"

#    }

#  }

#}

output {

  if [fields][service] == "filebeat"{          #  參照第一部分,Logstash-collect怎麼接收到Filebeat傳送的訊息,並再次發向下一級呢?這裡就是取filebeat的Fields欄位的Key值,進行向下傳送

    kafka {

        bootstrap_servers => "192.168.108.200:9092,192.168.108.165:9092,192.168.108.103:9092"    # kafka叢集配置,全部IP:埠都要加上

    topic_id => "tiops"  # 設定Kafka的topic,這樣傳送到kafka中時,kafka就會自動建立該topic

  }

}

}

 

在這個小節,配置總體結構與日誌資料流向如下圖:

 

 

 

3】Kafka與Logstash-grok連線配置

在上面第二部分,我們已經成功把資料從Logstash-collect傳送到Kafka,而Kafka本身的輸入輸出並沒有配置。而是Logstash-collect指定了一個topic,這並不代表kafka不需要配置引數,只是kafka本身不需要配置pull/push目標引數,它被動接受

別的生產者傳送過來的資料,因為Logstash-collect指定了Kafka叢集的IP地址和埠。那麼資料將會被髮送到Kafka中,那麼Kafka如何處理這些日誌資料,那麼這時候就需要Kafka配置訊息處理引數了。

這個我們現在暫時先給出引數,至於具體原因涉及到MQ的高可用、訊息過期策略、如何保證訊息不重複消費、如何保證訊息不丟失、如何保證訊息按順序執行以及訊息積壓在訊息佇列裡怎麼辦等問題,我們之後詳細分析並解決。

以下是Kafka叢集中,某一臺的配置檔案,在Kafka原始碼目錄的/kafka/config/server.properties檔案設定如下:(加粗的為重要配置檔案)

  • broker.id=0
  • listeners=PLAINTEXT://192.168.108.200:9092
  • advertised.listeners=PLAINTEXT://192.168.108.200:9092
  • num.network.threads=3
  • num.io.threads=8
  • socket.send.buffer.bytes=102400
  • socket.receive.buffer.bytes=102400
  • socket.request.max.bytes=104857600
  • log.dirs=/var/log/kafka-logs
  • num.partitions=1
  • num.recovery.threads.per.data.dir=1
  • offsets.topic.replication.factor=1
  • transaction.state.log.replication.factor=1
  • transaction.state.log.min.isr=1
  • log.retention.hours=168
  • log.segment.bytes=1073741824
  • log.retention.check.interval.ms=300000
  • zookeeper.connect=192.168.108.200:2181,192.168.108.165:2181,192.168.108.103:2181    # zookeeper 叢集
  • delete.topic.enable=true

 

同時kafka是依賴於zookeeper註冊中心的一款分散式訊息佇列,所以需要有zookeeper單機或者叢集環境。本次採用Kafka自帶的Zookeeper環境。

在Kafka原始碼目錄的/kafka/config/zookeeper.properties檔案設定如下

  • dataDir=/opt/zookeeper                         # 這時需要在/opt/zookeeper資料夾下,新建myid檔案,把broker.id填寫進去,本次本節點為0。
  • # the port at which the clients will connect
  • clientPort=2181
  • # disable the per-ip limit on the number of connections since this is a non-production config
  • maxClientCnxns=100
  • tickTime=2000
  • initLimit=10
  • syncLimit=5
  • server.0=192.168.108.200:2888:3888
  • server.1=192.168.108.165:2888:3888
  • server.2=192.168.108.103:2888:3888

其他注意事項,請看之前的Kafka叢集安裝步驟,這裡主要給出叢集某一節點的配置檔案引數。

到目前為止,我們配置了kafka的引數,對訊息進行了處理(持久化...),接下來就需要對Kafka中的訊息進行下一步的傳輸,即傳送到Logstash-grok。

 

這裡需要注意的是:並不是Kafka主動把訊息傳送出去的,當然Kafka也支援這種操作,但是在這裡,採用的是Logstash-grok作為消費者,主動拉取Kafka中的訊息,來進行消費。

所以就需要像第二步Logstash-collect那樣進行輸入輸出設定,這裡Logstash-grok不僅僅配置輸入輸出,最重要的是它的過濾filter作用。

在/etc/logstash/conf.d目錄下,建立logstash-es.conf檔案,這裡給出Logstash-grok中一個的內容如下(二個Logstash-grok配置一樣):

#input {

#    redis {

#        data_type => "list"

#        host => "192.168.108.200"

#        db => "0"

#        port => "6379"

#        key => "tiops-tcmp"

#        # password => "123456"

#    }

#}

input{

    kafka{

     bootstrap_servers => ["192.168.108.200:9092,192.168.108.165:9092,192.168.108.103:9092"]  # 配置拉取的kafka叢集訊息地址

   group_id => "tyun"    # 設定 group_id,對於這個消費組而言會有一個消費 offset,消費掉是 auto_commit 部分控制的,這個引數會定期將你消費的 offset 提交到 kafka,下次啟動的時候已經消費過的就不會重複消費了;

                                       # 如果想重新消費,可以換一個 group_id 即可

    auto_offset_reset => "earliest"

    consumer_threads => "3"    # 多個例項的consumer_threads數之和應該等於topic分割槽數近似達到logstash多例項的效果。    

    decorate_events => "false"

    topics => ["tiops"]                # kafka topic 名稱,獲取指定topic內的訊息

    type => "log"

    codec => json

        }

}

filter {       # 此處Logstash最主要的過濾作用,可以自定義正則表示式,選擇性輸出訊息

     grok {

       match => {

       #擷取

    "message" => "(?<LOG>(?<=architecture=x86_64}|containerized=true})(.*)/?)"

}

}

}

output {     # 過濾後的訊息,傳送到ES叢集

    elasticsearch {

        hosts => ["192.168.108.35:9200","192.168.108.194:9200"]

        codec => json

        index => "tiops-%{+YYYY.MM.dd}"     # 自定義索引名稱

    }

 

在這個小節,配置總體結構與日誌資料流向如下圖:

 

 

 

4】Logstash-grok與Elasticsearch(ES)叢集連線配置

ElasticSearch叢集
在第三步中,我們已經將Logstash-grok過濾後的訊息,傳送到了ES叢集,這裡的ES叢集被動接受傳送過來的訊息,即ES叢集本身不需要配置輸入源,已經由其他元件傳送決定了。

ES的配置將集中體現在叢集自身的配置上。

Elasticsearch 可以橫向擴充套件至數百(甚至數千)的伺服器節點,同時可以處理PB級資料
Elasticsearch 天生就是分散式的,並且在設計時遮蔽了分散式的複雜性。

ES功能:(1)分散式的搜尋引擎和資料分析引擎

    (2)全文檢索,結構化檢索,資料分析

    (3)對海量資料進行近實時的處理

安裝後的ES叢集,配置在/etc/elasticsearch/elasticsearch.yml檔案,叢集中,各個節點配置大體相同,不同之處下面有紅字標出,主要就是節點名稱與網路監聽地址,需要每個節點自行按實際填寫。

  • cluster.name: elk-cluster   #叢集名稱
  • node.name: elk-node1     #節點名稱,一個叢集之內節點的名稱不能重複
  • path.data: /data/elkdata    #資料路徑
  • path.logs: /data/elklogs    #日誌路徑
  • # bootstrap.memory_lock: true  #鎖住es記憶體,保證記憶體不分配至交換分割槽。
  • network.host: 192.168.108.35  #網路監聽地址,可以訪問elasticsearch的ip, 預設只有本機
  • http.port: 9200       #使用者訪問檢視的埠,9300是ES元件訪問使用
  • discovery.seed_hosts: ["192.168.108.35","192.168.108.194"]  單播(配置一臺即可,生產可以使用組播方式)
  • cluster.initial_master_nodes: ["192.168.108.35"]   # Master
  • http.cors.enabled: true   # 這二個是ES外掛的web顯示設定,head外掛防止跨域
  • http.cors.allow-origin: "*"

 

可見叢集配置中最重要的兩項是node.name與network.host,每個節點都必須不同。其中node.name是節點名稱主要是在Elasticsearch自己的日誌加以區分每一個節點資訊。
discovery.seed_hosts是叢集中的節點資訊,可以使用IP地址、可以使用主機名(必須可以解析)。

當ElasticSearch的節點啟動後,它會利用多播(multicast)(或者單播),如果使用者更改了配置)尋找叢集中的其它節點,並與之建立連線。(節點發現)

這裡沒有配置分片的數目,在ES的7版本中,預設為1,這個包括replicas可後來自定義設定。

 

在這個小節,配置總體結構與日誌資料流向如下圖:

 

5】Elasticsearch(ES)叢集與Kibana連線配置

Kibana是一個開源的分析和視覺化平臺,和Elasticsearch一起工作時,就可以用Kibana來搜尋,檢視,並和儲存在Elasticsearch索引中的資料進行互動。
可以輕鬆地執行高階資料分析,並且以各種圖示、表格和地圖的形式視覺化資料。
同時Kibana使得理解大量資料變得很容易。它簡單的、基於瀏覽器的介面使你能夠快速建立和共享動態儀表板,實時顯示Elasticsearch查詢的變化。

在第四步中,我們已經將ES叢集中的資料進行了處理,建立了索引(相當於MySQL的database概念),並且分片、副本等都存在,而且處理後的資料更加方便於檢索。

Kibana的配置在/etc/kibana/kibana.yml檔案中,如下:

  • server.port: 5601 #監聽埠
  • server.host: "192.168.108.182" #監聽IP地址,建議內網ip
  • #kibana.index: ".newkibana"
  • elasticsearch.hosts: ["http://192.168.108.35:9200","http://192.168.108.194:9200"] # ES機器的IP,這裡的所有host必須來自於同一個ES叢集
  • #xpack.security.enabled: false #新增這條,這條是配置kibana的安全機制,暫時關閉。

可以看出,這裡的Kibana會主動去ES叢集中去拉取資料,最後在瀏覽器Web端進行實時展示。

這時,我們開啟瀏覽器介面如下,看看最終的結果:

 

 

注:Nginx代理配置,可以通過自定義域名訪問並設定密碼,提升安全性。(可選)

upstream kibana_server {
        server 192.168.108.182:5601 weight=1 max_fails=3 fail_timeout=60;
    }
    server {
        listen 80;
        server_name www.kibana.com;
        auth_basic "Restricted Access";
        auth_basic_user_file /etc/nginx/conf.d/htpasswd.users;
        location / {
                proxy_pass http://kibana_server;
                proxy_http_version 1.1;
        }
    }

在這個小節,配置總體結構與日誌資料流向如下圖:

 

6】總結

 

再將日誌流向的配置檔案輸入輸出標註一下:

 

 

 

&n