1. 程式人生 > >Elasticsearch使用Logstash-input-jdbc同步mysql資料(全量和增量)(windows)

Elasticsearch使用Logstash-input-jdbc同步mysql資料(全量和增量)(windows)

專案中用到elasticsearch,初始化資料時時寫的程式從資料庫裡面查詢出來,然後多執行緒往elasticsearch裡面寫入的。

今天試了一下Logstash-input-jdbc外掛,發現高效又方便,而且可以設定定時任務。

1、安裝外掛

logstash的bin目錄下執行命令: logstash-plugin install logstash-input-jdbc

2、配置檔案和jar包在bin目錄下新建一個config-mysql目錄,裡面包含mysql.conf,
在H:\software\logstash-6.1.2\logstash-6.1.2\lib 加入mysql的驅動 mysql-connector-java-5.1.38.jar

mysql.conf的內容如下:


input {
    stdin {
    }
    jdbc {
      # 資料庫
      jdbc_connection_string => "jdbc:mysql://39.107.60.74:3306/db_plat3"
      # 使用者名稱密碼
      jdbc_user => "root"
      jdbc_password => "letsgo123QWE"
      # jar包的位置
      jdbc_driver_library => "H:\software\logstash-6.1.2\logstash-6.1.2\lib\mysql-connector-java-5.1.38.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      #statement_filepath => "config-mysql/test02.sql"
      statement => "select * from information"
      schedule => "* * * * *"
      #索引的型別
      type => "information"
    }
}
 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
 
output {
    elasticsearch {
        hosts => "192.168.1.70:9200"
        # index名
        index => "information"
        # 需要關聯的資料庫中有有一個id欄位,對應索引的id號
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

3、啟動logstash

然後通過以下命令啟動logstash

.\logstash.bat -f  .\config-mysql\mysql.conf


過一會他就會自動的往ES裡新增資料。

4、增量索引

但是現在有一個問題是:往elasticsearch裡面寫入是全量的,需要改成增量。

修改mysql.conf的內容如下

input {
    stdin {
    }
    jdbc {
      # 資料庫
      jdbc_connection_string => "jdbc:mysql://39.107.60.74:3306/db_plat3"
      # 使用者名稱密碼
      jdbc_user => "root"
      jdbc_password => "letsgo123QWE"
      # jar包的位置
      jdbc_driver_library => "H:\software\logstash-6.1.2\logstash-6.1.2\lib\mysql-connector-java-5.1.38.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
 
 
 
     #使用其它欄位追蹤,而不是用時間
      use_column_value => true
      #追蹤的欄位
      tracking_column => id
      record_last_run => true
     #上一個sql_last_value值的存放檔案路徑, 必須要在檔案中指定欄位的初始值
last_run_metadata_path=>"H:\software\logstash-6.1.2\logstash-6.1.2\bin\config-mysql\station_parameter.txt"


     #開啟分頁查詢
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement_filepath => "config-mysql/information.sql"
      #statement => "select * from information where  id > :sql_last_value "
      schedule => "* * * * *"
      #索引的型別
      type => "information"
    }
}
 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
 
output {
    elasticsearch {
        hosts => "192.168.1.70:9200"
        # index名
        index => "information"
        # 需要關聯的資料庫中有有一個id欄位,對應索引的id號
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

修改的內容見紅色。

station_parameter.txt中的內容如下:

logstash執行時打印出來的sql如下:


其中遇到的一個問題是用:

statement => "select * from information where  id > :sql_last_value "

時會報:sql_last_value 的錯 ,暫時不知道怎麼解決。於是改用檔案config-mysql/information.sql存在sql語句。

ps : 

 上述問題是因為從人家那裡copy過來的時候是錯的  把 :last_sql_value 改成 :sql_last_value  就可以了。

5、用時間來實現增量:

input { stdin { } jdbc { # 資料庫 jdbc_connection_string => "jdbc:mysql://39.107.60.74:3306/db_plat3" # 使用者名稱密碼 jdbc_user => "root" jdbc_password => "letsgo123QWE" # jar包的位置 jdbc_driver_library => "H:\software\logstash-6.1.2\logstash-6.1.2\lib\mysql-connector-java-5.1.38.jar" # mysql的Driver jdbc_driver_class => "com.mysql.jdbc.Driver" #開啟分頁查詢 jdbc_paging_enabled => "true" jdbc_page_size => "50000" #statement_filepath => "config-mysql/information.sql" statement => "select * from information where modify_time > :sql_last_value" schedule => "* * * * *" #索引的型別 type => "information" }}filter { json { source => "message" remove_field => ["message"] }}output { elasticsearch { hosts => "192.168.1.70:9200" # index名 index => "information" # 需要關聯的資料庫中有有一個id欄位,對應索引的id號 document_id => "%{id}" } stdout { codec => json_lines }}modify_time 是我表裡的最後修改時間欄位。
而 :sql_last_value如果input裡面use_column_value => true, 即如果設定為true的話,可以是我們設定的欄位的上一次的值。 
預設 use_column_value => false, 這樣 :sql_last_value為上一次更新的最後時刻值。 
也就是說,對於新增的值,才會更新。這樣就實現了增量更新的目的。


參考文章:https://www.cnblogs.com/a-du/p/7611620.html