阿里Canal框架資料庫同步-實戰教程
一、Canal簡介:
canal是阿里巴巴旗下的一款開源專案,純Java開發。基於資料庫增量日誌解析,提供增量資料訂閱&消費,目前主要支援了MySQL(也支援mariaDB)。
二、背景介紹:
早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。
三、適用場景:
在一些複雜的業務邏輯中,可能插入或者查詢資料都比較頻繁,如果一直在資料庫插入查詢會造成速度非常慢,可以把資料庫表分成兩個庫,一個庫用來做查詢,一個庫作為插入資料,讀寫分離,怎麼解決呢?就可以用canal框架來監聽資料是否發生改變,來同步資料。
比如大部分人都做搜尋引擎ES,咱們不可能每次資料庫更新了資料手動去同步索引庫,咱們就可以用Canal來監聽資料庫增刪改時去重新匯入索引庫,保持資料一致性。
四、Canal的工作機制
複製過程分成三步:
(1) Master主庫將改變記錄,寫到二進位制日誌(binary log)中(這些記錄叫做二進位制日誌事件,binary log events,可以通過show binlog events進行檢視);
(2) Slave從庫向mysql master傳送dump協議,將master主庫的binary log events拷貝到它的中繼日誌(relay log);
(3) Slave從庫讀取並重做中繼日誌中的事件,將改變的資料同步到自己的資料庫。
四、Canal中介軟體功能
基於純java語言開發,可以用於做增量資料訂閱和消費功能。
相比於傳統的資料同步,我們通常需要進行先搭建主從架構,然後使用binlog日誌進行讀取,然後指定需要同步的資料庫,資料庫表等資訊。但是隨著我們業務的不斷複雜,這種傳統的資料同步方式以及開始變得較為繁瑣,不夠靈活。
canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議mysql master收到dump請求,開始推送binary log給slave(也就是canal),canal解析binary log物件(原始為byte流),通過對binlog資料進行解析即可獲取需要同步的資料,在進行同步資料的過程中還可以加入開發人員的一些額外邏輯處理,比較開放。
Binlog的三種基本型別分別為:
STATEMENT模式只記錄了sql語句,但是沒有記錄上下文資訊,在進行資料恢復的時候可能會導致資料的丟失情況
ROW模式除了記錄sql語句之外,還會記錄每個欄位的變化情況,能夠清楚的記錄每行資料的變化歷史,但是會佔用較多的空間,需要使用mysqlbinlog工具進行檢視。
MIX模式比較靈活的記錄,例如說當遇到了表結構變更的時候,就會記錄為statement模式。當遇到了資料更新或者刪除情況下就會變為row模式
五、安裝Canal
1.準備工作:win10系統、jdk1.8、mysql5.7、canal1.1.1
2.連線自己的資料,檢查binlog功能是否有開啟,檢查命令:show variables like 'log_bin';
3.如果顯示狀態為OFF表示該功能未開啟,就需要找到自己安裝的Mysql位置找到my.ini檔案,在此檔案的最下面一行加上如下(注意:儲存檔案後重啟下自己的Mysql資料庫):
1 server-id=1 #不能與canal的slaveId重複即可 2 log-bin=mysql-bin 3 binlog_format = ROW #設定ROW模式
4.再次檢視binlog功能是否有開啟,檢查命令:show variables like 'log_bin';
5.我們需要建立一個使用者操作資料庫的寫入操作,我們需要給使用者許可權,執行如下sql語句:
1 CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; 2 GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 3 FLUSH PRIVILEGES;
6.下載我canal客戶端,官方地址進行相應版本的安裝包進行下載(注意:如果下載翻到本文最下面聯絡我): https://github.com/alibaba/canal/releases
7.下載成功後,解壓壓縮包後進入conf下面的example目錄下面的instance.properties檔案開啟編輯如下地方:
8.返回bin目錄點選startup.bat啟動canal服務端,如下圖表示啟動成功:
六、java程式碼實現
1.新建一個maven專案,匯入maven jar包如下:
1 <dependency> 2 <groupId>com.alibaba.otter</groupId> 3 <artifactId>canal.client</artifactId> 4 <version>1.1.0</version> 5 </dependency>
2.編寫測試程式碼
1 package com.fuzongle.canal.conf; 2 3 import com.alibaba.otter.canal.client.CanalConnector; 4 import com.alibaba.otter.canal.client.CanalConnectors; 5 import com.alibaba.otter.canal.protocol.CanalEntry; 6 import com.alibaba.otter.canal.protocol.CanalEntry.Column; 7 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; 8 import com.alibaba.otter.canal.protocol.CanalEntry.EventType; 9 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; 10 import com.alibaba.otter.canal.protocol.Message; 11 import com.google.protobuf.InvalidProtocolBufferException; 12 13 import java.net.InetSocketAddress; 14 import java.util.List; 15 import java.util.Queue; 16 import java.util.concurrent.ConcurrentLinkedQueue; 17 /** 18 * @Auther: fzl 19 * @Date: 2020/4/20 01:21 20 * @Description: 21 */ 22 public class TestCanal { 23 24 private static Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>(); 25 26 public static void main(String[] args) { 27 //獲取canalServer連線:本機地址,埠號 28 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 29 11111), "example", "", ""); 30 int batchSize = 1000; 31 try { 32 //連線canalServer 33 connector.connect(); 34 //訂閱Desctinstion 35 connector.subscribe(); 36 connector.rollback(); 37 try { 38 while (true) { 39 //嘗試從master那邊拉去資料batchSize條記錄,有多少取多少 40 //輪詢拉取資料 上面的where 41 Message message = connector.getWithoutAck(batchSize); 42 long batchId = message.getId(); 43 int size = message.getEntries().size(); 44 if (batchId == -1 || size == 0) { 45 //睡眠 46 Thread.sleep(1000); 47 } else { 48 dataHandle(message.getEntries()); 49 } 50 connector.ack(batchId); 51 System.out.println("aa"+size); 52 //當佇列裡面堆積的sql大於一定數值的時候就模擬執行 53 if (SQL_QUEUE.size() >= 10) { 54 executeQueueSql(); 55 } 56 } 57 } catch (InterruptedException e) { 58 e.printStackTrace(); 59 } catch (InvalidProtocolBufferException e) { 60 e.printStackTrace(); 61 } 62 } finally { 63 connector.disconnect(); 64 } 65 66 67 } 68 69 70 71 72 /** 73 * 模擬執行佇列裡面的sql語句 74 */ 75 public static void executeQueueSql() { 76 int size = SQL_QUEUE.size(); 77 for (int i = 0; i < size; i++) { 78 String sql = SQL_QUEUE.poll(); 79 System.out.println("[sql]----> " + sql); 80 } 81 } 82 83 /** 84 * 資料處理 85 * 86 * @param entrys 87 */ 88 private static void dataHandle(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException { 89 for (CanalEntry.Entry entry : entrys) { 90 if (EntryType.ROWDATA == entry.getEntryType()) { 91 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 92 CanalEntry.EventType eventType = rowChange.getEventType(); 93 if (eventType == EventType.DELETE) { 94 saveDeleteSql(entry); 95 } else if (eventType == EventType.UPDATE) { 96 saveUpdateSql(entry); 97 } else if (eventType == CanalEntry.EventType.INSERT) { 98 saveInsertSql(entry); 99 } 100 } 101 } 102 } 103 104 /** 105 * 儲存更新語句 106 * 107 * @param entry 108 */ 109 private static void saveUpdateSql(CanalEntry.Entry entry) { 110 try { 111 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 112 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); 113 for (CanalEntry.RowData rowData : rowDatasList) { 114 List<Column> newColumnList = rowData.getAfterColumnsList(); 115 StringBuffer sql = new StringBuffer("update " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " set "); 116 for (int i = 0; i < newColumnList.size(); i++) { 117 sql.append(" " + newColumnList.get(i).getName() 118 + " = '" + newColumnList.get(i).getValue() + "'"); 119 if (i != newColumnList.size() - 1) { 120 sql.append(","); 121 } 122 } 123 sql.append(" where "); 124 List<Column> oldColumnList = rowData.getBeforeColumnsList(); 125 for (Column column : oldColumnList) { 126 if (column.getIsKey()) { 127 //暫時只支援單一主鍵 128 sql.append(column.getName() + "=" + column.getValue()); 129 break; 130 } 131 } 132 SQL_QUEUE.add(sql.toString()); 133 } 134 } catch (InvalidProtocolBufferException e) { 135 e.printStackTrace(); 136 } 137 } 138 139 /** 140 * 儲存刪除語句 141 * 142 * @param entry 143 */ 144 private static void saveDeleteSql(CanalEntry.Entry entry) { 145 try { 146 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 147 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); 148 for (CanalEntry.RowData rowData : rowDatasList) { 149 List<Column> columnList = rowData.getBeforeColumnsList(); 150 StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " where "); 151 for (Column column : columnList) { 152 if (column.getIsKey()) { 153 //暫時只支援單一主鍵 154 sql.append(column.getName() + "=" + column.getValue()); 155 break; 156 } 157 } 158 SQL_QUEUE.add(sql.toString()); 159 } 160 } catch (InvalidProtocolBufferException e) { 161 e.printStackTrace(); 162 } 163 } 164 165 /** 166 * 儲存插入語句 167 * 168 * @param entry 169 */ 170 private static void saveInsertSql(CanalEntry.Entry entry) { 171 try { 172 RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); 173 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); 174 for (CanalEntry.RowData rowData : rowDatasList) { 175 List<Column> columnList = rowData.getAfterColumnsList(); 176 StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName() + " ("); 177 for (int i = 0; i < columnList.size(); i++) { 178 sql.append(columnList.get(i).getName()); 179 if (i != columnList.size() - 1) { 180 sql.append(","); 181 } 182 } 183 sql.append(") VALUES ("); 184 for (int i = 0; i < columnList.size(); i++) { 185 sql.append("'" + columnList.get(i).getValue() + "'"); 186 if (i != columnList.size() - 1) { 187 sql.append(","); 188 } 189 } 190 sql.append(")"); 191 SQL_QUEUE.add(sql.toString()); 192 } 193 } catch (InvalidProtocolBufferException e) { 194 e.printStackTrace(); 195 } 196 } 197 }
3.如果資料庫值發生改變之後會觸發增刪改,咱們可以拿到這個資料插入到其他資料庫中。
注意:
1.如果有任何不懂的地方可以諮詢我,隨時歡迎互相幫助。
2.以上完整程式碼加群(群檔案):422167709。
3.如果希望學習更多,感謝您關注公眾號 "程式設計小樂",回覆canal領取完整程式碼。