mysql資料實時同步到Elasticsearch
業務需要把mysql的資料實時同步到ES,實現低延遲的檢索到ES中的資料或者進行其它資料分析處理。本文給出以同步mysql binlog的方式實時同步資料到ES的思路, 實踐並驗證該方式的可行性,以供參考。
mysql binlog日誌
mysql的binlog日誌主要用於資料庫的主從複製與資料恢復。binlog中記錄了資料的增刪改查操作,主從複製過程中,主庫向從庫同步binlog日誌,從庫對binlog日誌中的事件進行重放,從而實現主從同步。
mysql binlog日誌有三種模式,分別為:
ROW: 記錄每一行資料被修改的情況,但是日誌量太大 STATEMENT: 記錄每一條修改資料的SQL語句,減少了日誌量,但是SQL語句使用函式或觸發器時容易出現主從不一致 MIXED: 結合了ROW和STATEMENT的優點,根據具體執行資料操作的SQL語句選擇使用ROW或者STATEMENT記錄日誌
要通過mysql binlog將資料同步到ES叢集,只能使用ROW模式,因為只有ROW模式才能知道mysql中的資料的修改內容。
以UPDATE操作為例,ROW模式的binlog日誌內容示例如下:
SET TIMESTAMP=1527917394/*!*/; BEGIN /*!*/; # at 3751 #180602 13:29:54 server id 1 end_log_pos 3819 CRC32 0x8dabdf01 Table_map: `webservice`.`building` mapped to number 74 # at 3819 #180602 13:29:54 server id 1 end_log_pos 3949 CRC32 0x59a8ed85 Update_rows: table id 74 flags: STMT_END_F BINLOG ' UisSWxMBAAAARAAAAOsOAAAAAEoAAAAAAAEACndlYnNlcnZpY2UACGJ1aWxkaW5nAAYIDwEPEREG wACAAQAAAAHfq40= UisSWx8BAAAAggAAAG0PAAAAAEoAAAAAAAEAAgAG///A1gcAAAAAAAALYnVpbGRpbmctMTAADwB3 UkRNbjNLYlV5d1k3ajVbD64WWw+uFsDWBwAAAAAAAAtidWlsZGluZy0xMAEPAHdSRE1uM0tiVXl3 WTdqNVsPrhZbD64Whe2oWQ== '/*!*/; ### UPDATE `webservice`.`building` ### WHERE ### @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */ ### @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */ ### @3=0 /* TINYINT meta=0 nullable=0 is_null=0 */ ### @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */ ### @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */ ### @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */ ### SET ### @1=2006 /* LONGINT meta=0 nullable=0 is_null=0 */ ### @2='building-10' /* VARSTRING(192) meta=192 nullable=0 is_null=0 */ ### @3=1 /* TINYINT meta=0 nullable=0 is_null=0 */ ### @4='wRDMn3KbUywY7j5' /* VARSTRING(384) meta=384 nullable=0 is_null=0 */ ### @5=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */ ### @6=1527754262 /* TIMESTAMP(0) meta=0 nullable=0 is_null=0 */ # at 3949 #180602 13:29:54 server id 1 end_log_pos 3980 CRC32 0x58226b8f Xid = 182 COMMIT/*!*/;
STATEMENT模式下binlog日誌內容示例為:
SET TIMESTAMP=1527919329/*!*/;
update building set Status=1 where Id=2000
/*!*/;
# at 688
#180602 14:02:09 server id 1 end_log_pos 719 CRC32 0x4c550a7d Xid = 200
COMMIT/*!*/;
從ROW模式和STATEMENT模式下UPDATE操作的日誌內容可以看出,ROW模式完整地記錄了要修改的某行資料更新前的所有欄位的值以及更改後所有欄位的值,而STATEMENT模式只單單記錄了UPDATE操作的SQL語句。我們要將mysql的資料實時同步到ES, 只能選擇ROW模式的binlog, 獲取並解析binlog日誌的資料內容,執行ES document api,將資料同步到ES叢集中。
mysqldump工具
mysqldump是一個對mysql資料庫中的資料進行全量匯出的一個工具.
mysqldump的使用方式如下:
mysqldump -uelastic -p'Elastic_123' --host=172.16.32.5 -F webservice > dump.sql
上述命令表示從遠端資料庫172.16.32.5:3306中匯出database:webservice的所有資料,寫入到dump.sql檔案中,指定-F引數表示在匯出資料後重新生成一個新的binlog日誌檔案以記錄後續的所有資料操作。
dump.sql中的檔案內容如下:
-- MySQL dump 10.13 Distrib 5.6.40, for Linux (x86_64)
--
-- Host: 172.16.32.5 Database: webservice
-- ------------------------------------------------------
-- Server version 5.5.5-10.1.9-MariaDBV1.0R012D002-20171127-1822
/*!40101 SET @[email protected]@CHARACTER_SET_CLIENT */;
/*!40101 SET @[email protected]@CHARACTER_SET_RESULTS */;
/*!40101 SET @[email protected]@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @[email protected]@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @[email protected]@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @[email protected]@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @[email protected]@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @[email protected]@SQL_NOTES, SQL_NOTES=0 */;
--
-- Table structure for table `building`
--
DROP TABLE IF EXISTS `building`;
/*!40101 SET @saved_cs_client = @@character_set_client */;
/*!40101 SET character_set_client = utf8 */;
CREATE TABLE `building` (
`Id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
`BuildingId` varchar(64) NOT NULL COMMENT '虛擬建築Id',
`Status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '虛擬建築狀態:0、處理中;1、正常;-1,停止;-2,銷燬中;-3,已銷燬',
`BuildingName` varchar(128) NOT NULL DEFAULT '' COMMENT '虛擬建築名稱',
`CreateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT '建立時間',
`UpdateTime` timestamp NOT NULL DEFAULT '2017-12-03 16:00:00' COMMENT '更新時間',
PRIMARY KEY (`Id`),
UNIQUE KEY `BuildingId` (`BuildingId`)
) ENGINE=InnoDB AUTO_INCREMENT=2010 DEFAULT CHARSET=utf8 COMMENT='虛擬建築表';
/*!40101 SET character_set_client = @saved_cs_client */;
--
-- Dumping data for table `building`
--
LOCK TABLES `building` WRITE;
/*!40000 ALTER TABLE `building` DISABLE KEYS */;
INSERT INTO `building` VALUES (2000,'building-2',0,'6YFcmntKrNBIeTA','2018-05-30 13:28:31','2018-05-30 13:28:31'),(2001,'building-4',0,'4rY8PcVUZB1vtrL','2018-05-30 13:28:34','2018-05-30 13:28:34'),(2002,'building-5',0,'uyjHVUYrg9KeGqi','2018-05-30 13:28:37','2018-05-30 13:28:37'),(2003,'building-7',0,'DNhyEBO4XEkXpgW','2018-05-30 13:28:40','2018-05-30 13:28:40'),(2004,'building-1',0,'TmtYX6ZC0RNB4Re','2018-05-30 13:28:43','2018-05-30 13:28:43'),(2005,'building-6',0,'t8YQcjeXefWpcyU','2018-05-30 13:28:49','2018-05-30 13:28:49'),(2006,'building-10',0,'WozgBc2IchNyKyE','2018-05-30 13:28:55','2018-05-30 13:28:55'),(2007,'building-3',0,'yJk27cmLOVQLHf1','2018-05-30 13:28:58','2018-05-30 13:28:58'),(2008,'building-9',0,'RSbjotAh8tymfxs','2018-05-30 13:29:04','2018-05-30 13:29:04'),(2009,'building-8',0,'IBOMlhaXV6k226m','2018-05-30 13:29:31','2018-05-30 13:29:31');
/*!40000 ALTER TABLE `building` ENABLE KEYS */;
UNLOCK TABLES;
/*!40103 SET [email protected]_TIME_ZONE */;
/*!40101 SET [email protected]_SQL_MODE */;
/*!40014 SET [email protected]_FOREIGN_KEY_CHECKS */;
/*!40014 SET [email protected]_UNIQUE_CHECKS */;
/*!40101 SET [email protected]_CHARACTER_SET_CLIENT */;
/*!40101 SET [email protected]_CHARACTER_SET_RESULTS */;
/*!40101 SET [email protected]_COLLATION_CONNECTION */;
/*!40111 SET [email protected]_SQL_NOTES */;
-- Dump completed on 2018-06-02 14:23:51
從以上內容可以看出,mysqldump匯出的sql檔案包含create table, drop table以及插入資料的sql語句,但是不包含create database建庫語句。
使用go-mysql-elasticsearch開源工具同步資料到ES
go-mysql-elasticsearch是用於同步mysql資料到ES叢集的一個開源工具,專案github地址:https://github.com/siddontang/go-mysql-elasticsearch
go-mysql-elasticsearch的基本原理是:如果是第一次啟動該程式,首先使用mysqldump工具對源mysql資料庫進行一次全量同步,通過elasticsearch client執行操作寫入資料到ES;然後實現了一個mysql client,作為slave連線到源mysql,源mysql作為master會將所有資料的更新操作通過binlog event同步給slave, 通過解析binlog event就可以獲取到資料的更新內容,之後寫入到ES.
另外,該工具還提供了操作統計的功能,每當有資料增刪改操作時,會將對應操作的計數加1,程式啟動時會開啟一個http服務,通過呼叫http介面可以檢視增刪改操作的次數。
使用限制:
1. mysql binlog必須是ROW模式
2. 要同步的mysql資料表必須包含主鍵,否則直接忽略,這是因為如果資料表沒有主鍵,UPDATE和DELETE操作就會因為在ES中找不到對應的document而無法進行同步
3. 不支援程式執行過程中修改表結構
4. 要賦予用於連線mysql的賬戶RELOAD許可權以及REPLICATION許可權, SUPER許可權:
GRANT REPLICATION SLAVE ON *.* TO 'elastic'@'172.16.32.44';
GRANT RELOAD ON *.* TO 'elastic'@'172.16.32.44';
UPDATE mysql.user SET Super_Priv='Y' WHERE user='elastic' AND host='172.16.32.44';
使用方式:
- git clone https://github.com/siddontang/go-mysql-elasticsearch
- cd go-mysql-elasticsearch/src/github.com/siddontang/go-mysql-elasticsearch
- vi etc/river.toml, 修改配置檔案,同步172.16.0.101:3306資料庫中的webservice.building表到ES叢集172.16.32.64:9200的building index(更詳細的配置檔案說明可以參考專案文件)
# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "172.16.0.101:3306"
my_user = "bellen"
my_pass = "Elastic_123"
my_charset = "utf8"
# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "172.16.32.64:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""
# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
# TODO: support other storage, like etcd.
data_dir = "./var"
# Inner Http status address
stat_addr = "127.0.0.1:12800"
# pseudo server id like a slave
server_id = 1001
# mysql or mariadb
flavor = "mariadb"
# mysqldump execution path
# if not set or empty, ignore mysqldump.
mysqldump = "mysqldump"
# if we have no privilege to use mysqldump with --master-data,
# we must skip it.
#skip_master_data = false
# minimal items to be inserted in one bulk
bulk_size = 128
# force flush the pending requests if we don't have enough items >= bulk_size
flush_bulk_time = "200ms"
# Ignore table without primary key
skip_no_pk_table = false
# MySQL data source
[[source]]
schema = "webservice"
tables = ["building"]
[[rule]]
schema = "webservice"
table = "building"
index = "building"
type = "buildingtype"
-
在ES叢集中建立building index, 因為該工具並沒有使用ES的auto create index功能,如果index不存在會報錯
-
執行命令:./bin/go-mysql-elasticsearch -config=./etc/river.toml
-
控制檯輸出結果:
2018/06/02 16:13:21 INFO create BinlogSyncer with config {1001 mariadb 172.16.0.101 3306 bellen utf8 false false <nil> false false 0 0s 0s 0}
2018/06/02 16:13:21 INFO run status http server 127.0.0.1:12800
2018/06/02 16:13:21 INFO skip dump, use last binlog replication pos (mysql-bin.000001, 120) or GTID %!s(<nil>)
2018/06/02 16:13:21 INFO begin to sync binlog from position (mysql-bin.000001, 120)
2018/06/02 16:13:21 INFO register slave for master server 172.16.0.101:3306
2018/06/02 16:13:21 INFO start sync binlog at binlog file (mysql-bin.000001, 120)
2018/06/02 16:13:21 INFO rotate to (mysql-bin.000001, 120)
2018/06/02 16:13:21 INFO rotate binlog to (mysql-bin.000001, 120)
2018/06/02 16:13:21 INFO save position (mysql-bin.000001, 120)
- 測試:向mysql中插入、修改、刪除資料,都可以反映到ES中
使用體驗
- go-mysql-elasticsearch完成了最基本的mysql實時同步資料到ES的功能,業務如果需要更深層次的功能如允許執行中修改mysql表結構,可以進行自行定製化開發。
- 異常處理不足,解析binlog event失敗直接丟擲異常
- 據作者描述,該專案並沒有被其應用於生產環境中,所以使用過程中建議通讀原始碼,知其利弊。
使用mypipe同步資料到ES叢集
mypipe是一個mysql binlog同步工具,在設計之初是為了能夠將binlog event傳送到kafka, 當前版本可根據業務的需要也可以自定以將資料同步到任意的儲存介質,專案github地址 https://github.com/mardambey/mypipe.
使用限制
1. mysql binlog必須是ROW模式
2. 要賦予用於連線mysql的賬戶REPLICATION許可權
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'elastic'@'%' IDENTIFIED BY 'Elastic_123'
3. mypipe只是將binlog日誌內容解析後編碼成Avro格式推送到kafka broker, 並不是將資料推送到kafka,如果需要同步到ES叢集,可以從kafka消費資料後,再寫入ES
4. 消費kafka中的訊息(mysql insert, update, delete操作及具體的資料),需要對訊息內容進行Avro解析,獲取到對應的資料操作內容,進行下一步處理;mypipe封裝了一個KafkaGenericMutationAvroConsumer類,可以直接繼承該類使用,或者自行解析
5. mypipe只支援binlog同步,不支援存量資料同步,也即mypipe程式啟動後無法對mysql中已經存在的資料進行同步
使用方式
- git clone https://github.com/mardambey/mypipe.git
- ./sbt package
- 配置mypipe-runner/src/main/resources/application.conf
mypipe {
# Avro schema repository client class name
schema-repo-client = "mypipe.avro.schema.SchemaRepo"
# consumers represent sources for mysql binary logs
consumers {
localhost {
# database "host:port:user:pass" array
source = "172.16.0.101:3306:elastic:Elastic_123"
}
}
# data producers export data out (stdout, other stores, external services, etc.)
producers {
kafka-generic {
class = "mypipe.kafka.producer.KafkaMutationGenericAvroProducer"
}
}
# pipes join consumers and producers
pipes {
kafka-generic {
enabled = true
consumers = ["localhost"]
producer {
kafka-generic {
metadata-brokers = "172.16.16.22:9092"
}
}
binlog-position-repo {
# saved to a file, this is the default if unspecified
class = "mypipe.api.repo.ConfigurableFileBasedBinaryLogPositionRepository"
config {
file-prefix = "stdout-00" # required if binlog-position-repo is specifiec
data-dir = "/tmp/mypipe/data" # defaults to mypipe.data-dir if not present
}
}
}
}
}
- 配置mypipe-api/src/main/resources/reference.conf,修改include-event-condition選項,指定需要同步的database及table
include-event-condition = """ db == "webservice" && table =="building" """
-
在kafka broker端建立topic: webservice_building_generic, 預設情況下mypipe以"${db}_${table}_generic"為topic名,向該topic傳送資料
-
執行:./sbt "project runner" "runMain mypipe.runner.PipeRunner"
-
測試:向mysql building表中插入資料,寫一個簡單的consumer消費mypipe推送到kafka中的訊息
-
消費到沒有經過解析的資料如下:
ConsumerRecord(topic=u'webservice_building_generic', partition=0, offset=2, timestamp=None, timestamp_type=None, key=None,
value='\x00\x01\x00\x00\x14webservice\x10building\xcc\x01\x02\x91,\xae\xa3fc\x11\xe8\xa1\xaaRT\x00Z\xf9\xab\x00\x00\x04\x18BuildingName\x06xxx\x14BuildingId\nId-10\x00\x02\x04Id\xd4%\x00',
checksum=128384379, serialized_key_size=-1, serialized_value_size=88)
使用體驗
- mypipe相比go-mysql-elasticsearch更成熟,支援執行時ALTER TABLE,同時解析binlog異常發生時,可通過配置不同的策略處理異常
- mypipe不能同步存量資料,如果需要同步存量資料可通過其它方式先全量同步後,再使用mypipe進行增量同步
- mypipe只同步binlog, 需要同步資料到ES需要另行開發
作者:bellengao
連結:https://www.jianshu.com/p/c3faa26bc221
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。