1. 程式人生 > >ELK+MySQL出現大量重復記錄問題處理

ELK+MySQL出現大量重復記錄問題處理

處理 cor oot crontab last log 好的 不可 blank

一、使用Logstash使用jdbc從MySQL讀取數據操作

1.1 安裝jdbc插件

jdbc默認已安裝,如果沒安裝使用logstash-plugin安裝即可(logstash-plugin在logstash的bin目錄下):

logstash-plugin install logstash-input-jdbc

1.2 下載mysql jdbc驅動

mysql最新版是8.0,但使用時報錯,這裏仍以5.1版本演示

wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.47.zip
# 不推薦放/tmp目錄,但這裏為了與用戶無關所以直接解壓到/tmp目錄
unzip mysql-connector-java-5.1.47.zip -d /tmp

1.3 配置logstash文件

配置如下項,如無意外mysql中數據即可同步到elasticsearch

input{
  jdbc {
    # jdbc:mysql://數據庫ip:端口/數據庫名
    jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb"
    # 數據庫用戶名      
    jdbc_user => "root"
    # 數據庫密碼
    jdbc_password => "root
" # mysql java驅動文件位置 jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar" # 驅動類名,這個一般不需要改 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次 schedule
=> "* * * * *" # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這裏只是簡單將表中所有記錄查出 # 如果語句很多,可使用statement_file來指定sql語句文件 statement => "select * from exploit_records" } } filter { } output{ elasticsearch { hosts => ["localhost:9200"] } }

二、記錄重復問題

2.1 問題描述

elasticsearch以_id項作為主鍵,只要傳過來的數據中_id項的值尚未存在elasticsearch會接收該數據。

而_id項的值默認其實是數據到來時elasticsearch自己生成的然後添加上去的,所以在這種情況下_id是不可能重復的,即所有數據都會被來者不拒地接收。

logstash jdbc插件對數據庫的數據的操作是依照計劃(schedule)執行sql語句(statement),所有查出來的數據就統統按output發過去。

在這種模式下,每執行一次計劃(schedule)所有記錄就會被發往elasticsearch一次,而_id是由elasticsearch生成的不會重復,所以不同次計劃(schedule)的同一條記錄會被elasticsearch反復收錄。

2.2 問題處理辦法

同一條記錄被反復收錄這種情況一般都不是我們想要的,而要消除這種情況其關鍵是要將_id變成不是由elasticsearch自動生成,更確切地講是要將_id變成人為可控地生成。

這種配置也不難,elasticsearch允計通過使用document_id來配置_id。

假設我們表的主鍵是edb_id,為了避免重復,我們要以表主鍵值賦值給document_id即可。此時1.3中配置修改如下:

input{
  jdbc {
    # jdbc:mysql://數據庫ip:端口/數據庫名
    jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb"
    # 數據庫用戶名      
    jdbc_user => "root"
    # 數據庫密碼
    jdbc_password => "root"
    # mysql java驅動文件位置
    jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar"
    # 驅動類名,這個一般不需要改
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次
    schedule => "* * * * *"
    # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這裏只是簡單將表中所有記錄查出
    # 如果語句很多,可使用statement_file來指定sql語句文件
    statement => "select * from exploit_records"
  }
}

filter {

}

output{
  elasticsearch {
    hosts => ["localhost:9200"]
    # 相比1.3只多加了這一句
    # 將表主鍵edb_id字段的值賦值給elasticsearch的主鍵
    document_id => "%{edb_id}"
  }
}

2.3 多路徑輸入問題

由於我們這裏只查詢了一張表,所以2.2中的配置沒有什麽問題。但只果此時要多輸入一張表這張表沒有edb_id字段,或者要多輸入的根本就不是數據庫的表不會有edb_id項,那麽按document_id => "%{edb_id}"取到的edb_id會為空,這些數據因_id項值都為空而被視為重復項不為elasticsearch所收錄。

要解決這個問題,一種思路是在filter中進行處理,確保每條數據都生成edb_id項。但要給數據加上自己本來就不需要的項這種操作怎麽看都不像是最優解。

另外一種思路就是在output中進行限制操作,只有當前這張表才執行document_id => "%{edb_id}"。這是能想到的比較好的辦法,此時配置修改如下:

input{
  jdbc {
    # jdbc:mysql://數據庫ip:端口/數據庫名
    jdbc_connection_string => "jdbc:mysql://10.10.6.91:3306/expdb"
    # 數據庫用戶名      
    jdbc_user => "root"
    # 數據庫密碼
    jdbc_password => "root"
    # mysql java驅動文件位置
    jdbc_driver_library => "/tmp/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar"
    # 驅動類名,這個一般不需要改
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    # 與crontab類似的格式,* * * * *基本隔十來秒就同步一次
    schedule => "* * * * *"
    # 同步時要執行的sql語句,語句最終的結果集同步到elasticsearch; 我這裏只是簡單將表中所有記錄查出
    # 如果語句很多,可使用statement_file來指定sql語句文件
    statement => "select * from exploit_records"
  }
  
  # 多讀取一個文件
  file {
    # 要讀取的文件的位置
    path => ["/home/ls/test.txt"]
    start_position => "beginning"
  }
}

filter {

}

output{
  # 對於ebd_id項不為空的記錄使用此項
  if [edb_id] != "" {
    elasticsearch {
      hosts => ["localhost:9200"]
      document_id => "%{edb_id}"
    }
  }
  # 對於path匹配test.txt的記錄使用此項
  if [path] =~ "test.txt"{
    elasticsearch {
      hosts => ["localhost:9200"]
    }
  }
}

參考:

https://segmentfault.com/a/1190000014387486

ELK+MySQL出現大量重復記錄問題處理