canal + ZooKeeper High-Availability Configuration
阿新 • Published: 2019-01-07
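I. Deployment Environment
1. Base environment:
Software | Version | Purpose |
---|---|---|
Linux | CentOS 7.1, 8G | |
JDK | 1.8.0_151 | |
canal | 1.1.1 | canal server side; interacts with MySQL and ZooKeeper |
ZooKeeper | 3.4.5 | Coordinator between the canal server and the canal client; canal itself depends on ZooKeeper 3.4.5 |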
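2. Machine environment: 2 canal client machines and 2 canal server machines (resources are limited, so ZooKeeper is deployed on the same machines as the canal servers)

Category | IP | Installed software |
---|---|---|
canal-server | X.X.X.50:2100, X.X.X.54:2100 | canal-server |
zookeeper | X.X.X.50:2181, X.X.X.54:2181 | zookeeper (see the note below) |
canal-client | X.X.X.X:8999, X.X.X.X:8999 | business module acting as the canal client |

Note on ZooKeeper: a normal ZooKeeper deployment should use an odd number of servers, because leader election requires the number of available servers to be greater than half of the total. Due to limited resources, only 2 ZooKeeper servers are started here. This actually weakens high availability: if either ZooKeeper server goes down, the whole canal service becomes unavailable.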
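The majority rule described above can be made concrete with a few lines of arithmetic. The sketch below (the class name `ZkQuorum` is only illustrative) computes, for an ensemble of n voting servers, the quorum size floor(n/2) + 1 and how many servers may fail while a majority is still available. It models only the arithmetic, not ZooKeeper itself.

```java
public class ZkQuorum {
    // Majority quorum: strictly more than half of the voting servers must be alive.
    public static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // How many servers can fail while a majority still remains.
    public static int tolerableFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.printf("servers=%d quorum=%d tolerable failures=%d%n",
                    n, quorumSize(n), tolerableFailures(n));
        }
    }
}
```

For 2 servers it reports a quorum of 2 and 0 tolerable failures (no better than a single server), while 3 servers tolerate 1 failure and 4 servers still tolerate only 1, which is why odd ensemble sizes are recommended.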
II. Installing the canal server
- Download the software
Link: canal.deployer-1.1.1.tar.gz
- Extract the archive to the target directory
/usr/local/etc
- Edit canal.properties (system-level configuration file)
# unique ID of each canal server instance; currently has no practical meaning; default: 1
canal.id= 2101
# local IP the canal server binds to; if not set, a local IP is chosen automatically; default: none
canal.ip=
# port of the socket service provided by the canal server; default: 11111
canal.port= 2100
# connection string of the ZooKeeper cluster used by the canal server
canal.zkServers= X.X.X.X:2181,X.X.X.X:2181
# flush data to zk
# how often canal persists data to ZooKeeper, in milliseconds
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
# canal server mode, one of the options above
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
# binlog filter config
canal.instance.filter.druid.ddl = true
# whether to ignore DCL query statements such as grant/create user; default: false
canal.instance.filter.query.dcl = true
# whether to ignore DML query statements such as insert/update/delete table (in MySQL 5.6 ROW mode, query records in statement form can also be present); default: false
canal.instance.filter.query.dml = true
# whether to ignore DDL query statements such as create table/alter table/drop table/rename table/create index/drop index (supported DDL types are mainly table-level operations; create database/trigger/procedure are currently classified as DCL); default: false
canal.instance.filter.query.ddl = true
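In the `java.util.Properties` format, `#` starts a comment only when it is the first non-whitespace character of a line; anything written after a value on the same line becomes part of that value. Assuming canal loads canal.properties with these standard semantics, a trailing `# ...` would end up inside values such as `canal.port`, so comments belong on their own lines. The self-contained check below (the class name `InlineCommentCheck` is only illustrative) shows the parsing rule.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class InlineCommentCheck {
    // Parses properties text with the standard java.util.Properties rules.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader does no real I/O, so this is not expected to happen
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "# a line whose first non-blank character is # is a comment",
                "canal.port= 2100 #this text stays in the value",
                "canal.id= 2101");
        Properties props = parse(text);
        // Brackets make the exact value visible: [2100 #this text stays in the value]
        System.out.println("[" + props.getProperty("canal.port") + "]");
        // [2101]
        System.out.println("[" + props.getProperty("canal.id") + "]");
    }
}
```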
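- Edit instance.properties (instance-level configuration file)
# serverId in the MySQL replication sense; must be unique within the current MySQL cluster; default: 1234
canal.instance.mysql.slaveId=0
# position info
# MySQL master address; default: 127.0.0.1:3306
canal.instance.master.address=X.X.X.X:3306
# binlog file to start from when connecting to the master; default: none
canal.instance.master.journal.name=
# binlog offset to start from when connecting to the master; default: none
canal.instance.master.position=
# binlog timestamp to start from when connecting to the master; default: none
canal.instance.master.timestamp=
# username/password
# MySQL account
canal.instance.dbUsername=canal
# MySQL password
canal.instance.dbPassword=canal
# charset used for data parsing
canal.instance.connectionCharset = UTF-8
# default schema when connecting to MySQL
canal.instance.defaultDatabaseName =kuaiche
# table regex
canal.instance.filter.regex=kuaiche.bill_transport
# tables of interest for MySQL data parsing, as Perl regular expressions. Separate multiple regexes with commas (,); escape characters need a double backslash (\\)
# Common examples:
# 1. All tables: .* or .*\\..*
# 2. All tables in the canal schema: canal\\..*
# 3. Tables in the canal schema whose names start with canal: canal\\.canal.*
# 4. One table in the canal schema: canal.test1
# 5. Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma separated)
# table black regex
# blacklist filter
canal.instance.filter.black.regex=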
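To see what the comma-separated regex list does, here is a rough approximation with `java.util.regex`: each rule is matched against the full `schema.table` name, and a table is accepted if any rule matches. This is only a sketch (the class `TableFilterSketch` is hypothetical); canal ships its own filter implementation, whose details such as case handling may differ. Assuming the file is read with Java properties escaping, the `\\` written in the file becomes a single `\` in the value, which is why the examples double it; and an unescaped `.`, as in `canal.test1`, matches any character.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class TableFilterSketch {
    private final List<Pattern> rules = new ArrayList<>();

    // filter is the value after properties unescaping, e.g. canal\..*,mysql.test1
    public TableFilterSketch(String filter) {
        for (String part : filter.split(",")) {
            String rule = part.trim();
            if (!rule.isEmpty()) {
                rules.add(Pattern.compile(rule));
            }
        }
    }

    // Accepts a full "schema.table" name if any rule matches the whole name.
    public boolean accepts(String fullName) {
        for (Pattern rule : rules) {
            if (rule.matcher(fullName).matches()) {
                return true;
            }
        }
        return false;
    }

    public boolean accepts(String schema, String table) {
        return accepts(schema + "." + table);
    }

    public static void main(String[] args) {
        // The Java literal "canal\\..*" is the regex canal\..* (what canal\\..* in the file unescapes to)
        TableFilterSketch filter = new TableFilterSketch("canal\\..*,mysql.test1,mysql.test2");
        System.out.println(filter.accepts("canal", "orders")); // true
        System.out.println(filter.accepts("mysql", "test1"));  // true
        System.out.println(filter.accepts("mysql", "user"));   // false
        // An unescaped '.' matches any character, so canal.test1 also accepts canalXtest1
        System.out.println(new TableFilterSketch("canal.test1").accepts("canalXtest1"));   // true
        System.out.println(new TableFilterSketch("canal\\.test1").accepts("canalXtest1")); // false
    }
}
```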
- Start command
sh bin/startup.sh
- Stop command
sh bin/stop.sh
- Check the startup logs
canal/logs/canal/canal.log
2018-12-05 17:58:26.174 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2018-12-05 17:58:26.216 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2018-12-05 17:58:26.226 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2018-12-05 17:58:26.636 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[X.X.X.X:X]
2018-12-05 17:58:26.832 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
canal/logs/example/example.log (instance log for the `example` destination)
III. canal client
1. Framework: spring-boot
2. Jar dependencies
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.1</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.1</version>
<optional>true</optional>
</dependency>
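`Watch out for jar dependency conflicts: canal depends on ZooKeeper 3.4.5`
3. canal data structures
canal's data transfer has two parts: first, during binlog subscription, the binlog is converted into canal's own Message structure; second, the TCP protocol used when the client and the server communicate over TCP.
Entry data structure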
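Entry
  Header
    version [protocol version, default = 1]
    logfileName [binlog file name]
    logfileOffset [binlog position]
    serverId [serverId of the source server]
    serverenCode [encoding of the changed data]
    executeTime [execution time of the change]
    sourceType [source of the change, default = MYSQL]
    schemaName [schema name of the changed data]
    tableName [table name of the changed data]
    eventLength [length of each event]
    eventType [insert/update/delete type, default = UPDATE]
    props [reserved for extensions]
    gtid [GTID of the current transaction]
  entryType [transaction begin BEGIN / transaction end END / data ROWDATA / HEARTBEAT / GTIDLOG]
  storeValue [byte data that can be expanded; the corresponding type is RowChange]
RowChange
  tableId [tableId, generated by the database]
  eventType [change type, default = UPDATE]
  isDdl [whether this is a DDL statement, e.g. create table/drop table]
  sql [SQL text of the ddl/query]
  rowDatas [the concrete insert/update/delete row changes; there can be several, since one binlog event can correspond to multiple changes, e.g. a batch operation]
    beforeColumns [column info, incremental data (before update, before delete), array of Column]
    afterColumns [column info, incremental data (after update, after insert), array of Column]
    props [reserved for extensions]
  props [reserved for extensions]
  ddlSchemaName [schemaName of the ddl/query; because DDL can span schemas, the schemaName in which the DDL was executed has to be kept]
Column
  index [column index]
  sqlType [jdbc type]
  name [column name (case-insensitive); MySQL does not provide it in the binlog itself]
  isKey [whether it is part of the primary key]
  updated [whether the value changed]
  isNull [whether the value is null]
  props [reserved for extensions]
  value [column value; timestamp and Datetime values are time-formatted text]
  length [original length of the corresponding data object]
  mysqlType [MySQL type of the column]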
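To make the nesting concrete, the sketch below mirrors a small subset of these fields as plain Java classes and turns a row-data entry into one readable line per changed row. The classes (`EntrySketch`, `Entry`, `RowData`, `Column`) are hypothetical simplifications, not canal's generated protobuf classes: the Header fields are flattened into `Entry`, `storeValue` is shown already parsed into its row list, and enum values are plain strings. It follows the layout above: DELETE rows carry their values in `beforeColumns`, INSERT/UPDATE rows in `afterColumns`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EntrySketch {
    // Hypothetical, heavily simplified mirrors of the fields above (not canal's protobuf classes).
    public static final class Column {
        final String name;
        final String value;
        final boolean updated;
        public Column(String name, String value, boolean updated) {
            this.name = name; this.value = value; this.updated = updated;
        }
    }

    public static final class RowData {
        final List<Column> beforeColumns;
        final List<Column> afterColumns;
        public RowData(List<Column> beforeColumns, List<Column> afterColumns) {
            this.beforeColumns = beforeColumns; this.afterColumns = afterColumns;
        }
    }

    public static final class Entry {
        final String entryType;       // BEGIN / END / ROWDATA / HEARTBEAT ...
        final String schemaName;      // from Header
        final String tableName;       // from Header
        final String eventType;       // from RowChange: INSERT / UPDATE / DELETE
        final List<RowData> rowDatas; // storeValue, already parsed
        public Entry(String entryType, String schemaName, String tableName,
                     String eventType, List<RowData> rowDatas) {
            this.entryType = entryType; this.schemaName = schemaName; this.tableName = tableName;
            this.eventType = eventType; this.rowDatas = rowDatas;
        }
    }

    // One line per row; changed columns are marked with '*'.
    public static List<String> describe(Entry e) {
        List<String> lines = new ArrayList<>();
        if (!"ROWDATA".equals(e.entryType)) {
            return lines; // transaction begin/end and heartbeats carry no row data
        }
        for (RowData row : e.rowDatas) {
            List<Column> cols = "DELETE".equals(e.eventType) ? row.beforeColumns : row.afterColumns;
            StringBuilder sb = new StringBuilder();
            for (Column c : cols) {
                if (sb.length() > 0) sb.append(", ");
                sb.append(c.name).append('=').append(c.value);
                if (c.updated) sb.append('*');
            }
            lines.add(e.eventType + " " + e.schemaName + "." + e.tableName + " [" + sb + "]");
        }
        return lines;
    }

    public static void main(String[] args) {
        RowData row = new RowData(
                Arrays.asList(new Column("id", "1", false), new Column("status", "0", false)),
                Arrays.asList(new Column("id", "1", false), new Column("status", "1", true)));
        Entry entry = new Entry("ROWDATA", "kuaiche", "bill_transport", "UPDATE", Arrays.asList(row));
        for (String line : describe(entry)) {
            System.out.println(line); // UPDATE kuaiche.bill_transport [id=1, status=1*]
        }
    }
}
```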
4. canal client operations
- Connect: connector.connect()
- Subscribe: connector.subscribe()
- Fetch data: connector.getWithoutAck()
- Business processing
- Acknowledge: connector.ack()
- Roll back: connector.rollback()
- Disconnect: connector.disconnect()
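The order of these calls can be sketched in a self-contained way. The `Connector` interface and `Batch` class below are hypothetical stand-ins that only mirror the method names listed above, so the example runs without canal; the real `CanalConnector` has more methods and returns a `Message`. `pollOnce` fetches with getWithoutAck, hands the entries to a handler, then acks on success or rolls back on failure, so that the next fetch starts again from the last unacknowledged position.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class ConsumeLoopSketch {
    // Hypothetical minimal stand-in for the connector calls listed above.
    public interface Connector {
        void connect();
        void subscribe(String filter);
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
        void disconnect();
    }

    // Hypothetical batch: an id plus its entries (id -1 means "no data").
    public static final class Batch {
        final long id;
        final List<String> entries;
        public Batch(long id, List<String> entries) { this.id = id; this.entries = entries; }
    }

    // One poll: fetch without ack, process, then ack on success or roll back on failure.
    public static void pollOnce(Connector c, int batchSize, Consumer<List<String>> handler) {
        Batch batch = c.getWithoutAck(batchSize);
        if (batch.id == -1) {
            return; // nothing fetched
        }
        try {
            if (!batch.entries.isEmpty()) {
                handler.accept(batch.entries);
            }
            c.ack(batch.id);      // confirm the batch as processed
        } catch (RuntimeException e) {
            c.rollback(batch.id); // next fetch restarts from the unacknowledged data
        }
    }

    // In-memory fake that records calls, used only to exercise pollOnce.
    static final class FakeConnector implements Connector {
        final List<String> calls = new ArrayList<>();
        private final List<Batch> queue;
        FakeConnector(List<Batch> batches) { this.queue = new ArrayList<>(batches); }
        public void connect() { calls.add("connect"); }
        public void subscribe(String filter) { calls.add("subscribe " + filter); }
        public Batch getWithoutAck(int batchSize) {
            return queue.isEmpty() ? new Batch(-1, Collections.<String>emptyList()) : queue.remove(0);
        }
        public void ack(long batchId) { calls.add("ack " + batchId); }
        public void rollback(long batchId) { calls.add("rollback " + batchId); }
        public void disconnect() { calls.add("disconnect"); }
    }

    public static void main(String[] args) {
        FakeConnector c = new FakeConnector(Arrays.asList(
                new Batch(1, Arrays.asList("ok")),
                new Batch(2, Arrays.asList("bad"))));
        c.connect();
        c.subscribe("kuaiche\\..*");
        for (int i = 0; i < 3; i++) {
            pollOnce(c, 100, entries -> {
                if (entries.contains("bad")) {
                    throw new IllegalStateException("processing failed");
                }
            });
        }
        c.disconnect();
        // [connect, subscribe kuaiche\..*, ack 1, rollback 2, disconnect]
        System.out.println(c.calls);
    }
}
```

Acking only after processing succeeds trades possible redelivery (after a rollback) for not losing changes; the client code later in this article instead acks every batch, even when processing throws.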
5. application.yml configuration
canal:
  server:
    url: # standalone setting, used in local/dev/test environments
    port: # standalone setting, used in local/dev/test environments
    destination: example # parameter shared by all environments
    username:
    password:
    subscribe: # MySQL tables to monitor
      bc_address_library,bc_contact,bc_contact_company,bc_customer,
      bill_entrust,bill_transport,
      consigner_info,fleet_info,
      fleet_vehicle_info,vehicle_info_temp,fleet_vehicle_mapping,
      bc_goods,queue_appoint_query,queue_appoint_record_log,
      queue_appoint_trucker,queue_appoint_vehicle,queue_warehouse_notice,
      queue_warehouse_notice_trucker_mapping,queue_warehouse_notice_vehicle_mapping
    refreshSeconds: 10 # unit: seconds
    zkServers: X.X.X.X:2181,X.X.X.X:2181 # ZooKeeper HA configuration, used in forecast/prod environments
    dbname: kuaiche # database name
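In the client code, `canal.server.subscribe` is passed to `connector.subscribe()` as-is, and `dbname` is read but not used in the code shown. Two details may matter for that string: YAML folds a multi-line plain scalar into a single line, turning each line break into a space, so the value contains `, ` sequences; and the filter examples in instance.properties are written as `schema.table`. Assuming the client-side filter is matched against `schema.table` names in the same way, a hypothetical helper (`SubscribeFilterSketch.build` is not part of canal or the original code) could build a clean, schema-qualified filter from the two properties:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class SubscribeFilterSketch {
    // Hypothetical helper: "bc_goods, bill_entrust" + "kuaiche" -> "kuaiche.bc_goods,kuaiche.bill_entrust"
    public static String build(String dbname, String tables) {
        return Arrays.stream(tables.split(","))
                .map(String::trim)                                // drop spaces left by YAML line folding
                .filter(t -> !t.isEmpty())                        // tolerate a trailing comma
                .map(t -> t.contains(".") ? t : dbname + "." + t) // keep rules that already name a schema
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        String folded = "bc_goods,queue_appoint_query, bill_entrust,";
        // kuaiche.bc_goods,kuaiche.queue_appoint_query,kuaiche.bill_entrust
        System.out.println(build("kuaiche", folded));
    }
}
```

Whether the schema prefix is actually required depends on how your canal version matches the subscribe filter, so verify this before relying on it.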
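6. Building the connection instance
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Slf4j
@Component
public class CanalClusterClient {
    /**
     * Wait time after an exception, in seconds
     */
    private static final long EXCEPTION_SECONDS = 10;
    /**
     * Maximum thread sleep time, in seconds
     */
    private static final long MAX_SLEEP_SECONDS = 30;
    /**
     * Maximum number of entries per getWithoutAck call (example value; the original does not show batchSize)
     */
    private static final int BATCH_SIZE = 1000;

    @Value("${canal.server.zkServers}")
    private String zkServers;
    @Value("${canal.server.subscribe}")
    private String subscribe;
    @Value("${canal.server.destination}")
    private String destination;
    @Value("${canal.server.refreshSeconds}")
    private long refreshSeconds;
    @Value("${canal.server.dbname}")
    private String dbname;

    // JsonUtility is the author's own helper (not shown); printEntry handles one batch of entries
    private final JsonUtility jsonUtility;

    private CanalConnector connector;

    @Autowired
    public CanalClusterClient(JsonUtility jsonUtility) {
        this.jsonUtility = jsonUtility;
    }

    /**
     * Blocking consume loop; call it from a dedicated thread.
     */
    public void start() {
        try {
            while (true) {
                // Connect on first use or when the connection is no longer valid; checked before every fetch
                if (null == connector || !connector.checkValid()) {
                    try {
                        // Cluster connector: the running canal server is looked up through ZooKeeper
                        connector = CanalConnectors.newClusterConnector(zkServers, destination, "", "");
                        connector.connect();
                        connector.subscribe(subscribe);
                        log.debug(">>>> Connection canal server successful,zkServers:【{}】,subscribe:【{}】 <<<<<", zkServers, subscribe);
                    } catch (Exception e) {
                        log.debug(">>>> Connection canal server failed,zkServers:【{}】,subscribe:【{}】, exception:【{}】...try again after 10s <<<<<", zkServers, subscribe, e.getMessage());
                        // Sleep in seconds: Thread.sleep(EXCEPTION_SECONDS) would only wait 10 ms
                        TimeUnit.SECONDS.sleep(EXCEPTION_SECONDS);
                        continue;
                    }
                }
                // Fetch up to BATCH_SIZE entries without acknowledging them yet
                Message message = connector.getWithoutAck(BATCH_SIZE);
                if (null == message) {
                    log.debug(">>>> Canal client connect zookeeper server is running, get Message is null! <<<<<");
                    TimeUnit.SECONDS.sleep(EXCEPTION_SECONDS);
                    continue;
                }
                // Keep the refresh interval at or below 30s
                if (refreshSeconds > MAX_SLEEP_SECONDS) {
                    refreshSeconds = EXCEPTION_SECONDS;
                }
                // Batch id used for the ack
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    TimeUnit.SECONDS.sleep(refreshSeconds);
                    refreshSeconds++;
                    if (refreshSeconds < MAX_SLEEP_SECONDS) {
                        log.debug(">>>> Canal client connect zookeeper server is running, get Message size is empty, ...try again after {}s! <<<<<", refreshSeconds);
                    } else if (refreshSeconds == MAX_SLEEP_SECONDS) {
                        log.debug(">>>> Canal client connect zookeeper server【{}】,subscribe:【{}】is running, get Message size is empty, ...try again after {}s! <<<<<", zkServers, subscribe, refreshSeconds);
                    }
                } else {
                    try {
                        // Start time of the asynchronous processing
                        long asyncBeginTime = System.currentTimeMillis();
                        jsonUtility.printEntry(message.getEntries(), batchId);
                        // End time
                        long asyncEndTime = System.currentTimeMillis();
                        log.debug(">>>>Canal client batchId:{},Async method time-consuming:{}ms<<<<<", batchId, asyncEndTime - asyncBeginTime);
                    } catch (Exception e) {
                        // Pass the exception as the last argument so SLF4J logs the stack trace
                        log.error(">>>> PrintEntry Exception <<<<<", e);
                    }
                }
                // Acknowledge the batch (also when printEntry failed, so a failed batch is not redelivered)
                connector.ack(batchId);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread can stop cleanly
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            log.error(">>>> get message from canal zookeeper server:【{}】,subscribe:【{}】error:【{}】 <<<<<", zkServers, subscribe, e.getMessage());
        } finally {
            if (connector != null) {
                connector.disconnect();
            }
        }
    }
}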
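In the loop above, `refreshSeconds` grows by one on every empty poll and is pulled back to `EXCEPTION_SECONDS` once it passes `MAX_SLEEP_SECONDS`, but it is never reset when data arrives, so after a quiet period the client keeps polling at the longer interval. One possible alternative, sketched here as a hypothetical helper (`IdleBackoffSketch` is not from the original code), is a capped linear backoff that returns to the configured interval as soon as a batch contains data:

```java
public class IdleBackoffSketch {
    private final long initialSeconds;
    private final long maxSeconds;
    private long current;

    public IdleBackoffSketch(long initialSeconds, long maxSeconds) {
        this.initialSeconds = initialSeconds;
        this.maxSeconds = maxSeconds;
        this.current = initialSeconds;
    }

    // Call after an empty poll: returns the delay to sleep, then grows it by one second up to the cap.
    public long nextIdleDelay() {
        long delay = current;
        current = Math.min(current + 1, maxSeconds);
        return delay;
    }

    // Call after a poll that returned data: go back to the configured interval.
    public void reset() {
        current = initialSeconds;
    }

    public static void main(String[] args) {
        IdleBackoffSketch backoff = new IdleBackoffSketch(28, 30);
        System.out.println(backoff.nextIdleDelay()); // 28
        System.out.println(backoff.nextIdleDelay()); // 29
        System.out.println(backoff.nextIdleDelay()); // 30
        System.out.println(backoff.nextIdleDelay()); // 30
        backoff.reset();
        System.out.println(backoff.nextIdleDelay()); // 28
    }
}
```

In the client loop, `nextIdleDelay()` would replace the manual `refreshSeconds` arithmetic in the empty-batch branch, and `reset()` would be called in the branch that processes entries.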
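IV. Installing ZooKeeper
1. For installation details, see the separate article: zookeeper
Note: canal depends on ZooKeeper 3.4.5, so pick that version when downloading.
V. HA mode testing
1. Prerequisites:
`All of the following services are running normally`
- canal-server: X.X.X.50:2100, X.X.X.54:2100
- canal-client: X.X.X.107:8999, X.X.X.85:8999
- zookeeper: X.X.X.50:2181, X.X.X.54:2181
2. Status checks
- Connect to any ZooKeeper server with the ZooKeeper client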
./zkCli.sh -server X.X.X.50:2181
- Get the canal server that is currently syncing data
get /otter/canal/destinations/example/running
{"active":true,"address":"X.X.X.50:2100","cid":2101}
cZxid = 0x100028621
ctime = Wed Dec 05 17:56:01 CST 2018
mZxid = 0x100028621
mtime = Wed Dec 05 17:56:01 CST 2018
pZxid = 0x100028621
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x16772fcc6300017
dataLength = 57
numChildren = 0
- Get the canal client that is currently connected
get /otter/canal/destinations/example/1001/running
{"active":true,"address":"X.X.X.X:15586","clientId":1001}
cZxid = 0x10002a72c
ctime = Wed Dec 26 17:13:25 CST 2018
mZxid = 0x10002a72e
mtime = Wed Dec 26 17:13:25 CST 2018
pZxid = 0x10002a72c
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x16772fcc6300024
dataLength = 63
numChildren = 0
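When scripting these checks (for example, to find which machine to stop in the failover test), only the `address` field of the node data is needed. A minimal sketch, assuming the flat JSON layout shown above; the class `RunningNodeParser` is hypothetical and uses a regular expression rather than a full JSON parser, so it is not robust to other layouts. Reading the node from ZooKeeper is left out.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RunningNodeParser {
    // Matches "address":"host:port" in the flat JSON stored in the running nodes.
    private static final Pattern ADDRESS = Pattern.compile("\"address\"\\s*:\\s*\"([^\"]+)\"");

    public static Optional<String> address(String nodeData) {
        Matcher m = ADDRESS.matcher(nodeData);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // X.X.X.50:2100
        System.out.println(address("{\"active\":true,\"address\":\"X.X.X.50:2100\",\"cid\":2101}").orElse("?"));
        // X.X.X.X:15586
        System.out.println(address("{\"active\":true,\"address\":\"X.X.X.X:15586\",\"clientId\":1001}").orElse("?"));
    }
}
```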
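3. HA failover test
- a) In the ZooKeeper client, run
get /otter/canal/destinations/example/running
to get the IP address of the canal server that is currently syncing data;
- b) Log in to the machine at that address and stop the canal server gracefully with
sh bin/stop.sh
(a graceful stop releases all of the instance's resources, including deleting the running node);
- c) Repeat step a: the other canal server is now the running server;
- d) In the ZooKeeper client, run
get /otter/canal/destinations/example/1001/running
to get the IP address of the canal client that is currently connected;
- e) Log in to the machine at that address and kill the canal client process;
- f) Repeat step d: the other canal client is now the connected client.
During the failover, you can modify data in the MySQL database in real time and watch the log output of the corresponding canal client.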
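The failover observed here relies on ZooKeeper ephemeral nodes (note the non-zero `ephemeralOwner` in the status output): each canal server tries to create the `running` node for a destination, the one that succeeds becomes active, and when its session ends the node is deleted so a standby can take over; clients use the `{clientId}/running` node the same way. The in-memory sketch below (the class `RunningNodeSketch` is hypothetical and does not talk to ZooKeeper) models only that create-if-absent and delete-on-session-close behaviour. In canal the standby is notified through a ZooKeeper watch instead of retrying explicitly, and after a hard kill the node disappears only when the session times out, so failover is slower than after `sh bin/stop.sh`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RunningNodeSketch {
    // Stand-in for ZooKeeper that only models ephemeral nodes: path -> data, path -> owning session.
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Long> owner = new HashMap<>();

    // Like creating an EPHEMERAL node: succeeds only if the path does not exist yet.
    public boolean tryCreateEphemeral(String path, String value, long sessionId) {
        if (data.containsKey(path)) {
            return false;
        }
        data.put(path, value);
        owner.put(path, sessionId);
        return true;
    }

    // Ending a session removes every ephemeral node it owns (clean stop or session expiry).
    public void closeSession(long sessionId) {
        owner.entrySet().removeIf(e -> {
            if (e.getValue().longValue() == sessionId) {
                data.remove(e.getKey());
                return true;
            }
            return false;
        });
    }

    public Optional<String> get(String path) {
        return Optional.ofNullable(data.get(path));
    }

    public static void main(String[] args) {
        String path = "/otter/canal/destinations/example/running";
        RunningNodeSketch zk = new RunningNodeSketch();
        long session50 = 1L;
        long session54 = 2L;
        // Both servers race for the running node: the first wins, the other stands by.
        System.out.println(zk.tryCreateEphemeral(path, "X.X.X.50:2100", session50)); // true
        System.out.println(zk.tryCreateEphemeral(path, "X.X.X.54:2100", session54)); // false
        // Stopping .50 ends its session, so the node disappears...
        zk.closeSession(session50);
        // ...and the standby can now take over.
        System.out.println(zk.tryCreateEphemeral(path, "X.X.X.54:2100", session54)); // true
        System.out.println(zk.get(path).orElse("none"));                             // X.X.X.54:2100
    }
}
```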