1. 程式人生 > >利用Canal完成Mysql資料同步Redis

利用Canal完成Mysql資料同步Redis

流程
Canal的原理是模擬Slave向Master傳送請求,Canal解析binlog,但不將解析結果持久化,而是儲存在記憶體中,每次有客戶端讀取一次訊息,就刪除該訊息。這裡所說的客戶端,就需要我們寫一個連線Canal的程式,持續從Canal獲取資料。

步驟
一、配置Canal
參考https://github.com/alibaba/canal

【mysql配置】
1,配置引數

[mysqld]
log-bin=mysql-bin #新增這一行就ok
binlog-format=ROW #選擇row模式
server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重複
2,在mysql中 配置canal資料庫管理使用者,配置相應許可權(repication許可權)
    CREATE USER canal IDENTIFIED BY 'canal';    
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;  
    FLUSH PRIVILEGES;  

【canal下載和配置】
1,下載canal https://github.com/alibaba/canal/releases 
2,解壓

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal
3,修改配置檔案
vi conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234

# position info,需要改成自己的資料庫資訊
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password,需要改成自己的資料庫資訊
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex
canal.instance.filter.regex = .*\\..*

#################################################

【canal啟動和關閉】
1,啟動
sh bin/startup.sh
2,檢視日誌
vi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
<pre name="user-content-code">2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
具體instance的日誌:
vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
3,關閉
sh bin/stop.sh

注意:
1,這裡只需要配置好引數後,就可以直接執行
2,Canal沒有解析後的檔案,不會持久化

二、建立客戶端
參考https://github.com/alibaba/canal/wiki/ClientExample


其中一個是連線canal並操作的類,一個是redis的工具類,使用maven主要是依賴包的下載很方便。


pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.sample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>  
        <groupId>com.alibaba.otter</groupId>  
        <artifactId>canal.client</artifactId>  
        <version>1.0.12</version>  
    </dependency>  
    
    <dependency>  
        <groupId>org.springframework</groupId>  
        <artifactId>spring-test</artifactId>  
        <version>3.1.2.RELEASE</version>  
        <scope>test</scope>  
    </dependency>  
      
    <dependency>  
        <groupId>redis.clients</groupId>  
        <artifactId>jedis</artifactId>  
        <version>2.4.2</version>  
    </dependency>  
    
    </dependencies>
  <build/>
</project>



2,ClientSample程式碼
這裡主要做兩個工作,一個是迴圈從Canal上取資料,一個是將資料更新至Redis
package canal.sample;

import java.net.InetSocketAddress;  
import java.util.List;  

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;  
import com.alibaba.otter.canal.common.utils.AddressUtils;  
import com.alibaba.otter.canal.protocol.Message;  
import com.alibaba.otter.canal.protocol.CanalEntry.Column;  
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;  
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;  
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;  
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;  
import com.alibaba.otter.canal.client.*;  
 
public class ClientSample {  

   public static void main(String args[]) {  
	   
       // 建立連結  
       CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),  
               11111), "example", "", "");  
       int batchSize = 1000;  
       try {  
           connector.connect();  
           connector.subscribe(".*\\..*");  
           connector.rollback();    
           while (true) {  
               Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的資料  
               long batchId = message.getId();  
               int size = message.getEntries().size();  
               if (batchId == -1 || size == 0) {  
                   try {  
                       Thread.sleep(1000);  
                   } catch (InterruptedException e) {  
                       e.printStackTrace();  
                   }  
               } else {  
                   printEntry(message.getEntries());  
               }  
 
               connector.ack(batchId); // 提交確認  
               // connector.rollback(batchId); // 處理失敗, 回滾資料  
           }  
 
       } finally {  
           connector.disconnect();  
       }  
   }  
 
   private static void printEntry( List<Entry> entrys) {  
       for (Entry entry : entrys) {  
           if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {  
               continue;  
           }  
 
           RowChange rowChage = null;  
           try {  
               rowChage = RowChange.parseFrom(entry.getStoreValue());  
           } catch (Exception e) {  
               throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),  
                       e);  
           }  
 
           EventType eventType = rowChage.getEventType();  
           System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",  
                   entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),  
                   entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),  
                   eventType));  
 
           for (RowData rowData : rowChage.getRowDatasList()) {  
               if (eventType == EventType.DELETE) {  
            	   redisDelete(rowData.getBeforeColumnsList());  
               } else if (eventType == EventType.INSERT) {  
            	   redisInsert(rowData.getAfterColumnsList());  
               } else {  
                   System.out.println("-------> before");  
                   printColumn(rowData.getBeforeColumnsList());  
                   System.out.println("-------> after");  
                   redisUpdate(rowData.getAfterColumnsList());  
               }  
           }  
       }  
   }  
 
   private static void printColumn( List<Column> columns) {  
       for (Column column : columns) {  
           System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());  
       }  
   }  
   
	  private static void redisInsert( List<Column> columns){
		  JSONObject json=new JSONObject();
		  for (Column column : columns) {  
			  json.put(column.getName(), column.getValue());  
	       }  
		  if(columns.size()>0){
			  RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
		  }
	   }
	  
	  private static  void redisUpdate( List<Column> columns){
		  JSONObject json=new JSONObject();
		  for (Column column : columns) {  
			  json.put(column.getName(), column.getValue());  
	       }  
		  if(columns.size()>0){
			  RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
		  }
	  }
  
	   private static  void redisDelete( List<Column> columns){
		   JSONObject json=new JSONObject();
			  for (Column column : columns) {  
				  json.put(column.getName(), column.getValue());  
		       }  
			  if(columns.size()>0){
				  RedisUtil.delKey("user:"+ columns.get(0).getValue());
			  }
	   }

   
}  

3,RedisUtil程式碼
package canal.sample;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {

	// Redis伺服器IP
	private static String ADDR = "10.1.2.190";

	// Redis的埠號
	private static int PORT = 6379;

	// 訪問密碼
	private static String AUTH = "admin";

	// 可用連線例項的最大數目,預設值為8;
	// 如果賦值為-1,則表示不限制;如果pool已經分配了maxActive個jedis例項,則此時pool的狀態為exhausted(耗盡)。
	private static int MAX_ACTIVE = 1024;

	// 控制一個pool最多有多少個狀態為idle(空閒的)的jedis例項,預設值也是8。
	private static int MAX_IDLE = 200;

	// 等待可用連線的最大時間,單位毫秒,預設值為-1,表示永不超時。如果超過等待時間,則直接丟擲JedisConnectionException;
	private static int MAX_WAIT = 10000;

	// 過期時間
	protected static int  expireTime = 60 * 60 *24;
	
	// 連線池
	protected static JedisPool pool;

	/**
	 * 靜態程式碼,只在初次呼叫一次
	 */
	static {
		JedisPoolConfig config = new JedisPoolConfig();
		//最大連線數
		config.setMaxTotal(MAX_ACTIVE);
		//最多空閒例項
		config.setMaxIdle(MAX_IDLE);
		//超時時間
		config.setMaxWaitMillis(MAX_WAIT);
		//
		config.setTestOnBorrow(false);
		pool = new JedisPool(config, ADDR, PORT, 1000);
	}

	/**
	 * 獲取jedis例項
	 */
	protected static synchronized Jedis getJedis() {
		Jedis jedis = null;
		try {
			jedis = pool.getResource();
		} catch (Exception e) {
			e.printStackTrace();
			if (jedis != null) {
				pool.returnBrokenResource(jedis);
			}
		}
		return jedis;
	}

	/**
	 * 釋放jedis資源
	 * 
	 * @param jedis
	 * @param isBroken
	 */
	protected static void closeResource(Jedis jedis, boolean isBroken) {
		try {
			if (isBroken) {
				pool.returnBrokenResource(jedis);
			} else {
				pool.returnResource(jedis);
			}
		} catch (Exception e) {

		}
	}

	/**
	 *  是否存在key
	 * 
	 * @param key
	 */
	public static boolean existKey(String key) {
		Jedis jedis = null;
		boolean isBroken = false;
		try {
			jedis = getJedis();
			jedis.select(0);
			return jedis.exists(key);
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
		return false;
	}

	/**
	 *  刪除key
	 * 
	 * @param key
	 */
	public static void delKey(String key) {
		Jedis jedis = null;
		boolean isBroken = false;
		try {
			jedis = getJedis();
			jedis.select(0);
			jedis.del(key);
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
	}

	/**
	 *  取得key的值
	 * 
	 * @param key
	 */
	public static String stringGet(String key) {
		Jedis jedis = null;
		boolean isBroken = false;
		String lastVal = null;
		try {
			jedis = getJedis();
			jedis.select(0);
			lastVal = jedis.get(key);
			jedis.expire(key, expireTime);
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
		return lastVal;
	}

	/**
	 *  新增string資料
	 * 
	 * @param key
	 * @param value
	 */
	public static String stringSet(String key, String value) {
		Jedis jedis = null;
		boolean isBroken = false;
		String lastVal = null;
		try {
			jedis = getJedis();
			jedis.select(0);
			lastVal = jedis.set(key, value);
			jedis.expire(key, expireTime);
		} catch (Exception e) {
			e.printStackTrace();
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
		return lastVal;
	}

	/**
	 *  新增hash資料
	 * 
	 * @param key
	 * @param field
	 * @param value
	 */
	public static void hashSet(String key, String field, String value) {
		boolean isBroken = false;
		Jedis jedis = null;
		try {
			jedis = getJedis();
			if (jedis != null) {
				jedis.select(0);
				jedis.hset(key, field, value);
				jedis.expire(key, expireTime);
			}
		} catch (Exception e) {
			isBroken = true;
		} finally {
			closeResource(jedis, isBroken);
		}
	}

}
注意:

1,客戶端的Jedis連線不同於專案裡的Jedis連線需要Spring註解,直接使用靜態方法就可以。

執行
1,執行canal服務端startup.bat / startup.sh
2,執行客戶端程式

注意
1,雖然canal服務端解析binlog後不會把資料持久化,但canal服務端會記錄每次客戶端消費的位置(客戶端每次ack時服務端會記錄pos點)。如果資料正在更新時,canal服務端掛掉,客戶端也會跟著掛掉,mysql依然在插入資料,而redis則因為客戶端的關閉而停止更新,造成mysql和redis的資料不一致。解決辦法是,只要重啟canal服務端和客戶端就可以了,雖然canal服務端因為重啟之前解析資料清空,但因為canal服務端記錄的是客戶端最後一次獲取的pos點,canal服務端再從這個pos點開始解析,客戶端更新至redis,以達到資料的一致。
2,如果只有一個canal服務端一個客戶端,肯定存在可用性低的問題,一種做法是用程式來監控canal服務端和客戶端,如果掛掉,再重啟;一種做法是多個canal服務端+zk,將canal服務端的配置檔案放在zk,任何一個canal服務端掛掉後,切換到其他canal服務端,讀到的配置檔案的內容就是一致的(還有記錄的消費pos點),保證業務的高可用,客戶端可使用相同的做法。


-------------------------------------
順便推薦QQ群,僅供學習和交流,歡迎大家的加入
Java     : 139978466
MySQL: 167209377
大資料  : 181560406
前端     : 139978026
測試     : 299057857
運維     : 139978281