1. 程式人生 > >logstash-input-jdbc實現mysql 與elasticsearch實時同步

logstash-input-jdbc實現mysql 與elasticsearch實時同步

實現MySQL資料庫中資料到Elasticsearch的實時同步:

首先需要做好的準備工作:

1、伺服器上安裝好elasticsearch和logstash

2、安裝logstash-input-jdbc外掛,但從logstash5.X開始,已經至少集成了logstash-input-jdbc外掛。所以,你如果使用的是logstash5.X,可以不必再安裝,可以直接跳過這一步。

外掛安裝可參考:http://blog.csdn.net/yeyuma/article/details/50240595#quote 

注:在以上配置映象的過程中taobao的映象地址已經改為如下地址:http://gems.ruby-china.com/ added to sources

 

如何同步?

1、建立資料庫和表

資料庫:test

表:item

CREATE TABLE `item` (
  `item_id` int(11) NOT NULL,
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `price` decimal(10,2) DEFAULT NULL,
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`item_id`),
  KEY `user` (`id`),
  CONSTRAINT `user` FOREIGN KEY (`id`) REFERENCES `user` (`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

2、準備一個es-mysql.conf檔案

配置如下:

input {
    stdin {
    }
    jdbc {
      jdbc_connection_string => "jdbc:mysql://node01:3306/test"
      jdbc_user => "root"
      jdbc_password => "root"
      jdbc_driver_library => "/home/mysql-connector-java-5.1.28-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

      #下面兩個引數是增量匯入和更新的關鍵

      #use_column_value設定為true時,使用tracking_column定義的值作為sql_last_value,預設為false
      #use_column_value => true

      # sql_last_value用於計算要查詢的行的值。在執行任何查詢之前,將其設定為1970年1月1日星期四,如果  use_column_value為true,則tracking_column則設定為0。在後續查詢執行後,它會相應更新。
      #tracking_column => "create_time"

      #兩種sql查詢方法,一種直接書寫在conf中,另一種從配置檔案中讀取
      #statement_filepath => "/bigdata/logstash-2.3.1/conf/es-jdbc/es-mysql.sql"
      statement => "select * from item where create_time>=:sql_last_value"

      #定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
      schedule => "* * * * *"
      type => "jdbc"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {

        #ES的地址與埠
        hosts => ["node01:9200", "node02:9200", "node03:9200"]

    # ES的索引,自己定義
    index => "mysql-es"

    # ES的唯一id標識
        document_id => "%{item_id}"
    codec => plain {
        charset => "UTF-16BE"
      }
    }
    stdout{
        codec => json_lines
    }
}

 

執行配置檔案:

[[email protected] logstash-2.3.1]# ./bin/logstash -f ./conf/es-jdbc/es-mysql.conf 

出現如下,表示執行成功: