1. 程式人生 > >第八篇 elasticsearch鏈接mysql自動更新數據庫

第八篇 elasticsearch鏈接mysql自動更新數據庫

star 監聽 connector start ood eric color ack path

增量更新

input {
  jdbc {
    jdbc_driver_library => "D:\tools\mysql\mysql-connector-java-5.1.45/mysql-connector-java-5.1.45-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/canyin?characterEncoding=UTF-8&useSSL=false"
    jdbc_user 
=> "root" jdbc_password => "228151" statement => "SELECT * FROM goods" jdbc_paging_enabled => "true" jdbc_page_size => "50000" schedule => "* * * * *" type => "foods" record_last_run => true last_run_metadata_path => "" clean_run => false
} } filter { json { source => "message" remove_field => ["message"] } } output { stdout { codec => rubydebug } elasticsearch { hosts => "127.0.0.1:9200" index => "goods" document_type=>"foods" document_id=>"%{id}" } }

增量更新的參數介紹:

input {
    stdin {
    
    }
    jdbc {
        # 數據庫地址  端口  數據庫名
        jdbc_connection_string => "jdbc:mysql://localhost:3306/shen"
        # 數據庫用戶名      
        jdbc_user => "root"
        # 數據庫密碼
        jdbc_password => "rootroot"
        # mysql java驅動地址 
        jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-5.1.43-bin.jar"
        # 驅動類的名稱
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"

        #是否記錄上次運行的結果
        record_last_run => true
        #記錄上次運行結果的文件位置
        last_run_metadata_path => ""
        #是否使用數據庫某一列的值,
        use_column_value => true
        tracking_column => "id"
        #numeric或者timestamp
        tracking_column_type => "numeric"
        
        #如果為true則會清除 last_run_metadata_path 的記錄,即重新開始同步數據
        clean_run => false

        #sql_last_value根據tracking類型,默認為0或者1970-1-1
        statement => "SELECT * FROM TABLE WHERE id > :last_sql_value"
        # sql 語句文件,對於復雜的查詢,可以放在文件中。
        # statement_filepath => "filename.sql"
        # 設置監聽間隔,語法與Linux系統Cron相同
        schedule => "* * * * *"
    }
}
output {
     stdout {
        codec => json_lines
    }
   elasticsearch {
        hosts  => "localhost:9200"
        index => "contacts"
     document_type => "contact"
        document_id => "%{id}"
    }
}

chedule現在設置成每分鐘都執行一次,是為了方便觀察行為。statefile這一句是一定要加的。$metrics.lastexecutionstart就是這個腳本的關鍵所在了,這個指的是上一次腳本執行的時間,可以通過比較這個時間和數據庫裏的字段來判斷是否要更新。
參考文獻:http://www.cnblogs.com/cocowool/p/mysql_data_to_elasticsearch_via_logstash.html
http://www.cnblogs.com/zhongshengzhen/p/elasticsearch_logstash.html
配置Logstash https://www.elastic.co/guide/en/logstash/current/configuration.html#

第八篇 elasticsearch鏈接mysql自動更新數據庫