25.logstash一次同步Mysql多張表到ES(ES與關係型資料庫同步)
題記
一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。
博友前天線上提問,說明這塊理解的還不夠透徹。
我整理下,
一是為了儘快解決博友問題,
二是加深記憶,便於未來產品開發中快速上手。
1、同步原理
原有ES專欄中有詳解,不再贅述。詳細請參考我的專欄:
深入詳解Elasticsearch
以下是通過ES5.4.0, logstash5.4.1 驗證成功。
可以確認的是2.X版本同樣可以驗證成功。
2、核心配置檔案
input {
stdin {
}
jdbc {
type => "cxx_article_info"
# mysql jdbc connection string to our backup databse 後面的test對應mysql中的test資料庫
jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb"
# the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => "xxxxx"
record_last_run => "true"
use_column_value => "true"
tracking_column => "id"
last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_info"
clean_run => "false"
# the path to our downloaded jdbc driver
jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "500"
statement => "select * from cxx_article_info where id > :sql_last_value"
#定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
schedule => "* * * * *"
#設定ES索引型別
}
jdbc {
type => "cxx_user"
# mysql jdbc connection string to our backup databse 後面的test對應mysql中的test資料庫
jdbc_connection_string => "jdbc:mysql://110.10.15.37:3306/cxxwb"
# the user we wish to excute our statement as
jdbc_user => "root"
jdbc_password => "xxxxxx"
record_last_run => "true"
use_column_value => "true"
tracking_column => "id"
last_run_metadata_path => "/opt/logstash/bin/logstash_xxy/cxx_user_info"
clean_run => "false"
# the path to our downloaded jdbc driver
jdbc_driver_library => "/opt/elasticsearch/lib/mysql-connector-java-5.1.38.jar"
# the name of the driver class for mysql
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "500"
statement => "select * from cxx_user_info where id > :sql_last_value"
#以下對應著要執行的sql的絕對路徑。
#statement_filepath => "/opt/logstash/bin/logstash_mysql2es/department.sql"
#定時欄位 各欄位含義(由左至右)分、時、天、月、年,全部為*預設含義為每分鐘都更新
schedule => "* * * * *"
#設定ES索引型別
}
}
filter {
mutate {
convert => [ "publish_time", "string" ]
}
date {
timezone => "Europe/Berlin"
match => ["publish_time" , "ISO8601", "yyyy-MM-dd HH:mm:ss"]
}
#date {
# match => [ "publish_time", "yyyy-MM-dd HH:mm:ss,SSS" ]
# remove_field => [ "publish_time" ]
# }
json {
source => "message"
remove_field => ["message"]
}
}
output {
if [type]=="cxxarticle_info" {
elasticsearch {
#ESIP地址與埠
hosts => "10.100.11.231:9200"
#ES索引名稱(自己定義的)
index => "cxx_info_index"
#自增ID編號
# document_id => "%{id}"
}
}
if [type]=="cxx_user" {
elasticsearch {
#ESIP地址與埠
hosts => "10.100.11.231:9200"
#ES索引名稱(自己定義的)
index => "cxx_user_index"
#自增ID編號
# document_id => "%{id}"
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
3、同步成功結果
[2017-07-19T15:08:05,438][INFO ][logstash.pipeline ] Pipeline main started
The stdin plugin is now waiting for input:
[2017-07-19T15:08:05,491][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 1
[2017-07-19T15:09:00,721][INFO ][logstash.inputs.jdbc ] (0.008000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 1
[2017-07-19T15:09:00,730][INFO ][logstash.inputs.jdbc ] (0.004000s) SELECT * FROM (select * from cxx_user_info where id > 0) AS `t1` LIMIT 500 OFFSET 0
[2017-07-19T15:09:00,731][INFO ][logstash.inputs.jdbc ] (0.007000s) SELECT * FROM (select * from cxx_article_info where id > 0) AS `t1` LIMIT 500 OFFSET 0
[2017-07-19T15:10:00,173][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1
[2017-07-19T15:10:00,174][INFO ][logstash.inputs.jdbc ] (0.003000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1
[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.001000s) SELECT count(*) AS `count` FROM (select * from cxx_article_info where id > 3) AS `t1` LIMIT 1
[2017-07-19T15:11:00,225][INFO ][logstash.inputs.jdbc ] (0.002000s) SELECT count(*) AS `count` FROM (select * from cxx_user_info where id > 2) AS `t1` LIMIT 1
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
4、擴充套件
1)多個表無非就是在input裡面多加幾個型別,在output中多加基礎
型別判定。
舉例:
if [type]=="cxx_user"
- 1
2)input裡的type和output if判定的type**保持一致**,該type對應ES中的type。
後記
死磕ES,有問題歡迎大家提問探討!
相關推薦
25.logstash一次同步Mysql多張表到ES(ES與關係型資料庫同步)
題記一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。 博友前天線上提問,說明這塊理解的還不夠透徹。 我整理下, 一是為了儘快解決博友問題, 二是加深記憶,便於未來產品開發中快速上手。1、同步原理原有ES專欄中有詳解,不再贅述。詳細請參考我的專欄: 深
mysql 一條sql更新多張表資料,
<update id="deleteActor" parameterType="java.lang.String"> UPDATE actor_tbl a LEFT JOIN video_actor_tbl b ON a.sn = b.actorsn SET a.
如何用VBA將一個工作簿裡多張表的名字與人數彙總在一張表裡
Sub text1()Worksheet.Add.Name="彙總"For Each Sheet In Worksheetsk = k + 1x = y.usedRange.Rows.Countsheets("彙總").Cells(k, 1) = Sheet.Nameshee
21.go-mysql-elasticsearch實現mysql 與elasticsearch實時同步(ES與關係型資料庫同步)
引言:go-mysql-elasticsearch 是國內作者開發的一款外掛。測試表明:該外掛優點:能實現同步增、刪、改、查操作。不足之處(待完善的地方): 1、仍處理開發、相對不穩定階段; 2、沒有日誌,不便於排查問題及檢視同步結果。 本文深入詳解了外掛的安裝、使用、增刪改
22.mysql 與elasticsearch實時同步常用外掛及優缺點對比(ES與關係型資料庫同步)
前言:目前mysql與elasticsearch常用的同步機制大多是基於外掛實現的,常用的外掛包括:elasticsearch-jdbc, elasticsearch-river-MySQL , go-mysql-elasticsearch, logstash-input-j
Logstash從資料庫一次同步多張表
一次同步多張表是開發中的一般需求。之前研究了很久找到方法,但沒有詳細總結。 博友前天線上提問,說明這塊理解的還不夠透徹。 我整理下, 一是為了儘快解決博友問題, 二是加深記憶,便於未來產品開發中快速上手。 1、同步原理 原有ES專欄中有詳解,不再贅述。詳細請參
mysql 從相同類型的多張表中提取到一張表中
tab cursor 同時 sql and ble 有時 eat sch 蝸牛背著沈重的殼,貼著地面一步步艱難地向前爬行,不回頭,也不左顧右盼,只是朝著自己想到達的地方行進。 有時候需要從多張相同類型的表中提取數據,這些表有一些相同的列或者表結構完全相同,同時表名存在一定的
mysql中一條sql語句中使用多個count關聯查詢多張表
在給公司寫程式碼的過程中忽然發現在sql語句中碰到一些小問題,雖然最終自己解決了,但是不得不說好不容易啊,故寫此部落格提醒自己,以免日後再在此處浪費時間。 需求: 有兩張表,分別是marketChan
Mysql 一條update語句,更新多張表(存在關聯的表)
UPDATE table1 LEFT JOIN table2 ON table1.xx=table2.xx (關聯的欄位) SET table1.xx=value,table2.xx=value (update value)WHERE table1.xx=xx
mysql把某一列的資料更新到另一列中(涉及到多張表的資料)
假設有三張表A,B,C現在要把C表中的一列資料更新到A表中,而A和C之間還有張B表。假設要把C表中的id更新到A表中,那麼就可以這樣寫:UPDATE A AINNER JOIN B ON A.xxx = B.xxxINNER JOIN C ON B.xxx= C.xxxSET
MySql 中 一次update更新多條資料
我最近遇到一個匯入Excel表中資料到資料庫的需求,資料量一萬多條。需先查詢資料庫中是否存在要插入的資料,若存在則更新資料,否則插入資料。我首先按照常規思路先查詢,然後插入或更新。然而nginx返回 5
mysql left join 左連接查詢關聯n多張表
part 存在 col 外鍵 ros bold new 多條 ble left join 左連接即以左表為基準,顯示坐標所有的行,右表與左表關聯的數據會顯示,不關聯的則不顯示。關鍵字為left join on。 **基本用法如下: select table a le
hql語句一次比對查詢單表中多個字段
end nta 不能 ase color poll 存儲 pen ike 前端輸入客戶名稱,在使用hql查詢時,要同時比對表中,客戶名稱,客戶簡稱,拼音簡寫,客戶編碼等多個字段 hql寫法 String fdCustomerName=cv.poll("docMain.fdC
記一次線上MySQL數據庫死鎖問題
重復 成功 中一 主鍵 adl 一次 his TE BE 最近線上項目報了一個MySQL死鎖(DealLock)錯誤,雖說對業務上是沒有什麽影響的,由於自己對數據庫鎖這塊了解不是很多,之前也沒怎麽的在線上碰到過。這次剛好遇到了,便在此記錄一下。 出現
mkdir一次創建多個目錄
size sha color 承載 ima 技術分享 系統 term ext 系統管理員必用的十大基礎之一也可以這樣連貫起來一次性創建.... 其中的知識點其實就是花括號{}{};可承載一個以逗號(,)分割的列表,並將其展開為多個列表。。。。有個知識點 展開命令行~USER
如何在 Linux 中一次重新命名多個檔案
你可能已經知道,我們使用 mv 命令在類 Unix 作業系統中重新命名或者移動檔案和目錄。 但是,mv 命令不支援一次重新命名多個檔案。 不用擔心。 在本教程中,我們將學習使用 Linux 中的 mmv 命令一次重新命名多個檔案。 此命令用於在類 Unix 作業系統中使用標準萬用字元批量移動、複製、追加和重新
C#實現類只例項化一次(被多個類訪問呼叫)
C#簡單寫法如下: public class Singleton { private static Singleton _instance = n
踩坑經歷(五)一次關於MySQL儲存過程的排錯
SQL語句 -- #開啟定時器,預設為關閉狀態 set global event_scheduler =1; #或者set GLOBAL event_scheduler = ON; use monitorsys; drop event if exists report_back
mysql-使用儲存過程一次性批量建立多張表
大家好,謝謝大家閱讀我的文章,請多多指教 如何使用儲存過程一次性建立多張表,^-^,程式碼如下: 業務需求:專案中需要建立100張表 我使用了儲存過程來實現 這是我專案中使用到的一個小小功能 DELIMITER $$ USE `DBName`$$ DROP PROCEDURE
一次線上mysql死鎖分析
一、現象 發運車次呼叫發車介面時發生異常,後臺丟擲資料庫死鎖日誌。 二、原因分析 通過日誌可以看出事務T1等待 heap no 8的行鎖 (X locks 排他鎖) 事務T2持有heap no 8的行鎖,等待heap no 7的行鎖 兩個更新運