logstash-input-jdbc實現mysql 與elasticsearch實時同步
實現MySQL資料庫中資料到Elasticsearch的實時同步:
首先需要做好的準備工作:
1、伺服器上安裝好elasticsearch和logstash
2、安裝logstash-input-jdbc外掛,但從logstash5.X開始,已經至少集成了logstash-input-jdbc外掛。所以,你如果使用的是logstash5.X,可以不必再安裝,可以直接跳過這一步。
外掛安裝可參考:http://blog.csdn.net/yeyuma/article/details/50240595#quote
注:在以上配置映象的過程中taobao的映象地址已經改為如下地址:http://gems.ruby-china.com/ added to sources
如何同步?
1、建立資料庫和表
資料庫:test
表:item
CREATE TABLE `item` (
`item_id` int(11) NOT NULL,
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
`price` decimal(10,2) DEFAULT NULL,
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`item_id`),
KEY `user` (`id`),
CONSTRAINT `user` FOREIGN KEY (`id`) REFERENCES `user` (`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
2、準備一個es-mysql.conf檔案
配置如下:
input {
stdin {
}
jdbc {
jdbc_connection_string => "jdbc:mysql://node01:3306/test"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/home/mysql-connector-java-5.1.28-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#下面兩個引數是增量匯入和更新的關鍵
#use_column_value設定為true時,使用tracking_column定義的值作為sql_last_value,預設為false
#use_column_value => true
# sql_last_value用於計算要查詢的行的值。在執行任何查詢之前,將其設定為1970年1月1日星期四,如果 use_column_value為true,則tracking_column則設定為0。在後續查詢執行後,它會相應更新。
#tracking_column => "create_time"
#兩種sql查詢方法,一種直接書寫在conf中,另一種從配置檔案中讀取
#statement_filepath => "/bigdata/logstash-2.3.1/conf/es-jdbc/es-mysql.sql"
statement => "select * from item where create_time>=:sql_last_value"
#定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
schedule => "* * * * *"
type => "jdbc"
}
}
filter {
json {
source => "message"
remove_field => ["message"]
}
}
output {
elasticsearch {
#ES的地址與埠
hosts => ["node01:9200", "node02:9200", "node03:9200"]
# ES的索引,自己定義
index => "mysql-es"
# ES的唯一id標識
document_id => "%{item_id}"
codec => plain {
charset => "UTF-16BE"
}
}
stdout{
codec => json_lines
}
}
執行配置檔案:
[[email protected] logstash-2.3.1]# ./bin/logstash -f ./conf/es-jdbc/es-mysql.conf
出現如下,表示執行成功: