1. 程式人生 > >logstash-input-jdbc增量、全量資料同步

logstash-input-jdbc增量、全量資料同步

一、場景

筆者在mysql資料同步到ES中,發現第一次同步時需要全量的資料,之後則需要定時去同步增量資料,所以筆者提供增量和全量同步的conf供讀者參考


二、解決方案

1、全量資料同步

具體如何執行可參考https://blog.csdn.net/w_linux/article/details/84555506,這裡提供conf的配置

input {
  jdbc {
    jdbc_driver_library => "./config/mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 資料庫相關配置
    jdbc_connection_string => "jdbc:ip:port/jdcomm?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "password"
    statement => "SELECT * FROM blwjxb_z"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    schedule => "*/10 * * * *"
  }
}

filter {
   json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "localhost"
    #將mysql資料加入myindex索引下,會自動建立
    index => "myindex"
    # 自增ID 需要關聯的資料庫中有有一個id欄位,對應索引的id號
    document_id => "%{id}"
  }        
} 

2、增量資料同步

input {
  jdbc {
    jdbc_driver_library => "./config/mysql-connector-java-5.1.39.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # 資料庫相關配置
    jdbc_connection_string => "jdbc:ip:port/jdcomm?characterEncoding=UTF-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "password"
    statement => "SELECT * FROM blwjxb_z where id > :sql_last_value"
    #使用其它欄位追蹤,而不是用時間
    use_column_value => true
    #追蹤的欄位
    tracking_column => id
    record_last_run => true
    #上一個sql_last_value值的存放檔案路徑, 必須要在檔案中指定欄位的初始值
    last_run_metadata_path => "./config/station_parameter.txt"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "50000"
    schedule => "* * * * *"
  }
}

filter {
   json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "localhost"
    #將mysql資料加入myindex索引下,會自動建立
    index => "myindex"
    # 自增ID 需要關聯的資料庫中有有一個id欄位,對應索引的id號
    document_id => "%{id}"
  }        
} 

思路就是每次指令碼定時執行的時候會去找id>station_parameter.txt中設定的數值,每次增量資料同步後,station_parameter.txt中的數值會自動更新。

station_parameter.txt中的資料初始如下圖