1. 程式人生 > >利用ELK+Kafka解決方案,搭建企業級實時日誌分析平臺

利用ELK+Kafka解決方案,搭建企業級實時日誌分析平臺

    ELK是三款軟體的組合。是一整套完整的解決方案。分別是由Logstash(收集+分析)、ElasticSearch(搜尋+儲存)、Kibana(視覺化展示)三款軟體。ELK主要是為了在海量的日誌系統裡面實現分散式日誌資料集中式管理和查詢,便於監控以及排查故障。

目錄

Kafka

  Elasticsearch

    ElasticSearch 是一個基於 Lucene 的搜尋伺服器。它提供了一個分散式多使用者能力的全文搜尋引擎,基於 RESTful API 的 web 介面。Elasticsearch是用Java開發的,並作為Apache許可條款下的開放原始碼釋出,是當前流行的企業級搜尋引擎。設計用於雲端計算中,能夠達到實時搜尋,穩定,可靠,快速,安裝使用方便。

    安裝


[-> ~# opt] wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.4.2.rpm    # 官網下載軟體包
[-> ~# opt] yum -y install java-1.8.0     # 相關依賴包
[-> ~# opt] yum -y install epel-release
[-> ~# opt] yum localinstall -y elasticsearch-6.4.2.rpm    

     安裝完成之後,我們修改ElasticSearch的配置檔案

,然後啟動ElasticSearch服務

[-> ~#] vim /etc/elasticsearch/elasticsearch.yml
        # 將如下的內容的註釋取消,並將相關資訊改成你在配置時的實際引數
        ***注意:配置檔案的引數,在冒號後都有一個空格,如沒有空格,服務會啟動失敗,並且無任何提示
            
          cluster.name: ES-cluster   # 叢集名稱,可更改
          node.name: es1             # 為本機的主機名
          network.host: 192.168.4.1  # 為本機的IP
          http.port: 9200            # elasticsearch服務埠,可更改
          discovery.zen.ping.unicast.hosts: ["es1", "es2","es3"]    # 叢集內的主機(主機名,不一定全部都要寫)
          node.master: true                                # 節點是否被選舉為master
          node.data: true                                  # 節點是否儲存資料
          index.number_of_shards: 4                        # 索引分片的個數
          index.number_of_replicas: 1                      # 分片的副本個數
          path.conf: /etc/elasticsearch/                   # 配置檔案的路徑
          path.data: /data/es/data                         # 資料目錄路徑
          path.work: /data/es/elasticsearch                # 工作目錄路徑
          path.logs:  /data/es/logs/elasticsearch/logs/       # 日誌檔案路徑
          path.plugins:  /data/es/plugins                  # 外掛路徑
          bootstrap.mlockall: true                         # 記憶體不向swap交換
          http.enabled: true                               # 啟用http

         # 如果目錄路徑不存在的,需要先建立        

[-> ~#] /etc/init.d/elasticsearch start    # 啟動服務
[-> ~#] systemctl enable elasticsearch     # 開機自啟

    一個簡單的ElasticSearch就搭建好了,如果我們要部署一個ES叢集,那麼我們只需要在所有主機上部署好Java環境,以及在所有主機上的/etc/hosts解析主機,如下:

    # 此操作是在ES叢集主機上必做

[-> ~#] yum -y install java-1.8.0
[-> ~#] vim /etc/hosts     
        192.168.4.1 es1
        192.168.4.2 es2
        192.168.4.3 es3

# 將已經完成elasticsearch服務安裝的主機,將、hosts檔案、elasticsearch.yml配置檔案拷貝一份到叢集主機上,只要修改node.name: 為本機的主機名即可,然後將服務啟動起來

    最後我們來檢視下Es叢集是否部署好,不成功則number_of_nodes永遠為1,成功會有下圖資訊,:

[-> ~#] curl -i http://192.168.4.1:9200/_cluster/health?pretty
# 返回的資訊包括叢集名稱、叢集數量等,如果number_of_nodes顯示的是實際得叢集數量,則說明叢集部署成功
{
  "cluster_name" : "ES-cluster",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 3,
  "number_of_data_nodes" : 3,
  "active_primary_shards" : 26,
  "active_shards" : 52,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

ElasticSearch外掛安裝


# 採用本地安裝,也可以採用遠端安裝
[-> ~#] /usr/share/elasticsearch/bin/plugin install file:///data/es/plugins/elasticsearch-head-master.zip    
[-> ~#] /usr/share/elasticsearch/bin/plugin install file:///data/es/plugins/elasticsearch-kopf-master.zip
[-> ~#] /usr/share/elasticsearch/bin/plugin install file:///data/es/plugins/bigdesk-master.zip
[-> ~#] /usr/share/elasticsearch/bin/plugin list     # 檢視已安裝的外掛 

外掛安裝完成之後,訪問對應外掛的Url,如果訪問出現如下圖,則成功:

head外掛展示圖(http://192.168.4.1:9200/_plugin/head

kopf外掛展示圖(http://192.168.4.1:9200/_plugin/kopf

Bigdesk展示圖(http://192.168.4.1:9200/_plugin/bigdesk

Logstash

   logstash是一個數據採集、加工處理以及傳輸的工具,能夠將所有的資料集中處理,使不同模式和不同資料的正常化,而且資料來源能夠隨意輕鬆的新增。

      安裝logstash軟體包

[-> ~#] wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.2.rpm

      安裝完成編寫logstash.con日誌收集檔案測試:

[-> ~#] vim /etc/logstash/logstash.conf
    # 編寫簡單的收集日誌檔案

    input{
    stdin{}
}

output{
    elasticsearch{
      hosts => ["192.168.4.1:9200","192.168.4.2:9200","192.168.4.3:9200"]
    }
}

     編寫完後,使用命令檢視下logstash.conf是否無誤 “ /opt/logstash/bin/logstash -f /etc/logstash/logstash.conf --configtest --verbose  ”,需要注意的是,yum安裝的Logstash預設在/opt目錄下。原始碼安裝可以自行指定。

[-> ~#] /opt/logstash/bin/logstash -f /etc/logstash/logstash.conf --configtest --verbose

    # 如果出現 Configuration OK字樣,則說明檔案準確無誤

    在確認logstash.conf檔案無誤後,我們利用logstash.conf裡面的語句進行資料的收集,然後在螢幕上輸入你想測試的資料。

詳細將下圖:

     在輸入後,我們再回到Es檢視是否有剛剛在螢幕上輸入的資料。

    下圖我們可以看到,資料已經分別寫入到了剛剛搭建好的Es叢集中。

    此外,我們再來看看剛剛收集的資料的詳細資訊:

  通過簡單測試,可以得出我們的Es叢集是處於健康狀態並且可以收集到資料的。那麼自動的收集日誌資料,等我們搭建完Kafka時一同實現。

Kafka

Apache kafka是訊息中介軟體的一種,是一種分散式的,基於釋出/訂閱的訊息系統。能實現一個為處理實時資料提供一個統一、高吞吐、低延遲的平臺,且擁有分散式的,可劃分的,冗餘備份的永續性的日誌服務等特點。

    將下載的kafka tar包解壓到指定的位置,並修改zookeeper叢集檔案以及kafka的server檔案,如下:

[-> ~#] wget http://mirror.bit.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
[-> ~#] tar -xvf kafka_2.11-2.0.0.tgz -C /opt/
[-> ~#] cd /opt/kafka_2.11-2.0.0/

# zookeeper叢集
[-> ~# kafka_2.11-2.0.0] vim config/zookeeper.properties
    dataDir=/data/zookeeper            # 指定zookeeper的目錄,如果不存在則需要建立
    tickTime=2000                      # 客戶端與伺服器之間維持心跳的時間間隔
    initLimit=20    
    syncLimit=10
    server.1=192.168.4.1:2888:3888     # 2888表示的是zookeeper叢集中Leader的埠
    server.2=192.168.4.2:2888:3888     # 3888表示的是如果Leader宕掉了,那麼從3888埠中重新選舉新的Leader
    server.3=192.168.4.3:2888:3888      

# kafka服務配置
[-> ~# kafka_2.11-2.0.0] vim config/server.properties
        broker.id=0        # 唯一的
        port=9092
        host.name=192.168.4.1
        log.dirs=/data/kafka-logs    # 注意這裡的是kafka的儲存目錄。並不是存放日誌的目錄,當你啟動kafka的時候,會自動在當前目錄下生成一個log專門存放kafka的日誌檔案。如果沒有需建立
        zookeeper.connect=192.168.4.1:2181,192.168.4.2:2181,192.168.4.3:2181    #這裡可以是主機名,也可以是ip,但是填寫的是主機名,就必須在/etc/hosts檔案下進行解析才可以生效

    上述的操作是單機的kafka,如果我們需要搭建kafka叢集,那麼將修改好的kafka整個目錄scp到叢集伺服器上即可。

    另外需要注意的是:修改每臺主機配置檔案下的個別引數,我們主要修改如下操作,以下操作均在server.properties上操作:

broker.id = x     # x:是任意數,只要不重複即可
host.name=xxx.xxx.xxx.xxx     這裡填寫的應為本機的IP地址

# 建立相關資料夾以及檔案,如在建立了/data/zookeeper後,下面應該還得再執行
    echo 1 > /data/zookeeper/myid    每臺機器操作的數字也不能一樣

    當我們在叢集主機上都修改好配置檔案引數後,隨後就是將zookeeper叢集啟動,以及將kafka服務啟動起來。如下:

# 後臺啟動zookeeper,在叢集主機上進行操作即可

[-> ~# ] /opt/kafka_2.11-2.0.0/bin/zookeeper-server-start.sh -daemon /opt/kafka_2.11-2.0.0/config/zookeeper.properties    

# 後臺啟動kafka服務,在叢集主機上進行操作即可
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.11-2.0.0/config/server.properties

# 檢視2888、3888、2181、9092埠是否都已經啟動,如果2888、3888埠不存在,那麼在/opt/kafka_2.11-2.0.0/log/下檢視zookeeper.out以及server.log報錯資訊

[-> ~# ] netstat -antulp | grep -E '2181|2888|3888|9092'

    配置了zookeeper叢集以及kafka叢集,啟動了服務如下圖為成功,缺一則為失敗:

然後,我們來測試下,kafka是否可用:

新增一個topic,然後scon為他分配一個分割槽,設定副本
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic scon

    # 這裡需要非常注意,就是factor數不能超過broker數。broker數就是叢集機器的總數。如,編者的叢集機器數為3臺,那麼factor的數就不能大於3,只能<=3.

# 檢視新建立的主題scon
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic scon

# 修改建立的scon主題,這裡將副本數5,修改為6
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 6 --topic scon

# 刪除建立的scon主題
[-> ~# ] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic scon

#注意:是否開啟topic的刪除功能:預設為false
    delete.topic.enable 在conf/server.properties中delete.topic.enable=true

   測試效果如下圖:

    那麼,Kafka的叢集環境已經搭建完成,那麼我們需要測試下,logstash是否能夠將收集到的日誌資料傳遞到kafka上,最後經過kafka, 再將資料傳到我們的Es叢集上。

    這裡或許就有疑問了,Redis同樣可以,為什麼要選擇kafka?

    我們都知道Redis是以key的hash方式來分散對列儲存資料的,且Redis作為叢集使用時,對應的應用對應一個Redis,在某種程度上會造成資料的傾斜性,從而導致資料的丟失。

    而從之前我們部署Kafka叢集來看,kafka的一個topic(主題),可以有多個partition(副本),而且是均勻的分佈在Kafka叢集上,這就不會出現redis那樣的資料傾斜性。Kafka同時也具備Redis的冗餘機制,像Redis叢集如果有一臺機器宕掉是很有可能造成資料丟失,而Kafka因為是均勻的分佈在叢集主機上,即使宕掉一臺機器,是不會影響使用。同時Kafka作為一個訂閱訊息系統,還具備每秒百萬級別的高吞吐量,永續性的、分散式的特點等。

    接下來我們將編寫logstash.conf檔案,將收集日誌檔案併到kafka。如下:

[-> #~] vim /etc/logstash/logstash.conf

input{
  file {
    type => "weblog"            # 定義日誌檔案型別
    path => "/opt/access.log"    # 日誌檔案路徑,這裡收集web的日誌檔案
    start_position => "beginning"     # 從第一行開始讀取資料
  }
}

output {
    kafka {
      bootstrap_servers => "192.168.4.1:9092,192.168.4.2:9092,192.168.4.3:9092"    # kafka叢集主機
      topic_id => "web-log"     # 主題名稱,會自動建立
      compression_type => "snappy"     # 壓縮方式
    }
}


# 檢視kafka,logstash是否成功將資料傳入到kafka上

[-> #~] /opt/kafka_2.11-2.0.0/bin/kafka-topics.sh --list --zookeeper localhost:2181 

    如出現如下圖,則資料寫入成功:


    這裡可能就會有疑問了,收集時的分割槽數,副本數都是系統預設生成的,那麼我該如何實現指定分割槽數,副本數?我們可以在收集時現行建立一個主題,然後再收集的時候就使用這個主題即可。如下:

[-> #~]  /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.22:2181 
--replication-factor 3 --partitions 7 --topic Topic_Name    # 3個副本,7個分割槽

    好了,logstash將資料寫入到kafka已經是沒有問題,那麼接下來就是:如何將kafka上的資料資訊寫入到我們部署好的Es叢集上。當然,linux可是" 一切皆檔案 "。所以,我們還是修改logstash.conf的配置檔案。

    之所以使用kafka來進行資料儲存,是為了減少Es叢集前端的壓力,所以加上了訊息佇列Kafka作為一個過渡。

編寫配置檔案,如下:

[-> #~] vim /etc/logstash/logstash.conf

input {
   kafka {
     zk_connect => "192.168.4.1:2181,192.168.4.2:2181,192.168.4.3:2181"
     topic_id => "web-log"
     reset_beginning => false
     consumer_threads => 5
     decorate_events => true
    }
}

output {
    elasticsearch {
       hosts => ["192.168.4.2:9200","192.168.4.3:9200"]
       index => "sc-%{type}-%{+YYYY.MM.dd}"
       workers => 1
       flush_size => 20000
        idle_flush_time => 10
        template_overwrite => true
    }
}

    資料成功寫入到Es叢集,如下圖所示:

   另外我們來看看詳細的json資料資訊:

Kibana

    Kibana是一個開源的分析與視覺化平臺,可以使用各種不同的圖表、表格、地圖等kibana能夠很輕易地展示高階資料分析與視覺化。

    安裝並啟動Kibana:

[-> #~ ] wget https://artifacts.elastic.co/downloads/kibana/kibana-6.4.2-x86_64.rpm
[-> #~ ] yum -y localinstall kibana-6.4.2-x86_64.rpm

    # 在啟動Kibana之前,我們先修改kibana的配置檔案,修改完成後再啟動kibana服務
[-> #~ ] vim /opt/kibana/config/kibana.yml
    server.port: 5601         # kibana的埠,切記一定不能修改埠,此埠是被寫死在檔案內的
    server.host: "0.0.0.0"    #kibana的服務地址,可以是本地IP地址
    elasticsearch.url: "http://192.168.1.11:9200"   "    #ES的地址
    kibana.index: ".kibana"        #kibana的索引名稱
    kibana.defaultAppId: "discover"
    elasticsearch.pingTimeout: 1500        #連通ES的超時時間
    elasticsearch.requestTimeoumeout: 30000    #ES請求超時時間
    elasticsearch.startupTimeoumeout: 5000

[-> #~ ] systemctl start kibana    # 啟動kibana服務
[-> #~ ] systemctl enable kiban    # 開機自啟動

   如果出現瞭如下圖,則說明kibana服務跟Es叢集已連通:

    接下來我們來訪問kibana “ http://192.168.4.1:5601 ”,如下圖:

    既然,Kibana已經搭建成功,那麼接下來就是如何的使用Kibana,我們都知道Kibana是一個視覺化的工具,它能製作不同的儀表盤以直觀的方式展示。下面我們就來看看kibana是如何使用的。

    一、填寫需要製作的index名稱:

    二、選擇時間段檢視資料:

    三、選擇圖形型別製作儀表盤

    四、製作儀表盤

五、最後檢視儲存的儀表盤

    相對比雜亂無章的日誌資料,以儀表盤的形式顯現是不是更加清晰明瞭,且直擊重點。在現如今每年全球的資料都會呈2倍的趨勢增長的大資料時代。能夠在海量的資料中能夠實時且快速找到想要的重點,並且還能實時進行分散式的監控。或許這才是最為被大眾所接受的。

  至此,ELK+Kafka架構已經搭建完成!

 本文旨在提供參考,如有錯誤,歡迎大家指正。幫助編者不斷的改進!