1. 程式人生 > >centos7 下安裝canal,並實現將mysql資料同步到redis

centos7 下安裝canal,並實現將mysql資料同步到redis

簡介:canal為阿里巴巴產品,它主要模擬了mysql的Slave向Master傳送請求,當mysql有增刪改查時則會出發請求將資料傳送到canal服務中,canal將資料存放到記憶體,直到客戶端程式(canal服務端和客戶端程式都是由java編寫,且客戶端邏輯由我們藉助com.alibaba.otter.canal工具包下的類完成開發)通過釋出-訂閱這種模式消費canal服務中的資料。

安裝步驟如下:

1:進入網站:https://github.com/alibaba/canal/releases進行下載(需要翻牆才能下載),我選的版本為1.0.24,

2:解壓 tar zxvf canal.deployer-1.0.24.tar.gz -C /tmp/canal

3:進入root使用者進入mysql建立關於canal的管理使用者,如果想用之前建立的mysql管理使用者的話,此步驟可跳過,在${canal解壓目錄}/conf/example/instance.properties制指定好mysql使用者名稱和密碼即可,如下為建立新的mysql管理使用者:

CREATE USER canal IDENTIFIED BY 'canal';      
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';    
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;    
FLUSH PRIVILEGES;

4:修改canal的基本配置檔案 ${canal解壓目錄}/conf/example/instance.properties:

vi conf/example/instance.properties,修改為如下:

## mysql serverId  
canal.instance.mysql.slaveId = 1234  
  
# position info,
canal.instance.master.address = 127.0.0.1:3306   //需要改成自己的資料庫資訊  
canal.instance.master.journal.name =   
canal.instance.master.position =   
canal.instance.master.timestamp =   
  
#canal.instance.standby.address =   
#canal.instance.standby.journal.name =  
#canal.instance.standby.position =   
#canal.instance.standby.timestamp =   
  
# username/password,需要改成自己的資料庫資訊  
canal.instance.dbUsername = canal    //為第三步設定的使用者名稱
canal.instance.dbPassword = canal  //為第三步設定的密碼


canal.instance.defaultDatabaseName =  
canal.instance.connectionCharset = UTF-8  
  
# table regex  
canal.instance.filter.regex = .*\\..*

5:開放canal服務埠(預設為11111),並檢測上面的操作是否能讓canal服務正常啟動,進入bin目錄,執行啟動命令startup.sh命令,然後到 logs目錄下的兩個目錄canal和example目錄下檢視日誌檔案,如果都沒有異常則表示服務啟動成功,接著執行第六步

6:配置mysql關聯canal的更新模式:

找到mysql的安裝目錄下的my.cnf檔案,新增如下資訊:

log-bin=mysql-bin #新增這一行就ok  
binlog-format=ROW #選擇row模式  
server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複

7:編寫客戶端程式進行測試(本測試類只是獲取了canal中的資料,存放到redis中的邏輯可以自行實現):

import java.net.InetSocketAddress;
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.concurrent.TimeUnit;  
  
import com.alibaba.otter.canal.client.CanalConnector;  
import com.alibaba.otter.canal.client.CanalConnectors;  
import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
import com.alibaba.otter.canal.protocol.Message;  


public class Test {
public static void main(String[] args) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.0.112",    
              11111), "example", "", ""); 
System.out.println("連線");
connector.connect();  
    connector.subscribe(".*\\..*");  
 
System.out.println("連線成功");
int batchSize = 100;  
 
       while (true) {  
           long batchId = -1;  
           try {  
               Message message = connector.getWithoutAck(batchSize, new Long(5), TimeUnit.SECONDS); // 獲取指定數量的資料  
               batchId = message.getId();  
               int size = message.getEntries().size();  
               System.out.println("batchSize:" + size);  
               printEntry(message.getEntries());  
 
               connector.ack(batchId); // 提交確認  
           } catch (Exception e) {  
               connector.rollback(batchId); // 處理失敗, 回滾資料  
               connector.disconnect();  
           }  
       }  
 
   }  
 
   private static void printEntry(List<Entry> entrys) {  
       for (Entry entry : entrys) {  
           if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN  
                   || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
               continue;  
           }  
 
           RowChange rowChage = null;  
           try {  
               rowChage = RowChange.parseFrom(entry.getStoreValue());  
           } catch (Exception e) {  
               throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                       e);  
           }  
           EventType eventType = rowChage.getEventType();  
           System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));  
 
           for (RowData rowData : rowChage.getRowDatasList()) {  
               Map<String, Object> map = null;  
               if (eventType == EventType.DELETE) {  
                   map = printColumn(rowData);  
               } else if (eventType == EventType.INSERT) {  
                   map = printColumn(rowData);  
               } else if (eventType == EventType.UPDATE) {  
                   map = printColumn(rowData);  
               }  
               System.out.print(eventType + ":");  
               System.out.println(map);  
           }  
       }  
   }  
 
   private static Map<String, Object> printColumn(RowData rowData) {  
       Map<String, Object> map = new HashMap<String, Object>();  
       for (Column column : rowData.getBeforeColumnsList()) {  
           map.put(column.getName(), column.getValue());  
       }  
       return map;  
   }  
}

7:總結

利用canal可以很好實現資料同步的邏輯和業務邏輯分離,相互之間互不影響,方便維護和擴充套件

其次另外的注意點:因為canal伺服器是由java所寫,所以伺服器上未安裝jdk的話,可能會報錯(純屬推測),由於我本機上安裝有jdk所以沒出現次問題,若啟動服務有問題的朋友可考慮下此注意點,還有若用客戶端訪問canal服務的話,也不要忘記開放埠