1. 程式人生 > >搭建:canal部署與例項執行和解析MysqlBinlog日誌傳送到Kafka中

搭建:canal部署與例項執行和解析MysqlBinlog日誌傳送到Kafka中

1、準備:

github:https://github.com/alibaba/canal

裡面有包括canal的文件,server端 client端的 例子 原始碼包等等。

2、canal概述:

canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業務需求而提出的。

早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的資料庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。ps. 目前內部使用的同步,已經支援mysql5.x和

Oracle部分版本的日誌解析


基於日誌增量訂閱&消費支援的業務:


資料庫映象
資料庫實時備份
多級索引 (賣家和買家各自分庫索引)
search build
業務cache重新整理
價格變化等重要業務訊息

keyword:資料庫同步,增量訂閱&消費。

3、canal工作原理:



從上層來看,複製分成三步:


master將改變記錄到二進位制日誌(binary log)中(這些記錄叫做二進位制日誌事件,binary log events,可以通過show binlog events進行檢視);
slave將master的binary log events拷貝到它的中繼日誌(relay log);

slave重做中繼日誌中的事件,將改變反映它自己的資料。

4、部署canal:

部署canal-server:

(1)開啟MySQL的binlog功能,並配置binlog模式為row。

在my.cnf 加入如下:

  1. [mysqld]  
  2. log-bin=mysql-bin #新增這一行就ok  
  3. binlog-format=ROW #選擇row模式  
  4. server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複  

(2)在mysql中 配置canal資料庫管理使用者,配置相應許可權(repication許可權)

  1. CREATEUSER canal IDENTIFIED BY'canal';    
  2. GRANTSELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO'canal'@'%';  
  3. -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
  4. FLUSH PRIVILEGES;  
(3)下載canal https://github.com/alibaba/canal/releases 

 解壓到相應資料夾

  1. tar -zxvf canal canal  
canal 檔案目錄結構
  1. drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin  
  2. drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf  
  3. drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib  
  4. drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs  

修改配置 instance.properties
  1. vim canal/conf/example/instance.properties  

  1. #################################################  
  2. ## mysql serverId  
  3. canal.instance.mysql.slaveId = 1234
  4. # position info,需要改成自己的資料庫資訊  
  5. canal.instance.master.address = 127.0.0.1:3306   
  6. canal.instance.master.journal.name =   
  7. canal.instance.master.position =   
  8. canal.instance.master.timestamp =   
  9. #canal.instance.standby.address =   
  10. #canal.instance.standby.journal.name =  
  11. #canal.instance.standby.position =   
  12. #canal.instance.standby.timestamp =   
  13. # username/password,需要改成自己的資料庫資訊  
  14. canal.instance.dbUsername = canal
  15. canal.instance.dbPassword = canal
  16. canal.instance.defaultDatabaseName = canal_test
  17. canal.instance.connectionCharset = UTF-8  
  18. # table regex  
  19. canal.instance.filter.regex = .*\\..*  
  20. #################################################  

然後cd到bin目錄  啟動和停止canal-server

啟動  

  1. ./startup.sh  
停止
  1. ./stop.sh  

驗證啟動狀態,檢視log檔案
  1. vim canal/log/canal/canal.log  

  1. 2014-07-18 10:21:08.525 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.  
  2. 2014-07-18 10:21:08.609 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.12.109.201:11111]  
  3. 2014-07-18 10:21:09.037 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......  

上述日誌資訊顯示啟動canal成功

執行canal-client例項:

(1)建立例項maven工程

  1. mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample  

(2)新增pom依賴:
  1. <dependency>
  2.     <groupId>com.alibaba.otter</groupId>
  3.     <artifactId>canal.client</artifactId>
  4.     <version>1.0.12</version>
  5. </dependency>
(3)更新依賴 mvn install

(4)canal-client.Java  例項程式碼

  1. /** 
  2.  * Created by hp on 14-7-17. 
  3.  */
  4. import java.net.InetSocketAddress;  
  5. import java.util.List;  
  6. import com.alibaba.otter.canal.client.CanalConnector;  
  7. import com.alibaba.otter.canal.common.utils.AddressUtils;  
  8. import com.alibaba.otter.canal.protocol.Message;  
  9. import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
  10. import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
  11. import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
  12. import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
  13. import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
  14. import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
  15. import com.alibaba.otter.canal.client.*;  
  16. import org.jetbrains.annotations.NotNull;  
  17. publicclass ClientSample {  
  18.     publicstaticvoid main(String args[]) {  
  19.         // 建立連結
  20.         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),  
  21.                 11111), "example""""");  
  22.         int batchSize = 1000;  
  23.         int emptyCount = 0;  
  24.         try