1. 程式人生 > >Logstash從資料庫一次同步多張表

Logstash從資料庫一次同步多張表

一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。 
博友前天線上提問,說明這塊理解的還不夠透徹。 
我整理下, 
一是為了儘快解決博友問題, 
二是加深記憶,便於未來產品開發中快速上手。

1、同步原理

原有ES專欄中有詳解,不再贅述。詳細請參考我的專欄: 
深入詳解Elasticsearch 
以下是通過ES5.4.0, logstash5.4.1 驗證成功。 
可以確認的是2.X版本同樣可以驗證成功。

2、核心配置檔案

input {
  stdin {
  }

  jdbc {
  type => "cxx_article_info"
  # mysql jdbc connection string to our backup databse 後面的test對應mysql中的test資料庫
jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb" # the user we wish to excute our statement as jdbc_user => "root" jdbc_password => "xxxxx" record_last_run => "true" use_column_value => "true" tracking_column => "id" last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_info"
clean_run => "false" # the path to our downloaded jdbc driver jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "500"
statement => "select * from cxx_article_info where id > :sql_last_value" #定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新 schedule => "* * * * *" #設定ES索引型別 } jdbc { type => "cxx_user" # mysql jdbc connection string to our backup databse 後面的test對應mysql中的test資料庫 jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb" # the user we wish to excute our statement as jdbc_user => "root" jdbc_password => "xxxxxx" record_last_run => "true" use_column_value => "true" tracking_column => "id" last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_user_info" clean_run => "false" # the path to our downloaded jdbc driver jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "500" statement => "select * from cxx_user_info where id > :sql_last_value" #以下對應著要執行的sql的絕對路徑。 #statement_filepath => "/opt/logstash/bin/logstash_mysql2es/department.sql" #定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新 schedule => "* * * * *" #設定ES索引型別 } } filter { mutate { convert => [ "publish_time", "string" ] } date { timezone => "Europe/Berlin" match => ["publish_time" , "ISO8601", "yyyy-MM-dd HH:mm:ss"] } #date { # match => [ "publish_time", "yyyy-MM-dd HH:mm:ss,SSS" ] # remove_field => [ "publish_time" ] # } json { source => "message" remove_field => ["message"] } } output { if [type]=="cxxarticle_info" { elasticsearch { #ESIP地址與埠 hosts => "10.100.11.231:9200" #ES索引名稱(自己定義的) index => "cxx_info_index" #自增ID編號 # document_id => "%{id}" } } if [type]=="cxx_user" { elasticsearch { #ESIP地址與埠 hosts => "10.100.11.231:9200" #ES索引名稱(自己定義的) index => "cxx_user_index" #自增ID編號 # document_id => "%{id}" } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104

3、同步成功結果

[2017-07-19T15:08:05,438][INFO ][logstash.pipeline ] Pipeline main started
The stdin plugin is now waiting for input:
[2017-07-19T15:08:05,491][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 1
[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 1
[2017-07-19T15:09:00,730][INFO ][logstash.inputs.jdbc ] (0.004000s) SELECT * FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 500 OFFSET 0
[2017-07-19T15:09:00,731][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT * FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 500 OFFSET 0
[2017-07-19T15:10:00,173][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1
[2017-07-19T15:10:00,174][INFO ][logstash.inputs.jdbc ] (0.003000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1
[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.001000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1
[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4、擴充套件

1)多個表無非就是在input裡面多加幾個型別,在output中多加基礎 
型別判定。 
舉例:

if [type]=="cxx_user"  
  • 1

2)input裡的type和output if判定的type**保持一致**,該type對應ES中的type。

後記

死磕ES,有問題歡迎大家提問探討!

—————————————————————————————————— 
更多ES相關實戰乾貨經驗分享,請掃描下方【銘毅天下】微信公眾號二維碼關注。 
(每週至少更新一篇!)

2017年07月19日 23:32 於家中床前


相關推薦

Logstash資料庫同步

一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。  博友前天線上提問,說明這塊理解的還不夠透徹。  我整理下,  一是為了儘快解決博友問題,  二是加深記憶,便於未來產品開發中快速上手。 1、同步原理 原有ES專欄中有詳解,不再贅述。詳細請參

mysql 相同類型的中提取到

tab cursor 同時 sql and ble 有時 eat sch 蝸牛背著沈重的殼,貼著地面一步步艱難地向前爬行,不回頭,也不左顧右盼,只是朝著自己想到達的地方行進。 有時候需要從多張相同類型的表中提取數據,這些表有一些相同的列或者表結構完全相同,同時表名存在一定的

使用JDBC插入條記錄

程式碼如下: public static void insertBatch() { int count[]; int count1[]; Boolean isinsert = false; Connection con = null; PreparedS

在JDBC插入條記錄

{   con = getCon();   con.setAutoCommit(false);                                   // 需要用到事務,不能讓他自動提交,需要手動提交   pst = con.prepareStatement(INSERT_SQL);      

一個關聯兩 desc降序 asc升序

public function wocaotuijian(){ $list=Db::table('xc_tuijiansum')->alias('a') ->join('xc_member me','a.ui

25.logstash同步Mysql到ES(ES與關係型資料庫同步)

題記一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。 博友前天線上提問,說明這塊理解的還不夠透徹。 我整理下, 一是為了儘快解決博友問題, 二是加深記憶,便於未來產品開發中快速上手。1、同步原理原有ES專欄中有詳解,不再贅述。詳細請參考我的專欄: 深

基於ELK5.1(ElasticSearch, Logstash, Kibana)的整合測試

success move maven issues ats call using env proto 前言開源實時日誌分析ELK平臺(ElasticSearch, Logstash, Kibana組成),能很方便的幫我們收集日誌,進行集中化的管理,並且能很方便的進行日誌的統

dtd + 復雜元素的子元素出現次數

ges block 珍惜 html XML 1.0 條件 sch version 禮悟:   好好學習多思考,尊師重道存感恩。葉見尋根三二一,江河湖海同一體。 虛懷若谷良心主,願行無悔給最苦。讀書鍛煉強身心,誠勸且行且珍惜。 xml:1.0

下載個文件

itl cnblogs script log logs html 文件 download nload 一次下載多個文件 JavaScript多文件下載一次下載多個文件

分離式編譯時 鏈接器工具錯誤 (一個變量被定義

效果 include private 可讀性 con lnk2005 可能 ring 生成 在編寫程序時,將類中的函數成員的聲明和定義分開,在頭文件(.h)中進行聲明,在源文件(.cpp)中進行定義 以及具體功能的實現。達到分離式編譯的效果,提高代碼的可讀性。 自己在編寫是

針對臺服務器交互式主機命令采集Python腳本編寫

.py 數據庫版本 toad 監控主機 pro efault 分享 多臺 linux 【環境介紹】 系統環境:Linux + Python 2.7.10(監控主機) 【背景描述】 需求:每次節假日或者重要時間時,需要對數據庫主機信息進行檢查,比如主機空間使用率之類。

(轉)Spring文件上傳,包括選中個文件

bmi while .html span cto input 獲取文件 dex asn 背景: http://www.cnblogs.com/lixuwu/p/8495275.html已經實現了單文件的上傳和下載,多文件的上傳是另一種情景,這裏記錄下來 實現過程

安裝版本php的四個雷區,你踩著了嗎

path start cgi 命令執行 mysq -c tool port 一鍵 記一次安裝多版本的php的四個雷區,你踩著了嗎 需求:公司需要在同一臺服務器上安裝不同版本的php,而這一臺的服務上已經安裝了php.7.1,現需要同

oracle Insert 插入條記錄

pan rac ora 方法 tab where ble code 兩種方法 oracle Insert 一次插入多條記錄有兩種方法: 1)Insert All Into table_name values ... insert all into table_name v

下載個檔案的解決思路-JS

一次下載多個檔案的解決思路(iframe) - Eric 真實經歷 最近開發專案需要做檔案下載,想想挺簡單的,之前也做過,後臺提供下載介面,前端使用window.location.href就行了唄。不過開發的時候發現,有些檔案有附屬檔案,點選 下載按鈕 需要下載兩個檔案,而且不能使用壓縮包的形式。想想

800萬XML文字檔案預處理經歷

一.背景 由於某些需求,現需對系統在最近幾個月生成的xml檔案進行預處理,提取<text>標籤內的資料進行分析。這些需要預處理的資料大概有280GB左右880多萬,存放在gysl目錄下,gysl的下一層按天命名,分為若干個目錄,接下來一層目錄下又有多個目錄,我們所需的xml目錄就在這一層。我們現

800萬XML文本文件預處理經歷

超過 random while 表達式 range utf-8 test 現在 其他 一.背景 由於某些需求,現需對系統在最近幾個月生成的xml文件進行預處理,提取<text>標簽內的數據進行分析。這些需要預處理的數據大概有280GB左右880多萬,存放在gys

maven 打包個maven專案

maven 一次打包多個maven專案。 使用場景 一個專案由多個子專案組成,每個子專案也是一個maven專案。每次打包需要打包每個子專案,很麻煩,其實可以通過配置一個頂級的pom.xml檔案來解決這個問題,只需要打包頂層的maven專案,即可。如果一個專案有多個子專案的pom.

Laravel更新條記錄,批量更新的方法

在我們實際應用中,免不了這樣的情況——例如我們同時錄入多條資訊,可能三條五條還好說,但量一旦變大,就會增加讀寫資料庫的次數,會降低效率,那麼,我們該如何實現,做到一次讀寫資料庫,批量更新呢? 例如這種情況: HTML程式碼: <!doctype html> &l

使用JDBC插入條記錄(以MySQL為例)

閱讀本文需要的先修知識: 最基本的SQL語句 最基本的JDBC操作(如插入單條記錄) 如急需使用請直接看最後一段程式碼。 在JDBC中,插入記錄最簡單的方法是使用executeUpdate()方法,但該方法中的引數只能是單條SQL語句,其實對於需要INSERT或者UPDA