1. 程式人生 > >logstash-input-jdbc外掛實現關係型資料庫和ES增量同步

logstash-input-jdbc外掛實現關係型資料庫和ES增量同步

環境安裝

官網下載你要部署環境的相應安裝包,這裡以linux系統為例。
選擇與你的ES相同版本的logstash的tar.gz包,上傳解壓,進入解壓目錄下測試下。
使用bin/logstash -e 'input { stdin { } } output { stdout {} }',啟動後輸入任意內容後,如果有返回則表示安裝成功
這裡寫圖片描述

使用logstash-input-jdbc外掛

先使用bin/logstash-plugin list檢視下已安裝好的外掛一般5.X以後的版本都會預設安裝好此外掛。
這裡寫圖片描述

下面羅列一下,要實現增量同步需要的東西,以orcal為例:

  • orcal連線jar包
  • 能搜尋新增值的sql
    我的sql: :sql_last_value是上次同步的最後值
select *
from drc_policy_publish p
where record_no > :sql_last_value  
order by record_no

接下類建立一個啟動的conf檔案,vi conf/orcal.conf,裡面配置如下

input {
    jdbc {
        jdbc_connection_string => "jdbc:oracle:thin:@//ip:1521/orcl"
        # 使用者名稱和密碼
        jdbc_user => "user"
jdbc_password => "password" # 驅動,準備的連線jar包位置 jdbc_driver_library => "/opt/elk/logstash-5.6.8/orcal/ojdbc-6.jar" # 驅動類名 jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" #配置一次同步的最大數量 jdbc_paging_enabled => "true" jdbc_page_size => "50000"
# 執行的sql 檔案路徑+名稱 statement_filepath => "/opt/elk/logstash-5.6.8/orcal/sql/policy.sql" # 設定監聽間隔 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新 schedule => "* * * * *" # 是否記錄上次執行結果, 如果為真,將會把上次執行到的 tracking_column 欄位的值記錄下來,儲存到 last_run_metadata_path 指定的檔案中 record_last_run => "true" # 是否需要記錄某個column 的值,如果record_last_run為真,可以自定義我們需要 track 的 column 名稱,此時該引數就要為 true. 否則預設 track 的是 timestamp 的值. use_column_value => "true" # 如果 use_column_value 為真,需配置此引數. track 的資料庫 column 名,該 column 必須是遞增的. 一般是mysql主鍵 tracking_column => "record_no" #用於儲存上次同步的最後值,先新建,輸入0比較好,不然是使用null去比較 last_run_metadata_path => "/opt/elk/logstash-5.6.8/orcal/last_id" # 是否清除 last_run_metadata_path 的記錄,如果為真那麼每次都相當於從頭開始查詢所有的資料庫記錄 clean_run => "false" # 是否將 欄位(column) 名稱轉小寫 lowercase_column_names => "true" } } filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { hosts => "ip:9200" index => "orcal" document_type => "drc_polic_publish" # 將"_id"的值設為資料庫主鍵 document_id => "%{record_no}" } }

如果你的表資料中沒有遞增的列,那也可以使用預設的timestamp 值,不過你需要將es中_id的值也設定成timestamp的值。如果讓es自己來建立_id,則會一直插入資料,因為它無法判斷哪些是新值。

在啟動前,你需要先在ES中建立好索引的對映,不然就都要使用預設映射了。

接下來啟動使用bin/logstash -f config/orcal.conf 命令啟動,可以看到兩次的搜尋引數是不同的
這裡寫圖片描述
可以看到資料已經放進去了
這裡寫圖片描述
也可以去Discover,把主鍵欄位排下序,檢視最新的同步資料是什麼
這裡寫圖片描述

上述的啟動方式,會佔用命令列,如果驗證好自己的配置沒有問題後,可以修改schedule引數,比如schedule => “* 8 * * *”,每八小時同步一次,然後使用nohup命令後臺執行logstash nohup bin/logstash -f conf/orcal.conf &