1. 程式人生 > >Elasticsearch 與 mysql 同步資料 (logstash-input-jdbc)

Elasticsearch 與 mysql 同步資料 (logstash-input-jdbc)

ELK部署參考: https://blog.csdn.net/gekkoou/article/details/80979374

本文使用 logstash 外掛 jdbc 來實現 elasticsearch 同步 mysql 資料
外掛 jdbc 官方詳解: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

執行命令 logstash -f logstash-input-jdbc.conf

logstash-input-jdbc.conf 程式碼

input {
    jdbc {
        # mysql 資料庫連結
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/database" # 使用者名稱和密碼 jdbc_user => "root" jdbc_password => "root" # 驅動, 下載地址: https://dev.mysql.com/downloads/connector/j/ jdbc_driver_library => "/usr/share/logstash/driver/mysql-connector-java-8.0.11.jar"
# 驅動類名 jdbc_driver_class => "com.mysql.jdbc.Driver" # 開啟分頁查詢 jdbc_paging_enabled => "true" jdbc_page_size => "5000" # 執行sql的檔案路徑 # statement_filepath => "/usr/share/logstash/config/jdbc.sql" # 或直接執行的sql語句 (例如 mtime 欄位為修改時間) statement => "SELECT * FROM test WHERE mtime > :sql_last_value"
# 設定監聽間隔, 各欄位含義(由左至右)分、時、天、月、年, 全部為*預設含義為每分鐘都更新 schedule => "* * * * *" # 索引型別 (同步多表時用來區分) type => "jdbc" # 是否將 column 名稱轉小寫 # lowercase_column_names => false # 是否清除 last_run_metadata_path 的記錄, 如果為真, 那麼每次都相當於從頭開始查詢所有的資料庫記錄 clean_run => false record_last_run => "true" use_column_value => "true" tracking_column => "mtime" # 最後執行時儲存 tracking_column 的檔案路徑 last_run_metadata_path => "/usr/share/logstash/data/logstash-input-jdbc-last_run" } } filter { json { source => "message" remove_field => ["message"] } date { match => ["ctime", "yyyy-MM-dd HH:mm:ss,SSS", "UNIX"] #match => ["ctime", "UNIX"] 或者這樣 target => "@timestamp" locale => "cn" } } output { elasticsearch { # ES的IP地址及埠 hosts => ["localhost:9200"] # 索引名稱 index => "test" # 自增ID 需要關聯的資料庫中有有一個id欄位,對應索引的id號 document_id => "%{id}" } }