1.前言
在我們系統開發過程中,根據業務場景很多資料庫資料並不會直接給使用者訪問的,需要同步儲存到ElasticSearch、Redis等儲存應用當中(例如最常見的是搜尋頁面的ElasticSearch資料)。而阿里開源的框架Canal就是做這方面的功能,它可以把資料庫(暫時只支援MySQL和Oracle部分版本)日誌解析獲取增量變更同步到其他儲存應用去。
2.什麼是Canal?
官網介紹,canal [kə'næl],譯意為水道/管道/溝渠,主要用途是基於MySQL資料庫增量日誌解析,提供增量資料訂閱和消費。
從上述介紹我們可以簡單認為Canal就是一個簡單的增量資料同步工具。
2.1MySQL主備複製原理
根據官網介紹,MySQL主備複製原理如下:
●MySQL master(主庫)將資料變更(增刪改)寫入二進位制日誌(binary log,其中記錄叫做二進位制日誌事件binary log events,可以通過show binlog events進行檢視)。
●MySQL slave(從庫)將master的binary log events 拷貝到它的中繼日誌(relay log)。
●MySQL slave(從庫)重放relay log中事件,將資料變更反映它自己的資料。
2.2Canal工作原理
根據官網介紹,Canal工作原理如下圖所示:
●canal模擬MySQL slave的互動協議,偽裝自己為MySQL slave,向MySQL master傳送dump協議。
●MySQL master收到dump請求,開始推送binary log給slave(即canal )。
●canal解析binary log物件(原始為byte流),再推送到MySQL、kafka、ElasticSearch等儲存應用當中。
3.Canal能做什麼?
早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務trigger(觸發器)獲取增量變更。從2010年開始,業務逐步嘗試資料庫日誌解析獲取增量變更進行同步,由此衍生出了大量的資料庫增量訂閱和消費業務。所以Canal就是在這個場景中誕生的,它主要作用就是解決基於日誌增量訂閱和消費的業務,例如:
●資料庫映象。
●資料庫實時備份。
●索引構建和實時維護(拆分異構索引、倒排索引等)。
●業務快取重新整理。
●帶業務邏輯的增量資料處理(例如ElasticSearch、Redis資料同步)。
在我做過的專案中,cancal經常被用到如下場景:
●根據資料庫的資料變更實時更新搜尋引擎資料,比如我司電商場景下物料資料發生變更(例如後臺上傳更新物料資訊、價格),實時同步到搜尋引擎Elasticsearch上。
●根據資料庫的資料變更實時更新快取,比如專門運營人員每次修改物料品牌資訊同時都會同步到Redis上。
●根據資料庫的資料變更實時推送到訊息佇列,比如為了豐富自身系統物料庫存,定時作業拉取第三方渠道物料庫存推送到RabbitMQ等訊息佇列去消費入庫。
4.如何搭建Canal?
4.1首先得安裝個MySQL資料庫
如果已經安裝好MySQL資料庫的,這一步可以跳過,如果沒有安裝好,請自行安裝(也可以檢視我之前寫過一篇MySQL安裝教程,不過個人建議最好還是在Docker上安裝,簡單方便快捷,如果自己手動安裝,不懂點運維基礎知識,坑太多了),具體安裝教程度娘一堆資料。當前的canal支援MySQL版本包括5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
我的MySQL版本是8.0.23,所以canal是支援的。不知道自己安裝是什麼版本可以通過SELECT VERSION();命令檢視。
4.2資料庫配置
從上述可知,因為canal是模擬MySQL slave的互動協議,偽裝自己為MySQL slave,向MySQL master傳送dump協議獲取binary log內容物件的,所以需要MySql開啟binlog。
●修改mysql.cnf中的配置:
-- 編輯mysql.cnf檔案
vim /etc/my.cnf;
-- 在my.cnf上加入如下配置
[mysqld]
log-bin=mysql-bin #開啟binlog
binlog-format=ROW #選擇ROW模式
server_id=1 #配置MySQL replaction需要定義,不要和canal的slaveId重複
expire-logs-days=10 #binlog日誌保留的天數,清除超過10天的日誌,防止日誌檔案過大,導致磁碟空間不足
●授權canal連結MySQL賬號具有作為MySQL slave的許可權, 如果已有賬戶可直接grant(我這邊是根據官網示例建立一個canal賬號來演示):
-- 先登入MySQL
mysql -u root -p
-- 建立使用者,使用者名稱:canal,密碼:qwer1234
CREATE USER canal IDENTIFIED BY 'qwer1234';
-- 授予上的所有許可權給canal使用者;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 重新整理許可權;
FLUSH PRIVILEGES;
●檢視下MySql是否開啟binlog日誌
是否開啟binlog日誌:
SHOW VARIABLES LIKE 'log_bin';
檢視binlog日誌檔案列表:
SHOW BINARY LOGS;
檢視當前正在寫入的binlog檔案:
SHOW MASTER STATUS;
4.3Canal配置
安裝執行Canal服務端,一定要記得先檢查當前Linux系統是否安裝了java8環境,如果沒有安裝啟動Canal時候會有如下提示:
[root@dengwu canal]# sh bin/startup.sh
which: no java in (/data/mysql/bin:/data/mysql/lib:/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin)
Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH.
具體安裝步驟如下:
●先從Oracle官網下載JDK安裝包:
通過Xftp工具匯入到預先建立/app/package安裝包目錄下,再在/usr目錄下建立java目錄並解壓:
mkdir /usr/java
cd /app/package
tar zxvf jdk-8u291-linux-x64.tar.gz -C /usr/java
然後配置java環境變數:
vim /etc/profile
用vim編輯器來編輯profile檔案,輸入i在檔案末尾新增以下內容:
export JAVA_HOME=/usr/java/jdk1.8.0_291
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH=$PATH:${JAVA_PATH}
配置完java環境變數後,:wq儲存退出,看看是否生效:
echo $PATH
如果沒有生效,讓其生效:
source /etc/profile
再瞄瞄java8是否安裝成功:
java -version
●然後下載canal, 訪問release頁面, 選擇需要的包下載, 如最新版本1.1.5為例:
可以使用wget工具下載:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
或者手動下載,通過Xftp等工具拉入安裝包目錄(/app/package)中:
再建立canal安裝目錄解壓安裝包:
mkdir /app/canal
cd /app/package
tar zxvf canal.deployer-1.1.5.tar.gz -C /app/canal
然後修改配置:
cd /app/canal
vi conf/example/instance.properties
i
:wq
●啟動canal:
cd /app/canal
sh bin/startup.sh
注:Windows使用startup.bat啟動
●檢視canal程序是否啟動成功
ps -ef | grep canal
●檢視instance的日誌:
vi logs/example/example.log
●關閉canal:
sh bin/stop.sh
●在資料庫中檢視從庫資訊:
SHOW SLAVE HOSTS;
檢視下canal例項(example)配置是否成功。
●記得把canal埠加入防火牆策略去:
-- 允許通過防火牆
firewall-cmd --permanent --zone=public --add-port=11111/tcp
-- 從防火牆裡移除
firewall-cmd --permanent --zone=public --remove-port=11111/tcp
-- 檢視埠在防火牆狀態
firewall-cmd --permanent --zone=public --query-port=11111/tcp
-- 重啟防火牆
firewall-cmd --reload
注:如果是買阿里雲伺服器,要到阿里雲安全組新增允許通過策略。還有Canal Server的預設埠為:11111,若需要修改,可以去/canal/conf目錄下的canal.properties配置檔案中進行修改。
5.Canal的.NET客戶端CanalSharp使用
5.1快速入門
●先安裝客戶端:
Install-Package CanalSharp
●初始化日誌:
CanalSharp使用Microsoft.Extensions.Logging.Abstractions,因為目前主流日誌元件,如:nlog、serilog等,全部支援此日誌抽象接入,也就是說你可以通過安裝nlog、serilog對其的適配,來使用它們,無論是Console App或則是Web App。
var loggerFactory = LoggerFactory.Create(builder =>
{
builder
.AddFilter("Microsoft", LogLevel.Debug)
.AddFilter("System", LogLevel.Information)
.AddConsole();
});
var logger= loggerFactory.CreateLogger<SimpleCanalConnection>();
●建立連線:
var conn=new SimpleCanalConnection(new SimpleCanalOptions("127.0.0.1",11111,1234),logger);
await conn.ConnectAsync();//連線到Canal Server
await conn.SubscribeAsync();//訂閱
●獲取資料:
var msg = await conn.GetAsync(1024);
5.2進階使用
●解析資料
○Entry
上文conn.GetAsync()返回的是一個Entry集合,Entry對應binlog記錄,它可能是事務標記也有可能是行資料變化,通過Entry.EntryType來區分,一般事務的標記在業務消費處理時不需要處理。
示例:
var entries = await conn.GetAsync(1024);
foreach (var entry in entries)
{
//不處理事務標記
if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
{
continue;
}
}
Entry.Header包含了一些binlog以及資料庫資訊
屬性 |
說明 |
Entry.Header.LogfileName |
binlog檔名 |
Entry.Header.LogfileOffset |
binlog偏移 |
Entry.Header.SchemaName |
mysql schema名稱 |
Entry.Header.TableName |
表名 |
○RowChange
一般在業務處理中,都會需要行資料的變更,將Entry轉換為RowChange物件
示例:
RowChange rowChange = null;
try
{
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
}
catch (Exception e)
{
_logger.LogError(e);
}
通過RowChange.EventType來Row是什麼變化,Update、Delete和Insert對應sql中的update、delete和insert語句,通過RowChange.RowDatas屬性,來訪問RowChange物件中包含的行變化資料集合。示例,遍歷 RowChange 中的行資料:
foreach (var rowData in rowChange.RowDatas)
{
//刪除的資料
if (eventType == EventType.Delete)
{
PrintColumn(rowData.BeforeColumns.ToList());
}
//插入的資料
else if (eventType == EventType.Insert)
{
PrintColumn(rowData.AfterColumns.ToList());
}
//更新的資料
else
{
_logger.LogInformation("-------> before");
PrintColumn(rowData.BeforeColumns.ToList());
_logger.LogInformation("-------> after");
PrintColumn(rowData.AfterColumns.ToList());
}
} private static void PrintColumn(List<Column> columns)
{
foreach (var column in columns)
{
Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}");
}
}
○Column
Column如其名,代表資料庫中表的每一列的資訊。
屬性名 |
說明 |
Column.Name |
列名 |
Column.Value |
列的值 |
Column.Updated |
列是否被更新 |
5.3應答機制
應答機制可以保證消費資料的準確性,Canal服務端會記錄Client消費的進度,需要客戶端傳送ACK訊息,服務端才會更新進度。類似於在訊息佇列中的ACK機制,如RabbitMQ。
●自動應答
await conn.GetAsync(1024);//獲取資料並自動應答
GetAsync()會在獲取資料後,自動向Server傳送ack訊息。
●手動應答
var msg = await conn.GetWithoutAckAsync(1024);//獲取資料
await conn.AckAsync(msg.Id);//手動應答
await conn.RollbackAsync(msg.Id);//回滾
5.4高可用
這裡的高可用分為兩類,客戶端叢集和服務端叢集。都是採用冷備模式,因為對於binlog資料消費來說,並行處理將會帶來資料順序錯亂的問題,當然你可以通過一些複雜的機制去實現,這裡不做說明。叢集部署需要Zookeeper元件。
●服務端叢集
在conf/canal.properties檔案中修改zookeeper地址:
canal.zkServers=127.0.0.1:2181
叢集中每個例項需,配置相同的zookeeper地址。
●客戶端叢集
客戶端叢集和服務端叢集採用相同的模式,每個例項去搶佔鎖,獲得了鎖那麼這個例項就執行獲取資料,其他例項做冷備。若正在執行消費資料的例項由於網路波動,導致和zookeeper失去連線,那麼其他客戶端例項不會立即搶佔,會等待60s後才執行搶佔,給與這個例項恢復的機會。
客戶端叢集使用的連線物件和快速入門中的不同:ClusterCanalConnection,但使用方法基本相同。
示例:
//初始化日誌
var loggerFactory = LoggerFactory.Create(builder =>
{
builder
.AddFilter("Microsoft", LogLevel.Debug)
.AddFilter("System", LogLevel.Information)
.AddConsole();
});
var logger = loggerFactory.CreateLogger<Program>();
//設定zk地址和clientid,統一叢集的client必須相同
var conn = new ClusterCanalConnection( new ClusterCanalOptions("localhost:2181", "12350")
//連線到Server loggerFactory);
await conn.ConnectAsync();
//訂閱
await conn.SubscribeAsync();
await conn.RollbackAsync(0);
while (true)
{
try
{
//獲取資料
var msg = await conn.GetAsync(1024);
}
catch (Exception e)
{
_logger.LogError(e,"Error.");
//發生異常執行重連,此方法只有叢集連線物件才有
await conn.ReConnectAsync();
}
}
5.5訂閱
訂閱指過濾表(table)的規則,Canal客戶端傳送給客戶端訂閱規則,那麼服務端將會推送符合規則的表資料過來,採用正則匹配。
允許所有表:.\*\\\\..\*
6.小結
這裡這是簡單介紹Canal工作原理,能做什麼,還有.NET客戶端CanalSharp使用,其實Canal涉及知識點還是很多的,例如配置MQ模式、服務叢集、Web管理介面部署,多例項等等。後面如果有時間,我還會繼續花時間去學習。
參考文獻:
CanalSharp文件
CanalSharp