1. 程式人生 > >25.logstash一次同步Mysql多張表到ES(ES與關係型資料庫同步)

25.logstash一次同步Mysql多張表到ES(ES與關係型資料庫同步)

題記

一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。 
博友前天線上提問,說明這塊理解的還不夠透徹。 
我整理下, 
一是為了儘快解決博友問題, 
二是加深記憶,便於未來產品開發中快速上手。

1、同步原理

原有ES專欄中有詳解,不再贅述。詳細請參考我的專欄: 
深入詳解Elasticsearch 
以下是通過ES5.4.0, logstash5.4.1 驗證成功。 
可以確認的是2.X版本同樣可以驗證成功。

2、核心配置檔案

input {
  stdin {
  }

  jdbc {
  type => "cxx_article_info"
  # mysql jdbc connection string to our backup databse 後面的test對應mysql中的test資料庫
jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb" # the user we wish to excute our statement as jdbc_user => "root" jdbc_password => "xxxxx" record_last_run => "true" use_column_value => "true" tracking_column => "id" last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_info"
clean_run => "false" # the path to our downloaded jdbc driver jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "500"
statement => "select * from cxx_article_info where id > :sql_last_value" #定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新 schedule => "* * * * *" #設定ES索引型別 } jdbc { type => "cxx_user" # mysql jdbc connection string to our backup databse 後面的test對應mysql中的test資料庫 jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb" # the user we wish to excute our statement as jdbc_user => "root" jdbc_password => "xxxxxx" record_last_run => "true" use_column_value => "true" tracking_column => "id" last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_user_info" clean_run => "false" # the path to our downloaded jdbc driver jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "500" statement => "select * from cxx_user_info where id > :sql_last_value" #以下對應著要執行的sql的絕對路徑。 #statement_filepath => "/opt/logstash/bin/logstash_mysql2es/department.sql" #定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新 schedule => "* * * * *" #設定ES索引型別 } } filter { mutate { convert => [ "publish_time", "string" ] } date { timezone => "Europe/Berlin" match => ["publish_time" , "ISO8601", "yyyy-MM-dd HH:mm:ss"] } #date { # match => [ "publish_time", "yyyy-MM-dd HH:mm:ss,SSS" ] # remove_field => [ "publish_time" ] # } json { source => "message" remove_field => ["message"] } } output { if [type]=="cxxarticle_info" { elasticsearch { #ESIP地址與埠 hosts => "10.100.11.231:9200" #ES索引名稱(自己定義的) index => "cxx_info_index" #自增ID編號 # document_id => "%{id}" } } if [type]=="cxx_user" { elasticsearch { #ESIP地址與埠 hosts => "10.100.11.231:9200" #ES索引名稱(自己定義的) index => "cxx_user_index" #自增ID編號 # document_id => "%{id}" } } }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104

3、同步成功結果

[2017-07-19T15:08:05,438][INFO ][logstash.pipeline ] Pipeline main started
The stdin plugin is now waiting for input:
[2017-07-19T15:08:05,491][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 1
[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 1
[2017-07-19T15:09:00,730][INFO ][logstash.inputs.jdbc ] (0.004000s) SELECT * FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 500 OFFSET 0
[2017-07-19T15:09:00,731][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT * FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 500 OFFSET 0
[2017-07-19T15:10:00,173][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1
[2017-07-19T15:10:00,174][INFO ][logstash.inputs.jdbc ] (0.003000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1
[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.001000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1
[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4、擴充套件

1)多個表無非就是在input裡面多加幾個型別,在output中多加基礎 
型別判定。 
舉例:

if [type]=="cxx_user"  
  • 1

2)input裡的type和output if判定的type**保持一致**,該type對應ES中的type。

後記

死磕ES,有問題歡迎大家提問探討!

相關推薦

25.logstash同步MysqlES(ES關係型資料庫同步)

題記一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。 博友前天線上提問,說明這塊理解的還不夠透徹。 我整理下, 一是為了儘快解決博友問題, 二是加深記憶,便於未來產品開發中快速上手。1、同步原理原有ES專欄中有詳解,不再贅述。詳細請參考我的專欄: 深

mysql 條sql更新資料,

<update id="deleteActor" parameterType="java.lang.String"> UPDATE actor_tbl a LEFT JOIN video_actor_tbl b ON a.sn = b.actorsn SET a.

如何用VBA將一個工作簿裡的名字人數彙總在表裡

Sub text1()Worksheet.Add.Name="彙總"For Each Sheet In Worksheetsk = k + 1x = y.usedRange.Rows.Countsheets("彙總").Cells(k, 1) = Sheet.Nameshee

21.go-mysql-elasticsearch實現mysql elasticsearch實時同步(ES關係型資料庫同步)

引言:go-mysql-elasticsearch 是國內作者開發的一款外掛。測試表明:該外掛優點:能實現同步增、刪、改、查操作。不足之處(待完善的地方): 1、仍處理開發、相對不穩定階段; 2、沒有日誌,不便於排查問題及檢視同步結果。 本文深入詳解了外掛的安裝、使用、增刪改

22.mysql elasticsearch實時同步常用外掛及優缺點對比(ES關係型資料庫同步)

前言:目前mysql與elasticsearch常用的同步機制大多是基於外掛實現的,常用的外掛包括:elasticsearch-jdbc, elasticsearch-river-MySQL , go-mysql-elasticsearch, logstash-input-j

Logstash資料庫同步

一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。  博友前天線上提問,說明這塊理解的還不夠透徹。  我整理下,  一是為了儘快解決博友問題,  二是加深記憶,便於未來產品開發中快速上手。 1、同步原理 原有ES專欄中有詳解,不再贅述。詳細請參

mysql 從相同類型的中提取到

tab cursor 同時 sql and ble 有時 eat sch 蝸牛背著沈重的殼,貼著地面一步步艱難地向前爬行,不回頭,也不左顧右盼,只是朝著自己想到達的地方行進。 有時候需要從多張相同類型的表中提取數據,這些表有一些相同的列或者表結構完全相同,同時表名存在一定的

mysql條sql語句中使用個count關聯查詢

在給公司寫程式碼的過程中忽然發現在sql語句中碰到一些小問題,雖然最終自己解決了,但是不得不說好不容易啊,故寫此部落格提醒自己,以免日後再在此處浪費時間。 需求: 有兩張表,分別是marketChan

Mysql 條update語句,更新(存在關聯的

 UPDATE table1     LEFT JOIN table2 ON table1.xx=table2.xx (關聯的欄位)    SET table1.xx=value,table2.xx=value (update value)WHERE table1.xx=xx

mysql把某列的資料更新到另列中(涉及到的資料)

假設有三張表A,B,C現在要把C表中的一列資料更新到A表中,而A和C之間還有張B表。假設要把C表中的id更新到A表中,那麼就可以這樣寫:UPDATE A AINNER JOIN B ON A.xxx = B.xxxINNER JOIN C ON B.xxx= C.xxxSET

MySqlupdate更新條資料

我最近遇到一個匯入Excel表中資料到資料庫的需求,資料量一萬多條。需先查詢資料庫中是否存在要插入的資料,若存在則更新資料,否則插入資料。我首先按照常規思路先查詢,然後插入或更新。然而nginx返回 5

mysql left join 左連接查詢關聯n

part 存在 col 外鍵 ros bold new 多條 ble left join 左連接即以左表為基準,顯示坐標所有的行,右表與左表關聯的數據會顯示,不關聯的則不顯示。關鍵字為left join on。 **基本用法如下: select table a le

hql語句比對查詢單個字段

end nta 不能 ase color poll 存儲 pen ike 前端輸入客戶名稱,在使用hql查詢時,要同時比對表中,客戶名稱,客戶簡稱,拼音簡寫,客戶編碼等多個字段 hql寫法 String fdCustomerName=cv.poll("docMain.fdC

線上MySQL數據庫死鎖問題

重復 成功 中一 主鍵 adl 一次 his TE BE 最近線上項目報了一個MySQL死鎖(DealLock)錯誤,雖說對業務上是沒有什麽影響的,由於自己對數據庫鎖這塊了解不是很多,之前也沒怎麽的在線上碰到過。這次剛好遇到了,便在此記錄一下。 出現

mkdir創建個目錄

size sha color 承載 ima 技術分享 系統 term ext 系統管理員必用的十大基礎之一也可以這樣連貫起來一次性創建.... 其中的知識點其實就是花括號{}{};可承載一個以逗號(,)分割的列表,並將其展開為多個列表。。。。有個知識點 展開命令行~USER

如何在 Linux 中重新命名個檔案

你可能已經知道,我們使用 mv 命令在類 Unix 作業系統中重新命名或者移動檔案和目錄。 但是,mv 命令不支援一次重新命名多個檔案。 不用擔心。 在本教程中,我們將學習使用 Linux 中的 mmv 命令一次重新命名多個檔案。 此命令用於在類 Unix 作業系統中使用標準萬用字元批量移動、複製、追加和重新

C#實現類只例項化(被個類訪問呼叫)

C#簡單寫法如下: public  class  Singleton {      private  static  Singleton _instance =  n

踩坑經歷(五)關於MySQL儲存過程的排錯

SQL語句 -- #開啟定時器,預設為關閉狀態 set global event_scheduler =1; #或者set GLOBAL event_scheduler = ON; use monitorsys; drop event if exists report_back

mysql-使用儲存過程一次性批量建立

大家好,謝謝大家閱讀我的文章,請多多指教 如何使用儲存過程一次性建立多張表,^-^,程式碼如下: 業務需求:專案中需要建立100張表 我使用了儲存過程來實現 這是我專案中使用到的一個小小功能 DELIMITER $$ USE `DBName`$$ DROP PROCEDURE

線上mysql死鎖分析

一、現象 發運車次呼叫發車介面時發生異常,後臺丟擲資料庫死鎖日誌。   二、原因分析   通過日誌可以看出事務T1等待 heap no 8的行鎖 (X locks 排他鎖)                 事務T2持有heap no 8的行鎖,等待heap no 7的行鎖 兩個更新運