1. 程式人生 > >利用logstash從mysql同步資料到ElasticSearch

利用logstash從mysql同步資料到ElasticSearch

前面一篇已經把logstash和logstash-input-jdbc安裝好了。

下面就說下具體怎麼配置。

1.先在安裝目錄bin下面(一般都是在bin下面)新建兩個檔案jdbc.conf和jdbc.sql

2.配置jdbc.conf

 1 input {
 2       stdin {
 3        }
 4       jdbc {
 5         # 連線的資料庫地址和哪一個資料庫,指定編碼格式,禁用SSL協議,設定自動重連
 6         jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/microstorage_backend?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
 7
jdbc_user => "root" 8 jdbc_password => "123456" 9 # 下載連線資料庫的驅動包,建議使用絕對地址 10 jdbc_driver_library => "/usr/local/Cellar/logstash/6.5.4/libexec/logstash-core/lib/jars/mysql-connector-java-5.1.42.jar" 11 12 jdbc_driver_class => "com.mysql.jdbc.Driver" 13 jdbc_paging_enabled => "true" 14
jdbc_page_size => "50000" 15 codec => plain { charset => "UTF-8"} 16 17 #使用其它欄位追蹤,而不是用時間 18 #use_column_value => true //這裡如果是用時間追蹤比如:資料的更新時間或建立時間等和時間有關的這裡一定不能是true, 切記切記切記,我是用update_time來追蹤的 19 #追蹤的欄位 20 tracking_column => update_time 21 record_last_run => true
22 #上一個sql_last_value值的存放檔案路徑, 必須要在檔案中指定欄位的初始值 這裡說是必須指定初始值,我沒指定預設是1970-01-01 08:00:00 23 last_run_metadata_path => "/usr/local/opt/logstash/lastrun/.logstash_jdbc_last_run" //這裡的lastrun資料夾和.logstash_jdbc_last_run是自己建立的 24 25 jdbc_default_timezone => "Asia/Shanghai" //設定時區 26 #statement => SELECT * FROM goods WHERE update_time > :last_sql_value //這裡要說明一下如果直接寫sql語句,前面這種寫法肯定不對的 27                                                 ,加上引號也試過也不對,所以我直接寫在jdbc.sql檔案中 28 statement_filepath => "/usr/local/Cellar/logstash/6.5.4/bin/jdbc.sql" 29 30 31 #是否清除 last_run_metadata_path 的記錄,如果為真那麼每次都相當於從頭開始查詢所有的資料庫記錄 32 clean_run => false 33 34 # 這是控制定時的,重複執行匯入任務的時間間隔,第一位是分鐘 不設定就是1分鐘執行一次 35 schedule => "* * * * *" 36 type => "std" 37 } 38 } 39 40 filter { 41 42 json { 43 44 source => "message" 45 46 remove_field => ["message"] 47 48 } 49 50 } 51 52 output { 53 54 elasticsearch { 55 56 # 要匯入到的Elasticsearch所在的主機 57 58 hosts => "127.0.0.1:9200" 59 60 # 要匯入到的Elasticsearch的索引的名稱 61 62 index => "goods" 63 64 # 型別名稱(類似資料庫表名) 65 66 document_type => "spu" 67 68 # 主鍵名稱(類似資料庫主鍵) 69 70 document_id => "%{id}" 71 } 72 73 stdout { 74 75 # JSON格式輸出 76 77 codec => json_lines 78 79 } 80 }

3.配置jdbc.sql

1 select id,goods_name,goods_no,price,account_id,create_time,update_time from goods where update_time > :sql_last_value

4.我們來看下 .logstash_jdbc_last_run檔案中的內容(網上講述該配置的時候都沒講到裡面具體的內容寫法,導致很多人很迷惑,其中我就是)

前面的---具體什麼意思,我也不太清楚。

 

5.啟動jdbc.conf配置,開始同步資料

 

第一次:因為時間是從1970年開始的所以會全部同步一遍。相當於全量同步了

從第二次開始,會從上次最新的一次時間同步,既新增和修改都會同步

 

總結:主要是配置,有什麼問題,先檢查配置檔案。