1. 程式人生 > >使用Logstash來實時同步MySQL和log日誌資料到ES

使用Logstash來實時同步MySQL和log日誌資料到ES

少年,光看是不行的,我的github在這裡,跟著做吧:https://github.com/singgel/NoSql-SkillTree

        logstash是一個數據分析軟體,主要目的是分析log日誌。整一套軟體可以當作一個MVC模型,logstash是controller層,Elasticsearch是一個model層,kibana是view層。

      首先將資料傳給logstash,它將資料進行過濾和格式化(轉成JSON格式),然後傳給Elasticsearch進行儲存、建搜尋的索引,kibana提供前端的頁面再進行搜尋和圖表視覺化,它是呼叫Elasticsearch的介面返回的資料進行視覺化。logstash和Elasticsearch是用Java寫的,kibana使用node.js框架。

以下以mysql為例,log同理

一、首先下載和你的ES對應的logstash版本,本篇我們使用的都是6.3.1

       下載後使用logstash-plugin install logstash-input-jdbc 命令安裝jdbc的資料連線外掛

 

二、新增mysqltoes.conf檔案,配置Input和output引數如下,連線jdbc按照規則同步指定的資料到es

       大家注意這裡的配置有很多種用法,包括同步時間規則和最後更新時間的用法就不詳細展開了

input {
 stdin { }
    jdbc {
        jdbc_connection_string => "jdbc:mysql://localhost:3306/star"
  
        jdbc_user => "root"
 
        jdbc_password => "123456"
 
        jdbc_driver_library => "/Users/yiche/.m2/repository/mysql/mysql-connector-java/8.0.12/mysql-connector-java-8.0.12.jar"
 
        jdbc_driver_class => "com.mysql.jdbc.Driver"
         
        jdbc_paging_enabled => "true"
         
        jdbc_page_size => "50000"
        statement => "SELECT * FROM channel_data_detail_month"
        schedule => "* * * * *"
    }
 }
  
 output {
     stdout {
        codec => json_lines
    }
    elasticsearch {
        hosts => "localhost:9200"
        index => "star"
        document_type => "channel_data_detail_month"
        document_id => "%{id}"
    }
}

使用logstash按照conf檔案執行 ./bin/logstash.bat -f ./config/mysqltoes.conf

注意這裡可能有執行不成功的坑,主要是把配置設定好,還有檔案和名稱編碼的問題  output es的配置用hosts

這時候我們可以看到MYSQL中的表資料已成功匯入ES

---

log日誌的

1. 定義資料來源

      寫一個配置檔案,可命名為logstash.conf,輸入以下內容:

input {
        file {
                path => "/data/web/logstash/logFile/*/*"
                start_position => "beginning" #從檔案開始處讀寫
        }
#       stdin {}  #可以從標準輸入讀資料
}

 

      定義的資料來源,支援從檔案、stdin、kafka、twitter等來源,甚至可以自己寫一個input plugin。如果像上面那樣用萬用字元寫file,如果有新日誌檔案拷進來,它會自動去掃描。

 

2. 定義資料的格式

      根據打日誌的格式,用正則表示式進行匹配

filter {

  #定義資料的格式
  grok {
    match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|"}
  }

}

 

      由於打日誌的格式是這樣的:

2015-05-07-16:03:04|10.4.29.158|120.131.74.116|WEB|11299073|http://quxue.renren.com/shareApp?isappinstalled=0&userId=11299073&from=groupmessage|/shareApp|null|Mozilla/5.0 (iPhone; CPU iPhone OS 8_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12D508 MicroMessenger/6.1.5 NetType/WIFI|duringTime|98||

      以|符號隔開,第一個是訪問時間,timestamp,作為logstash的時間戳,接下來的依次為:服務端IP,客戶端的IP,機器型別(WEB/APP/ADMIN),使用者的ID(沒有用0表示),請求的完整網址,請求的控制器路徑,reference,裝置的資訊,duringTime,請求所花的時間。

     如上面程式碼,依次定義欄位,用一個正則表示式進行匹配,DATA是logstash定義好的正則,其實就是(.*?),並且定義欄位名。

     我們將訪問時間作為logstash的時間戳,有了這個,我們就可以以時間為區分,檢視分析某段時間的請求是怎樣的,如果沒有匹配到這個時間的話,logstash將以當前時間作為該條記錄的時間戳。需要再filter裡面定義時間戳的格式,即打日誌用的格式:

 

filter {

  #定義資料的格式
  grok {#同上... }

  #定義時間戳的格式
  date {
    match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
    locale => "cn"
  }

}

 

     在上面的欄位裡面需要跟logstash指出哪個是客戶端IP,logstash會自動去抓取該IP的相關位置資訊:

filter {

  #定義資料的格式
  grok {#同上}

  #定義時間戳的格式
  date {#同上}

  #定義客戶端的IP是哪個欄位(上面定義的資料格式)
  geoip {
    source => "clientIp"
  }
}

 

 

     同樣地還有客戶端的UA,由於UA的格式比較多,logstash也會自動去分析,提取作業系統等相關資訊

  #定義客戶端裝置是哪一個欄位
  useragent {
    source => "device"
    target => "userDevice"
  }

      哪些欄位是整型的,也需要告訴logstash,為了後面分析時可進行排序,使用的資料裡面只有一個時間

  #需要進行轉換的欄位,這裡是將訪問的時間轉成int,再傳給Elasticsearch
  mutate {
    convert => ["duringTime", "integer"]
  }

3. 輸出配置

      最後就是輸出的配置,將過濾扣的資料輸出到elasticsearch

output {
  #將輸出儲存到elasticsearch,如果沒有匹配到時間就不儲存,因為日誌裡的網址引數有些帶有換行
  if [timestamp] =~ /^\d{4}-\d{2}-\d{2}/ {
        elasticsearch { host => localhost }
  }

   #輸出到stdout
#  stdout { codec => rubydebug }

   #定義訪問資料的使用者名稱和密碼
#  user => webService
#  password => 1q2w3e4r
}

 

 

      我們將上述配置,儲存到logstash.conf,然後執行logstash

      在logstash啟動完成之後,輸入上面的那條訪問記錄,logstash將輸出過濾後的資料:

     可以看到logstash,自動去查詢IP的歸屬地,並將請求裡面的device欄位進行分析。