1. 程式人生 > >Elasticsearch使用Logstash-input-jdbc同步mysql資料(全量和增量)

Elasticsearch使用Logstash-input-jdbc同步mysql資料(全量和增量)


作者:camelcanoe
來源:CSDN
原文:https://blog.csdn.net/camelcanoe/article/details/79759376
版權宣告:本文為博主原創文章,轉載請附上博文連結!

專案中用到elasticsearch,初始化資料時時寫的程式從資料庫裡面查詢出來,然後多執行緒往elasticsearch裡面寫入的。

今天試了一下Logstash-input-jdbc外掛,發現高效又方便,而且可以設定定時任務。

1、安裝外掛

在logstash的bin目錄下執行命令: logstash-plugin install logstash-input-jdbc

2、配置檔案和jar包
在bin目錄下新建一個config-mysql目錄,裡面包含mysql.conf,
在H:\software\logstash-6.1.2\logstash-6.1.2\lib 加入mysql的驅動 mysql-connector-java-5.1.38.jar

mysql.conf的內容如下:

input {
stdin {
}
jdbc {
# 資料庫
jdbc_connection_string => “jdbc:mysql://39.107.60.74:3306/db_plat3”
# 使用者名稱密碼
jdbc_user => “root”
jdbc_password => “letsgo123QWE”
# jar包的位置
jdbc_driver_library => “H:\software\logstash-6.1.2\logstash-6.1.2\lib\mysql-connector-java-5.1.38.jar”
# mysql的Driver
jdbc_driver_class => “com.mysql.jdbc.Driver”
jdbc_paging_enabled => “true”
jdbc_page_size => “50000”
#statement_filepath => “config-mysql/test02.sql”
statement => “select * from information”
schedule => “* * * * *”
#索引的型別
type => “information”
}
}

filter {
json {
source => “message”
remove_field => [“message”]
}
}

output {
elasticsearch {
hosts => “192.168.1.70:9200”
# index名
index => “information”
# 需要關聯的資料庫中有有一個id欄位,對應索引的id號
document_id => “%{id}”
}
stdout {
codec => json_lines
}
}

3、啟動logstash

然後通過以下命令啟動logstash

.\logstash.bat -f .\config-mysql\mysql.conf

過一會他就會自動的往ES裡新增資料。

4、增量索引

但是現在有一個問題是:往elasticsearch裡面寫入是全量的,需要改成增量。

修改mysql.conf的內容如下

input {
stdin {
}
jdbc {
# 資料庫
jdbc_connection_string => “jdbc:mysql://39.107.60.74:3306/db_plat3”
# 使用者名稱密碼
jdbc_user => “root”
jdbc_password => “letsgo123QWE”
# jar包的位置
jdbc_driver_library => “H:\software\logstash-6.1.2\logstash-6.1.2\lib\mysql-connector-java-5.1.38.jar”
# mysql的Driver
jdbc_driver_class => “com.mysql.jdbc.Driver”

 #使用其它欄位追蹤,而不是用時間
  use_column_value => true
  #追蹤的欄位
  tracking_column => id
  record_last_run => true
 #上一個sql_last_value值的存放檔案路徑, 必須要在檔案中指定欄位的初始值

last_run_metadata_path=>“H:\software\logstash-6.1.2\logstash-6.1.2\bin\config-mysql\station_parameter.txt”
#開啟分頁查詢
jdbc_paging_enabled => “true”
jdbc_page_size => “50000”
statement_filepath => “config-mysql/information.sql”
#statement => "select * from information where id > :sql_last_value "
schedule => “* * * * *”
#索引的型別
type => “information”
}
}

filter {
json {
source => “message”
remove_field => [“message”]
}
}

output {
elasticsearch {
hosts => “192.168.1.70:9200”
# index名
index => “information”
# 需要關聯的資料庫中有有一個id欄位,對應索引的id號
document_id => “%{id}”
}
stdout {
codec => json_lines
}
}

修改的內容見紅色。

station_parameter.txt中的內容如下:

logstash執行時打印出來的sql如下:

其中遇到的一個問題是用:

statement => "select * from information where id > :sql_last_value "

時會報:sql_last_value 的錯 ,暫時不知道怎麼解決。於是改用檔案config-mysql/information.sql存在sql語句。

ps :

上述問題是因為從人家那裡copy過來的時候是錯的 把 :last_sql_value 改成 :sql_last_value 就可以了。

5、用時間來實現增量:

input {
stdin {
}
jdbc {

資料庫

jdbc_connection_string => “jdbc:mysql://39.107.60.74:3306/db_plat3”

使用者名稱密碼

jdbc_user => “root”
jdbc_password => “letsgo123QWE”

jar包的位置

jdbc_driver_library => “H:\software\logstash-6.1.2\logstash-6.1.2\lib\mysql-connector-java-5.1.38.jar”

mysql的Driver

jdbc_driver_class => “com.mysql.jdbc.Driver”
#開啟分頁查詢
jdbc_paging_enabled => “true”
jdbc_page_size => “50000”
#statement_filepath => “config-mysql/information.sql”
statement => “select * from information where modify_time > :sql_last_value”
schedule => “* * * * *”
#索引的型別
type => “information”
}
}
filter {
json {
source => “message”
remove_field => [“message”]
}
}
output {
elasticsearch {
hosts => “192.168.1.70:9200”

index名

index => “information”

需要關聯的資料庫中有有一個id欄位,對應索引的id號

document_id => “%{id}”
}
stdout {
codec => json_lines
}
}
modify_time 是我表裡的最後修改時間欄位。
而 :sql_last_value如果input裡面use_column_value => true, 即如果設定為true的話,可以是我們設定的欄位的上一次的值。
預設 use_column_value => false, 這樣 :sql_last_value為上一次更新的最後時刻值。
也就是說,對於新增的值,才會更新。這樣就實現了增量更新的目的。