ELK+Kafka學習筆記之搭建ELK+Kafka日誌收集系統叢集
0x00 概述
關於如何搭建ELK部分,請參考 ofollow,noindex">這篇文章 , https://www.cnblogs.com/JetpropelledSnake/p/9893566.html。
該篇使用者為非root,使用使用者為“elk”。
基於以前ELK架構的基礎,結合Kafka佇列,實現了ELK+Kafka叢集,整體架構如下:
# 1. 兩臺es組成es叢集;( 以下對elasticsearch簡稱es ) # 2. 中間三臺伺服器就是我的kafka(zookeeper)叢集啦; 上面寫的 消費者/生產者 這是kafka(zookeeper)中的概念; # 3. 最後面的就是一大堆的生產伺服器啦,上面使用的是logstash,當然除了logstash也可以使用其他的工具來收集你的應用程式的日誌,例如:Flume,Scribe,Rsyslog,Scripts……
# 雙ES備份叢集: CentOS1+CentOS2 # Kafka+LogStash叢集:CentOS3+CentOS4+CentOS5 # ES-Header+Kibina+LogStash/Beats採集:CentOS6+CentOS7 # 注意!!由於測試環境限制,此篇部落格中CentOS6+CentOS7的內容放到CentOS1+CentOS2中的
使用軟體:
# Elasticsearch 6.2.4 # Kibana 6.2.4 # Logstash 6.2.4 # elasticsearch-head(從github直接clone) # filebeat 6.2.4 (本篇文章使用Logstash做蒐集用,可以用filebeat替換Logstash) # phantomjs-2.1.1-linux-x84_64 (配合npm指令使用) # node-v4.9.1-linux-x64 (記住資料夾路徑,需要增加到bash_profile中) # kafka_2.11-1.0.0 (該版本已經包含ZooKeeper服務)
部署步驟:
# ES叢集安裝配置; # Logstash客戶端配置(直接寫入資料到ES叢集,寫入系統messages日誌); # Kafka(zookeeper)叢集配置;(Logstash寫入資料到Kafka訊息系統);
0x01 ES叢集安裝配置
ES安裝請參考 這裡 , https://www.cnblogs.com/JetpropelledSnake/p/9893566.html;
1. 下面修改ES的配置檔案:
# 確保下列引數被正確設定: cluster.name: logger# ES叢集的名字 node.name: node-1 path.data: /usr/local/app/elasticsearch-6.2.4/data path.logs: /usr/local/app/elasticsearch-6.2.4/log bootstrap.memory_lock: false# 對於非專用ES,建議設定為false,預設為true bootstrap.system_call_filter: false network.host: 0.0.0.0# 支援遠端訪問 http.port: 9200 # restful api訪問介面 http.cors.enabled: true#允許ES head跨域訪問 http.cors.allow-origin: "*"#允許ES head跨域訪問
2. 獲取es服務管理指令碼
[ root @ elk ~ ] # git clone https://github.com/elastic/elasticsearch-servicewrapper.git [ root @ elk ~ ] # mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/ [ root @ elk ~ ] # /usr/local/elasticsearch/bin/service/elasticsearch install Detected RHEL or Fedora : Installing the Elasticsearch daemon . . [ root @ elk ~ ] # #這時就會在/etc/init.d/目錄下安裝上es的管理指令碼啦 #修改其配置: [ root @ elk ~ ] # set . default . ES_HOME = /usr/local/app/elasticsearch-6.2.4#安裝路徑 set . default . ES_HEAP_SIZE = 1024#jvm記憶體大小,根據實際環境調整即可
3. 啟動es ,並檢查其服務是否正常
[ root @ elk ~ ] # netstat -nlpt | grep -E "9200|"9300 tcp00 0.0.0.0 : 92000.0.0.0 : *LISTEN1684/ java tcp00 0.0.0.0 : 93000.0.0.0 : *LISTEN1684/ java
訪問http://192.168.26.135:9200/ 如果出現以下提示資訊說明安裝配置完成
4. es1節點已經設定好,我們直接把目錄複製到es2
[ root @ elk ] # scp -r elasticsearch-6.2.4192.168.26.136:/usr/local/app [ root @ elk ] # elasticsearch/bin/service/elasticsearch install #es2只需要修改node.name(此處使用的是node-2和node-1)即可,其他都與es1相同配置
5. 安裝es的管理外掛elasticsearch-head
es-head安裝,請參考 這裡 ,https://www.cnblogs.com/JetpropelledSnake/p/9893566.html。
安裝好之後,訪問方式為:http://192.168.26.135:9100/
到此,ES叢集的部署完成。
0x02 Logstash客戶端安裝配置
LogStash安裝請參考 這裡 ,https://www.cnblogs.com/JetpropelledSnake/p/9893566.html;
# Logstach需要搭配指定的配置檔案啟動,建立一個logstash配置檔案,比如logstash-es.conf,啟動LogStash時候使用;根據不同的配置檔案,LogStash會做不同工作。
1. 通過stdin標準實時輸入的方式向Logstash向es叢集寫資料(測試,暫未通過Kafka佇列傳遞)
1.1 使用如下命令建立LogStash啟動配置檔案
# cd /usr/local/app/logstash-6.2.4/config # touch logstash_for_stdin.conf
# [elk@localhost config]$ vim logstash_for_stdin.conf
1.2 檢查配置檔案是否有語法錯(如下 -t 為測試命令)
# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_for_stdin.conf -t
1.3 啟動 Logstash,開始寫入操作
# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_for_stdin.conf
以上是輸入資料,從ES-Header(192.168.26.135:9200)外掛就可以看到, 資料已經分散的寫入到兩個叢集內
ES Server:192.168.26.136
ES Server:192.168.26.135
2. 通過採集制定檔案的輸入的方式向Logstash向es叢集寫資料(測試,暫未通過Kafka佇列傳遞)
2.1首先建立一個用於採集系統日誌的LogStash啟動的conf檔案,使用如下命令
# cd /usr/local/app/logstash-6.2.4/config # touch logstash_for_system_messages.conf
2.2 編輯logstash_for_system_messages.conf檔案
# [elk@localhost config]$ vim logstash_for_system_messages.conf
input { file{ path => "/var/log/messages"#這是日誌檔案的絕對路徑 start_position=>"beginning"#這個表示從 messages 的第一行讀取,即檔案開始處 } } output { #輸出到 es elasticsearch {#這裡將按照這個索引格式來建立索引 hosts=>["192.168.26.135:9200","192.168.26.136:9200"] index=>"system-messages-%{+YYYY-MM}" } }
2.3 測試ogstash_for_system_messages.conf檔案
# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_for_system_messages.conf -t
2.4 啟動LogStash,開始採集系統日誌
# 注意,系統日誌需要許可權,命令列前面使用 sudo # [elk@localhost logstash-6.2.4]$ sudo ./bin/logstash -f ./config/logstash_for_system_messages.conf
ES Server: 192.168.26.136
ES Server: 192.168.26.135
系統日誌我們已經成功的收集,並且已經寫入到es叢集中.
上面的演示是logstash直接將日誌寫入到es叢集中的,這種場合我覺得如果量不是很大的話直接像上面已將將輸出output定義到es叢集即可.
如果量大的話需要加上訊息佇列Kafka來緩解es叢集的壓力,下面就在三臺server上面安裝kafka叢集。
0x03 Kafka叢集安裝配置1
本篇部落格使用的Kafka版本為
# kafka_2.11-1.0.0 (該版本已經包含ZooKeeper服務)
1. 從官網獲取裝包, https://kafka.apache.org/downloads
2. 配置Zookeeper叢集,修改配置檔案
# [elk@localhost config]$ vim zookeeper.properties
修改內容如下:
dataDir=/usr/local/app/kafka_2.11-1.0.0/zookeeperDir # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=1024 tickTime=2000 initLimit=20 syncLimit=10 server.2=192.168.26.137:2888:3888 server.3=192.168.26.138:2888:3888 server.4=192.168.26.139:2888:3888
# 說明: tickTime : 這個時間是作為 Zookeeper 伺服器之間或客戶端與伺服器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。 2888 埠:表示的是這個伺服器與叢集中的 Leader 伺服器交換資訊的埠; 3888 埠:表示的是萬一叢集中的 Leader 伺服器掛了,需要一個埠來重新進行選舉,選出一個新的 Leader ,而這個埠就是用來執行選舉時伺服器相互通訊的埠。
3. 建立Zookeeper所需的目錄
進入Kafka目錄內
建立目錄zookeeperDir
# [elk@localhost kafka_2.11-1.0.0]$ mkdir zookeeperDir
# [elk@localhost kafka_2.11-1.0.0]$ cd zookeeperDir
在zookeeperDir目錄下建立myid檔案,裡面的內容為數字,用於標識主機,如果這個檔案沒有的話,zookeeper是沒法啟動的
# [elk@localhost zookeeperDir]$ touch myid
命名myid
# [elk@localhost zookeeperDir]$ echo 2 > myid
以上就是zookeeper叢集的配置,下面等我配置好kafka之後直接複製到其他兩個節點即可
4. Kafka配置
修改內容如下
broker . id = 2# 唯一,填數字,本文中分別為 2 / 3 / 4 prot = 9092# 這個 broker 監聽的埠 host . name = 192.168.2.22# 唯一,填伺服器 IP log . dir = / data / kafka - logs#該目錄可以不用提前建立,在啟動時自己會建立 zookeeper . connect = 192.168.2.22 : 2181 , 192.168.2.23 : 2181 , 192.168.2.24 :2181# 這個就是 zookeeper 的 ip 及埠 num . partitions = 16# 需要配置較大 分片影響讀寫速度 log . dirs = / data / kafka - logs # 資料目錄也要單獨配置磁碟較大的地方 log . retention . hours = 168# 時間按需求保留過期時間 避免磁碟滿
5. 將kafka(zookeeper)的程式目錄全部拷貝至其他兩個節點
# [elk@localhost app]$ scp kafka_2.11-1.0.0 192.168.26.138:/usr/local/app/ # [elk@localhost app]$ scp kafka_2.11-1.0.0 192.168.26.139:/usr/local/app/
6. 修改兩個節點的配置,注意這裡除了以下兩點不同外,都是相同的配置
# zookeeper 的配置 mkdir zookeeperDir echo "x" > zookeeperDir/myid# “x”這裡是3或者4,前面已經使用過2了 # kafka 的配置 broker.id = x# “x”這裡是3或者4,前面已經使用過2了 host.name = 192.168.26.137# ip改為26.138或者26.139
7. 修改完畢配置之後我們就可以啟動了,這裡先要啟動zookeeper叢集,才能啟動kafka
我們按照順序來,kafka1 –> kafka2 –>kafka3
# [elk@localhost kafka_2.11-1.0.0]$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &#zookeeper啟動命令 # [elk@localhost kafka_2.11-1.0.0]$ ./bin/zookeeper-server-stop.sh#zookeeper停止的命令
注意,如果zookeeper有問題 nohup的日誌檔案會非常大,把磁碟佔滿,這個zookeeper服務可以通過自己些服務指令碼來管理服務的啟動與關閉。
後面兩臺執行相同操作,在啟動過程當中會出現以下報錯資訊
[2018-12-03 15:06:02,089] WARN Cannot open channel to 4 at election address /192.168.26.139:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager) java.net.ConnectException: Connection refused (Connection refused) at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:562) at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:614) at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:843) at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:913)
由於zookeeper叢集在啟動的時候,每個結點都試圖去連線叢集中的其它結點,先啟動的肯定連不上後面還沒啟動的,所以上面日誌前面部分的異常是可以忽略的。通過後面部分可以看到,叢集在選出一個Leader後,最後穩定了。
其他節點也可能會出現類似的情況,屬於正常。
8. Zookeeper服務檢查
在Kafka三臺Server上執行如下命令
# netstat -nlpt | grep -E "2181|2888|3888"
顯示如下埠和資訊,說明Zookeeper執行正常
[elk@localhost ~]$ netstat -nlpt | grep -E "2181|2888|3888" (Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.) tcp600 :::2181:::*LISTEN5465/java tcp600 192.168.26.139:3888:::*LISTEN5465/java
可以看出,如果哪臺是Leader,那麼它就擁有2888這個埠
[elk@localhost ~]$ netstat -nlpt | grep -E "2181|2888|3888" (Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.) tcp600 :::2181:::*LISTEN5434/java tcp600 192.168.26.138:2888:::*LISTEN5434/java tcp600 192.168.26.138:3888:::*LISTEN5434/java
9. 啟動Kafka服務
這時候zookeeper叢集已經啟動起來了,下面啟動kafka,也是依次按照順序啟動,kafka1 –> kafka2 –>kafka3
# [elk@localhost kafka_2.11-1.0.0]$ pwd # /usr/local/app/kafka_2.11-1.0.0 # [elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-server-start.sh ./config/server.properties &#啟動Kafka服務
# [elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-server-stop.sh# 停止Kafka服務
10. 此時三臺上面的zookeeper及kafka都已經啟動完畢,下面來測試一下
10.1 建立一個主題(在Kafka Server 192.168.26.137)
# 注意:factor大小不能超過broker的個數
# ./bin/kafka-topics.sh --create --zookeeper 192.168.26.137:2181 --replication-factor 3 --partitions 1 --topic test1test2test
10.2 檢視已經建立的topics(Kafka Server :192.168.26.137)
# ./bin/kafka-topics.sh --list --zookeeper 192.168.26.137:2181
10.3 檢視test1test2test這個主題的詳情(Kafka Server :192.168.26.137)
#主題名稱:test1test2test #Partition:只有一個,從0開始 #leader :id為4的broker #Replicas 副本存在於broker id為2,3,4的上面 #Isr:活躍狀態的broker
10.4 傳送訊息,這裡使用的是生產者角色(Kafka Server :192.168.26.137)
# ./bin/kafka-console-producer.sh --broker-list 192.168.26.137:9092 --topic test1test2test
10.5 接收訊息,這裡使用的是消費者角色(更換到另外一臺KafkaServer上執行, Kafka Server :192.168.26.138 )
訊息已經顯示出來了。
# ./bin/kafka-console-consumer.sh --zookeeper192.168.26.138:2181 --topic test1test2test --from-beginning
以上是Kafka生產者和消費者的測試,基於Kafka的Zookeeper叢集就成功了。
11. 下面我們將ES Server:192.168. 26.135 上面的logstash的輸出改到kafka上面,將資料寫入到kafka中
11.1 建立LogStash結合Kafka使用的.conf檔案,注意檔案目錄
11.2 編輯輸入到Kafka的.conf檔案
# vim logstash_for_kafka.conf
input {#這裡的輸入還是定義的是從日誌檔案輸入 file { type = > "system-message" path = > "/var/log/messages" start_position = > "beginning" } } output {#stdout { codec => rubydebug }#這是標準輸出到終端,可以用於除錯看有沒有輸出,注意輸出的方向可以有多個 kafka {#輸出到kafka bootstrap_servers = > "192.168.26.137:9092,192.168.26.138:9092,192.168.26.139:9092"#他們就是生產者 topic_id = > "system-secure"#這個將作為主題的名稱,將會自動建立 compression_type = > "snappy"#壓縮型別 } }
11.3 測試輸入到Kafka的配置檔案
# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_for_kafka.conf -t
11.4 使用該配置檔案啟動LogStash
注意採集系統日誌需要許可權,命令列前面必須加sudo
# [elk@localhost logstash-6.2.4]$ sudo ./bin/logstash -f ./config/logstash_for_kafka.conf
11.5 驗證資料是否寫入到kafka,這裡我們檢查是否生成了一個叫system-secure的主題
# [elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --list --zookeeper 192.168.26.137:2181
11.6 檢視system-secure詳情
# [elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --describe --zookeeper 192.168.26.137:2181 --topic system-secure
11.7 對於logstash輸出的我們也可以提前先定義主題,然後啟動logstash 直接往定義好的主題寫資料就行啦,命令如下:
#[elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --create --zookeeper 192.168.26.137:2181 --replication-factor 3 --partitions 3 --topic TOPIC_NAME
好了,我們將logstash收集到的資料寫入到了kafka中了,在實驗過程中我使用while指令碼測試瞭如果不斷的往kafka寫資料的同時停掉兩個節點,資料寫入沒有任何問題。
0x04 Kafka叢集安裝配置2
那如何將資料從kafka中讀取然後給ES叢集呢?
那下面我們在kafka叢集上安裝Logstash,可以通過上面的方法,先配置一臺的logstash,然後通過scp命令通過網路拷貝,安裝路徑是“/usr/local/app/logstash-6.2.4”;
三臺上面的logstash的配置如下,作用是將kafka叢集的資料讀取然後轉交給es叢集,這裡為了測試我讓他新建一個索引檔案,注意這裡的輸入日誌是secure,主題名稱是“system-secure”;
1. 如上所說,在三臺Kafka叢集的Server上分別安裝LogStash,並在如下目錄新建logstash_kafka2ES.conf
2. 配置logstash_kafka2ES.conf
# [elk@localhost config]$ vim logstash_kafka2ES.conf
3. 在三臺Kafka Server上按順序分別啟動LogStash,啟動命令三臺是通用的
# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_kafka2ES.conf
4. 測試通過Kafka佇列傳遞訊息到ES叢集
在ES Server:192.168.26.135上寫入測試內容,利用secure這個檔案來測試
可以看到訊息已經分散的寫入ES及叢集中
ES Server 192.168.26.135
ES Server 192.168.26.136
0x05 在Kibana中展示
ELK中的Kibana中的使用和部署,請看這篇文章 https://www.cnblogs.com/JetpropelledSnake/p/9893566.html
1. 根據ES叢集中存入的資料,在Kibana中建立Index
2. 建立好index後,就可以在Discover這裡查詢介入的資料了
3. ES叢集採用的查詢語言是DSL語言,該語言採用json的格式組織查詢語句。
4. Kibana可以多樣化展示ES叢集中介入的資料,github上有很多定製好的格式,針對常見生產環境可以直接配置,十分方便。