1. 程式人生 > >ELK+kafka構建日誌收集系統

ELK+kafka構建日誌收集系統

背景:

最近線上上了ELK,但是隻用了一臺Redis在中間作為訊息佇列,以減輕前端es叢集的壓力,Redis的叢集解決方案暫時沒有接觸過,並且Redis作為訊息佇列並不是它的強項;所以最近將Redis換成了專業的訊息資訊釋出訂閱系統Kafka, Kafka的更多介紹大家可以看這裡: 傳送門 ,關於ELK的知識網上有很多的哦, 此篇部落格主要是總結一下目前線上這個平臺的實施步驟,ELK是怎麼跟Kafka結合起來的。好吧,動手!

ELK架構拓撲:

然而我這裡的整個日誌收集平臺就是這樣的拓撲:

1,使用一臺Nginx代理訪問kibana的請求;

2,兩臺es組成es叢集,並且在兩臺es上面都安裝kibana;( 以下對elasticsearch簡稱es )

3,中間三臺伺服器就是我的kafka(zookeeper)叢集啦; 上面寫的 消費者/生產者 這是kafka(zookeeper)中的概念;

4,最後面的就是一大堆的生產伺服器啦,上面使用的是logstash,當然除了logstash也可以使用其他的工具來收集你的應用程式的日誌,例如:Flume,Scribe,Rsyslog,Scripts……

角色:

軟體選用:

elasticsearch - 1.7.3.tar.gz #這裡需要說明一下,前幾天使用了最新的elasticsearch2.0,java-1.8.0報錯,目前未找到原因,故這裡使用1.7.3版本

Logstash - 2.0.0.tar.gz

kibana - 4.1.2 - linux - x64 . tar . gz

以上軟體都可以從官網下載 : https : //www.elastic.co/downloads

java - 1.8.0 , nginx 採用 yum 安裝

部署步驟:

1.ES叢集安裝配置;

2.Logstash客戶端配置(直接寫入資料到ES叢集,寫入系統messages日誌);

3.Kafka(zookeeper)叢集配置;(Logstash寫入資料到Kafka訊息系統);

4.Kibana部署;

5.Nginx負載均衡Kibana請求;

6.案例:nginx日誌收集以及MySQL慢日誌收集;

7.Kibana報表基本使用;

ES叢集安裝配置;

es1.example.com:

1.安裝java-1.8.0以及依賴包

yum install - y epel - release

yum install - y java - 1.8.0 git wget lrzsz

2.獲取es軟體包

wget https : //download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.3.tar.gz

tar - xf elasticsearch - 1.7.3.tar.gz - C / usr / local

ln - sv / usr / local / elasticsearch - 1.7.3 / usr / local / elasticsearch

3.修改配置檔案

[ root @ es1 ~ ] # vim /usr/local/elasticsearch/config/elasticsearch.yml

32 cluster . name : es - cluster                          #組播的名稱地址

40 node . name : "es-node1 "                            #節點名稱,不能和其他節點重複

47 node . master : true                                  #節點能否被選舉為master

51 node . data : true                                    #節點是否儲存資料

107 index . number_of_shards : 5                        #索引分片的個數

111 index . number_of_replicas : 1                      #分片的副本個數

145 path . conf : / usr / local / elasticsearch / config /      #配置檔案的路徑

149 path . data : / data / es / data                          #資料目錄路徑

159 path . work : / data / es / worker                        #工作目錄路徑

163 path . logs :    / usr / local / elasticsearch / logs /        #日誌檔案路徑

167 path . plugins :    / data / es / plugins                  #外掛路徑

184 bootstrap . mlockall : true                          #記憶體不向swap交換

232 http . enabled : true                                #啟用http

4.建立相關目錄

mkdir / data / es / { data , worker , plugins } - p

5.獲取es服務管理指令碼

[ root @ es1 ~ ] # git clone https://github.com/elastic/elasticsearch-servicewrapper.git

[ root @ es1 ~ ] # mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/

[ root @ es1 ~ ] # /usr/local/elasticsearch/bin/service/elasticsearch install

Detected RHEL or Fedora :

Installing the Elasticsearch daemon . .

[ root @ es1 ~ ] #

#這時就會在/etc/init.d/目錄下安裝上es的管理指令碼啦

#修改其配置:

[ root @ es1 ~ ] #

set . default . ES_HOME = / usr / local / elasticsearch    #安裝路徑

set . default . ES_HEAP_SIZE = 1024                    #jvm記憶體大小,根據實際環境調整即可

6.啟動es ,並檢查其服務是否正常

[ root @ es1 ~ ] # netstat -nlpt | grep -E "9200|"9300

tcp          0        0 0.0.0.0 : 9200                  0.0.0.0 : *                    LISTEN        1684/ java          

tcp          0        0 0.0.0.0 : 9300                  0.0.0.0 : *                    LISTEN        1684/ java

訪問http://192.168.2.18:9200/ 如果出現以下提示資訊說明安裝配置完成啦,

7.es1節點好啦,我們直接把目錄複製到es2

[ root @ es1 local ] # scp -r elasticsearch-1.7.3  192.168.12.19:/usr/local/

[ root @ es2 local ] # ln -sv elasticsearch-1.7.3 elasticsearch

[ root @ es2 local ] # elasticsearch/bin/service/elasticsearch install

#es2只需要修改node.name即可,其他都與es1相同配置

8.安裝es的管理外掛

es官方提供一個用於管理es的外掛,可清晰直觀看到es叢集的狀態,以及對叢集的操作管理,安裝方法如下:

[ root @ es1 local ] # /usr/local/elasticsearch/bin/plugin -i mobz/elasticsearch-head

安裝好之後,訪問方式為: http://192.168.2.18:9200/_plugin/head,由於叢集中現在暫時沒有資料,所以顯示為空,

此時,es叢集的部署完成。

Logstash客戶端安裝配置;

在webserve1上面安裝Logstassh

1.downloads  軟體包 ,這裡注意,Logstash是需要依賴java環境的,所以這裡還是需要yum install -y java-1.8.0.

[ root @ webserver1 ~ ] # wget https://download.elastic.co/logstash/logstash/logstash-2.0.0.tar.gz

[ root @ webserver1 ~ ] # tar -xf logstash-2.0.0.tar.gz -C /usr/local

[ root @ webserver1 ~ ] # cd /usr/local/

[ root @ webserver1 local ] # ln -sv logstash-2.0.0 logstash

[ root @ webserver1 local ] # mkdir logs etc

2.提供logstash管理指令碼,其中裡面的配置路徑可根據實際情況修改

#!/bin/bash

#chkconfig: 2345 55 24

#description: logstash service manager

#auto: Maoqiu Guo

FILE = '/usr/local/logstash/etc/*.conf'      #logstash配置檔案

LOGBIN = '/usr/local/logstash/bin/logstash agent --verbose --config'    #指定logstash配置檔案的命令

LOCK = '/usr/local/logstash/locks'          #用鎖檔案配合服務啟動與關閉

LOGLOG = '--log /usr/local/logstash/logs/stdou.log'    #日誌

START ( ) {

if [ - f $ LOCK ] ; then

echo - e "Logstash is already \033[32mrunning\033[0m, do nothing."

else

echo - e "Start logstash service.\033[32mdone\033[m"

nohup $ { LOGBIN } $ { FILE } $ { LOGLOG } &

touch $ LOCK

fi

}

STOP ( ) {

if [ ! - f $ LOCK ] ; then

echo - e "Logstash is already stop, do nothing."

else

echo - e "Stop logstash serivce \033[32mdone\033[m"

rm - rf $ LOCK

ps - ef | grep logstash | grep - v "grep" | awk '{print $2}' | xargs kill - s 9 > / dev / null

fi

}

STATUS ( ) {

ps aux | grep logstash | grep - v "grep" > / dev / null

if [ - f $ LOCK ] && [ $ ? - eq 0 ] ; then

echo - e "Logstash is: \033[32mrunning\033[0m..."

else

echo - e "Logstash is: \033[31mstopped\033[0m..."

fi

}

TEST ( ) {

$ { LOGBIN } $ { FILE } -- configtest

}

case "$1" in

start )

START

; ;

stop )

STOP

; ;

status )

STATUS

; ;

restart )

STOP

sleep 2

START

; ;

test )

TEST

; ;

* )

echo "Usage: /etc/init.d/logstash (test|start|stop|status|restart)"

; ;

esac

3.Logstash 向es叢集寫資料

(1)編寫一個logstash配置檔案

[ root @ webserver1 etc ] # cat logstash.conf

input {                #資料的輸入從標準輸入

stdin { }   

}

output {              #資料的輸出我們指向了es叢集

elasticsearch {

hosts = > [ "192.168.2.18:9200" , "192.168.2.19:9200" ] # es 主機的 ip 及埠

}

}

[ root @ webserver1 etc ] #

(2)檢查配置檔案是否有語法錯

[ root @ webserver1 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose

Configuration OK

[ root @ webserver1 etc ] #

(3)既然配置ok我們手動啟動它,然後寫點東西看能否寫到es

ok.上圖已經看到logstash已經可以正常的工作啦.

4.下面演示一下如何收集系統日誌

將之前的配置檔案修改如下所示內容,然後啟動logstash服務就可以在web頁面中看到messages的日誌寫入es,並且建立了一條索引

[ root @ webserver1 etc ] # cat logstash.conf

input { #這裡的輸入使用的檔案,即日誌檔案messsages

file {

path = > "/var/log/messages" #這是日誌檔案的絕對路徑

start_position = > "beginning" #這個表示從 messages 的第一行讀取,即檔案開始處

}

}

output { #輸出到 es

elasticsearch {

hosts = > [ "192.168.2.18:9200" , "192.168.2.19:9200" ]

index = > "system-messages-%{+YYYY-MM}" #這裡將按照這個索引格式來建立索引

}

}

[ root @ webserver1 etc ] #

啟動logstash後,我們來看head這個外掛的web頁面

ok,系統日誌我們已經成功的收集,並且已經寫入到es叢集中,那上面的演示是logstash直接將日誌寫入到es叢集中的,這種場合我覺得如果量不是很大的話直接像上面已將將輸出output定義到es叢集即可,如果量大的話需要加上訊息佇列來緩解es叢集的壓力。前面已經提到了我這邊之前使用的是單臺redis作為訊息佇列,但是redis不能作為list型別的叢集,也就是redis單點的問題沒法解決,所以這裡我選用了kafka ;下面就在三臺server上面安裝kafka叢集

Kafka叢集安裝配置;

在搭建kafka叢集時,需要提前安裝zookeeper叢集,當然kafka已經自帶zookeeper程式只需要解壓並且安裝配置就行了

kafka1上面的配置:

1.獲取軟體包.官網: http://kafka.apache.org

[ root @ kafka1 ~ ] # wget http://mirror.rise.ph/apache/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz

[ root @ kafka1 ~ ] # tar -xf kafka_2.11-0.8.2.1.tgz -C /usr/local/

[ root @ kafka1 ~ ] # cd /usr/local/

[ root @ kafka1 local ] # ln -sv kafka_2.11-0.8.2.1 kafka

2.配置zookeeper叢集,修改配置檔案

[ root @ kafka1 ~ ] # vim /usr/local/kafka/config/zookeeper.propertie

dataDir = / data / zookeeper

clienrtPort = 2181

tickTime = 2000

initLimit = 20

syncLimit = 10

server . 2 = 192.168.2.22 : 2888 : 3888

server . 3 = 192.168.2.23 : 2888 : 3888

server . 4 = 192.168.2.24 : 2888 : 3888

#說明:

tickTime : 這個時間是作為 Zookeeper 伺服器之間或客戶端與伺服器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。

2888 埠:表示的是這個伺服器與叢集中的 Leader 伺服器交換資訊的埠;

3888 埠:表示的是萬一叢集中的 Leader 伺服器掛了,需要一個埠來重新進行選舉,選出一個新的 Leader ,而這個埠就是用來執行選舉時伺服器相互通訊的埠。

3.建立zookeeper所需要的目錄

[ root @ kafka1 ~ ] # mkdir /data/zookeeper

4.在/data/zookeeper目錄下建立myid檔案,裡面的內容為數字,用於標識主機,如果這個檔案沒有的話,zookeeper是沒法啟動的哦

[ root @ kafka1 ~ ] # echo 2 > /data/zookeeper/myid

以上就是zookeeper叢集的配置,下面等我配置好kafka之後直接複製到其他兩個節點即可

5.kafka配置

[ root @ kafka1 ~ ] # vim /usr/local/kafka/config/server.properties

broker . id = 2           # 唯一,填數字,本文中分別為 2 / 3 / 4

prot = 9092      # 這個 broker 監聽的埠

host . name = 192.168.2.22    # 唯一,填伺服器 IP

log . dir = / data / kafka - logs    #  該目錄可以不用提前建立,在啟動時自己會建立

zookeeper . connect = 192.168.2.22 : 2181 , 192.168.2.23 : 2181 , 192.168.2.24 :2181 #這個就是 zookeeper 的 ip 及埠

num . partitions = 16          # 需要配置較大 分片影響讀寫速度

log . dirs = / data / kafka - logs # 資料目錄也要單獨配置磁碟較大的地方

log . retention . hours = 168    # 時間按需求保留過期時間 避免磁碟滿

6.將kafka(zookeeper)的程式目錄全部拷貝至其他兩個節點

[ root @ kafka1 ~ ] # scp -r /usr/local/kafka 192.168.2.23:/usr/local/

[ root @ kafka1 ~ ] # scp -r /usr/local/kafka 192.168.2.24:/usr/local/

7.修改兩個借點的配置,注意這裡除了以下兩點不同外,都是相同的配置

( 1 ) zookeeper 的配置

mkdir / data / zookeeper

echo "x" > / data / zookeeper / myid

( 2 ) kafka 的配置

broker . id = 2

host . name = 192.168.2.22

8.修改完畢配置之後我們就可以啟動了,這裡先要啟動zookeeper叢集,才能啟動kafka

我們按照順序來,kafka1 –> kafka2 –>kafka3

[ root @ kafka1 ~ ] # /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &   #zookeeper啟動命令

[ root @ kafka1 ~ ] # /usr/local/kafka/bin/zookeeper-server-stop.sh                                                   #zookeeper停止的命令

注意,如果zookeeper有問題 nohup的日誌檔案會非常大,把磁碟佔滿,這個zookeeper服務可以通過自己些服務指令碼來管理服務的啟動與關閉。

後面兩臺執行相同操作,在啟動過程當中會出現以下報錯資訊

[ 2015 - 11 - 13 19 : 18 : 04 , 225 ] WARN Cannot open channel to 3 at election address / 192.168.2.23 : 3888 ( org . apache . zookeeper . server . quorum . QuorumCnxManager )

java . net . ConnectException : Connection refused

at java . net . PlainSocketImpl . socketConnect ( Native Method )

at java . net . AbstractPlainSocketImpl . doConnect ( AbstractPlainSocketImpl . java : 350 )

at java . net . AbstractPlainSocketImpl . connectToAddress ( AbstractPlainSocketImpl . java : 206 )

at java . net . AbstractPlainSocketImpl . connect ( AbstractPlainSocketImpl . java :188 )

at java . net . SocksSocketImpl . connect ( SocksSocketImpl . java : 392 )

at java . net . Socket . connect ( Socket . java : 589 )

at org . apache . zookeeper . server . quorum . QuorumCnxManager . connectOne( QuorumCnxManager . java : 368 )

at org . apache . zookeeper . server . quorum . QuorumCnxManager . connectAll (QuorumCnxManager . java : 402 )

at org . apache . zookeeper . server . quorum . FastLeaderElection . lookForLeader( FastLeaderElection . java : 840 )

at org . apache . zookeeper . server . quorum . QuorumPeer . run ( QuorumPeer . java : 762 )

[ 2015 - 11 - 13 19 : 18 : 04 , 232 ] WARN Cannot open channel to 4 at election address / 192.168.2.24 : 3888 ( org . apache . zookeeper . server . quorum . QuorumCnxManager )

java . net . ConnectException : Connection refused

at java . net . PlainSocketImpl . socketConnect ( Native Method )

at java . net . AbstractPlainSocketImpl . doConnect ( AbstractPlainSocketImpl . java : 350 )

at java . net . AbstractPlainSocketImpl . connectToAddress ( AbstractPlainSocketImpl . java : 206 )

at java . net . AbstractPlainSocketImpl . connect ( AbstractPlainSocketImpl . java :188 )

at java . net . SocksSocketImpl . connect ( SocksSocketImpl . java : 392 )

at java . net . Socket . connect ( Socket . java : 589 )

at org . apache . zookeeper . server . quorum . QuorumCnxManager . connectOne( QuorumCnxManager . java : 368 )

at org . apache . zookeeper . server . quorum . QuorumCnxManager . connectAll (QuorumCnxManager . java : 402 )

at org . apache . zookeeper . server . quorum . FastLeaderElection . lookForLeader( FastLeaderElection . java : 840 )

at org . apache . zookeeper . server . quorum . QuorumPeer . run ( QuorumPeer . java : 762 )

[ 2015 - 11 - 13 19 : 18 : 04 , 233 ] INFO Notification time out : 6400 ( org . apache. zookeeper . server . quorum . FastLeaderElection )

由於zookeeper叢集在啟動的時候,每個結點都試圖去連線叢集中的其它結點,先啟動的肯定連不上後面還沒啟動的,所以上面日誌前面部分的異常是可以忽略的。通過後面部分可以看到,叢集在選出一個Leader後,最後穩定了。

其他節點也可能會出現類似的情況,屬於正常。

9.zookeeper服務檢查

[ root @ kafka1 ~ ] #  netstat -nlpt | grep -E "2181|2888|3888"

tcp          0        0 192.168.2.24 : 3888            0.0.0.0 : *                    LISTEN        1959 / java            

tcp          0        0 0.0.0.0 : 2181                  0.0.0.0 : *                    LISTEN        1959/ java                       

[ root @ kafka2 ~ ] #  netstat -nlpt | grep -E "2181|2888|3888"

tcp          0        0 192.168.2.23 : 3888            0.0.0.0 : *                    LISTEN        1723 / java    

tcp          0        0 0.0.0.0 : 2181                  0.0.0.0 : *                    LISTEN        1723/ java           

[ root @ kafka3 ~ ] #  netstat -nlpt | grep -E "2181|2888|3888"

tcp          0        0 192.168.2.24 : 3888            0.0.0.0 : *                    LISTEN        950 / java            

tcp          0        0 0.0.0.0 : 2181                  0.0.0.0 : *                    LISTEN        950 /java            

tcp          0        0 192.168.2.24 : 2888            0.0.0.0 : *                    LISTEN        950 / java             

#可以看出,如果哪臺是Leader,那麼它就擁有2888這個埠

ok.  這時候zookeeper叢集已經啟動起來了,下面啟動kafka,也是依次按照順序啟動

[ root @ kafka1 ~ ] # nohup /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &   #kafka啟動的命令

[ root @ kafka1 ~ ] #  /usr/local/kafka/bin/kafka-server-stop.sh                                                         #kafka停止的命令

注意,跟zookeeper服務一樣,如果kafka有問題 nohup的日誌檔案會非常大,把磁碟佔滿,這個kafka服務同樣可以通過自己些服務指令碼來管理服務的啟動與關閉。

此時三臺上面的zookeeper及kafka都已經啟動完畢,來檢測以下吧

(1)建立一個主題

[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic summer

#注意:factor大小不能超過broker數

(2)檢視有哪些主題已經建立

[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181   #列出叢集中所有的topic

summer    #已經建立成功

(3)檢視summer這個主題的詳情

[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic summer

Topic : summer PartitionCount : 1 ReplicationFactor : 3 Configs :

Topic : summer Partition : 0 Leader : 2 Replicas : 2 , 4 , 3 Isr : 2 , 4 , 3

#主題名稱:summer

#Partition:只有一個,從0開始

#leader :id為2的broker

#Replicas 副本存在於broker id為2,3,4的上面

#Isr:活躍狀態的broker

(4)傳送訊息,這裡使用的是生產者角色

[ root @ kafka1 ~ ] # /bin/bash /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.22:9092 --topic summer

This is a messages

welcome to kafka     

(5)接收訊息,這裡使用的是消費者角色

[ root @ kafka2 ~ ] # /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper  192.168.2.24:2181 --topic summer --from-beginning

This is a messages

welcome to kafka

如果能夠像上面一樣能夠接收到生產者發過來的訊息,那說明基於kafka的zookeeper叢集就成功啦。

10,下面我們將webserver1上面的logstash的輸出改到kafka上面,將資料寫入到kafka中

(1)修改webserver1上面的logstash配置,如下所示:各個引數可以到 官網 查詢.

root @ webserver1 etc ] # cat logstash.conf

input {              #這裡的輸入還是定義的是從日誌檔案輸入

file {

type = > "system-message"

path = > "/var/log/messages"

start_position = > "beginning"

}

}

output {

#stdout { codec => rubydebug }   #這是標準輸出到終端,可以用於除錯看有沒有輸出,注意輸出的方向可以有多個

kafka {    #輸出到kafka

bootstrap_servers = > "192.168.2.22:9092,192.168.2.23:9092,192.168.2.24:9092"   #他們就是生產者

topic_id = > "system-messages"    #這個將作為主題的名稱,將會自動建立

compression_type = > "snappy"    #壓縮型別

}

}

[ root @ webserver1 etc ] #

(2)配置檢測

[ root @ webserver1 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf --configtest --verbose

Configuration OK

[ root @ webserver1 etc ] #

(2)啟動Logstash,這裡我直接在命令列執行即可

[ root @ webserver1 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf

(3)驗證資料是否寫入到kafka,這裡我們檢查是否生成了一個叫system-messages的主題

[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.22:2181

summer

system - messages    #可以看到這個主題已經生成了

#再看看這個主題的詳情:

[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.2.22:2181 --topic system-messages

Topic : system - messages PartitionCount : 16 ReplicationFactor : 1 Configs :

Topic : system - messages Partition : 0 Leader : 2 Replicas : 2 Isr : 2

Topic : system - messages Partition : 1 Leader : 3 Replicas : 3 Isr : 3

Topic : system - messages Partition : 2 Leader : 4 Replicas : 4 Isr : 4

Topic : system - messages Partition : 3 Leader : 2 Replicas : 2 Isr : 2

Topic : system - messages Partition : 4 Leader : 3 Replicas : 3 Isr : 3

Topic : system - messages Partition : 5 Leader : 4 Replicas : 4 Isr : 4

Topic : system - messages Partition : 6 Leader : 2 Replicas : 2 Isr : 2

Topic : system - messages Partition : 7 Leader : 3 Replicas : 3 Isr : 3

Topic : system - messages Partition : 8 Leader : 4 Replicas : 4 Isr : 4

Topic : system - messages Partition : 9 Leader : 2 Replicas : 2 Isr : 2

Topic : system - messages Partition : 10 Leader : 3 Replicas : 3 Isr : 3

Topic : system - messages Partition : 11 Leader : 4 Replicas : 4 Isr : 4

Topic : system - messages Partition : 12 Leader : 2 Replicas : 2 Isr : 2

Topic : system - messages Partition : 13 Leader : 3 Replicas : 3 Isr : 3

Topic : system - messages Partition : 14 Leader : 4 Replicas : 4 Isr : 4

Topic : system - messages Partition : 15 Leader : 2 Replicas : 2 Isr : 2

[ root @ kafka1 ~ ] #

可以看出,這個主題生成了16個分割槽,每個分割槽都有對應自己的Leader,但是我想要有10個分割槽,3個副本如何辦?還是跟我們上面一樣命令列來建立主題就行,當然對於logstash輸出的我們也可以提前先定義主題,然後啟動logstash 直接往定義好的主題寫資料就行啦,命令如下:

[ root @ kafka1 ~ ] # /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 --replication-factor 3 --partitions 10 --topic TOPIC_NAME

好了,我們將logstash收集到的資料寫入到了kafka中了,在實驗過程中我使用while指令碼測試瞭如果不斷的往kafka寫資料的同時停掉兩個節點,資料寫入沒有任何問題。

那如何將資料從kafka中讀取然後給我們的es叢集呢?那下面我們在kafka叢集上安裝Logstash,安裝步驟不再贅述;三臺上面的logstash 的配置如下,作用是將kafka叢集的資料讀取然後轉交給es叢集,這裡為了測試我讓他新建一個索引檔案,注意這裡的輸入日誌還是messages,主題名稱還是“system-messages”

[ root @ kafka1 etc ] # more logstash.conf

input {

kafka {

zk_connect = > "192.168.2.22:2181,192.168.2.23:2181,192.168.2.24:2181"    #消費者們

topic_id = > "system-messages"

codec = > plain

reset_beginning = > false

consumer_threads = > 5

decorate_events = > true

}

}

output {

elasticsearch {

hosts = > [ "192.168.2.18:9200" , "192.168.2.19:9200" ]

index = > "test-system-messages-%{+YYYY-MM}"            #為了區分之前實驗,我這裡新生成的所以名字為“test-system-messages-%{+YYYY-MM}”

}

}

在三臺kafka上面啟動Logstash,注意我這裡是在命令列啟動的;

[ root @ kafka1 etc ] # pwd

/ usr / local / logstash / etc

[ root @ kafka1 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf

[ root @ kafka2 etc ] # pwd

/ usr / local / logstash / etc

[ root @ kafka2 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf

[ root @ kafka3 etc ] # pwd

/ usr / local / logstash / etc

[ root @ kafka3 etc ] # /usr/local/logstash/bin/logstash -f logstash.conf

在webserver1上寫入測試內容,即webserver1上面利用message這個檔案來測試,我先將其清空,然後啟動

[ root @ webserver1 etc ] # >/var/log/messages

[ root @ webserver1 etc ] # echo "我將通過kafka叢集達到es叢集哦^0^" >> /var/log/messages

#啟動logstash,讓其讀取messages中的內容

下圖為我在客戶端寫入到kafka叢集的同時也將其輸入到終端,這裡寫入了三條內容

而下面三張圖側可以看出,三臺Logstash 很平均的從kafka叢集當中讀取出來了日誌內容

再來看看我們的es管理介面

ok ,看到了吧,

流程差不多就是下面 醬紫咯