大資料時代,資料實時同步解決方案的思考—最全的資料同步總結
1、 早期關係型資料庫之間的資料同步
1)、全量同步
比如從oracle資料庫中同步一張表的資料到Mysql中,通常的做法就是 分頁查詢源端的表,然後通過 jdbc的batch 方式插入到目標表,這個地方需要注意的是,分頁查詢時,一定要按照主鍵id來排序分頁,避免重複插入。
2)、基於資料檔案匯出和匯入的全量同步,這種同步方式一般只適用於同種資料庫之間的同步,如果是不同的資料庫,這種方式可能會存在問題。
3)、基於觸發器的增量同步
增量同步一般是做實時的同步,早期很多資料同步都是基於關係型資料庫的觸發器trigger來做的。
使用觸發器實時同步資料的步驟:
A、 基於原表創觸發器,觸發器包含insert,modify,delete 三種類型的操作,資料庫的觸發器分Before和After兩種情況,一種是在insert,modify,delete 三種類型的操作發生之前觸發(比如記錄日誌操作,一般是Before),一種是在insert,modify,delete 三種類型的操作之後觸發。
B、 建立增量表,增量表中的欄位和原表中的欄位完全一樣,但是需要多一個操作型別欄位(分表代表insert,modify,delete 三種類型的操作),並且需要一個唯一自增ID,代表資料原表中資料操作的順序,這個自增id非常重要,不然資料同步就會錯亂。
C、 原表中出現insert,modify,delete 三種類型的操作時,通過觸發器自動產生增量資料,插入增量表中。
D、處理增量表中的資料,處理時,一定是按照自增id的順序來處理,這種效率會非常低,沒辦法做批量操作,不然資料會錯亂。 有人可能會說,是不是可以把insert操作合併在一起,modify合併在一起,delete操作合併在一起,然後批量處理,我給的答案是不行,因為資料的增刪改是有順序的,合併後,就沒有順序了,同一條資料的增刪改順序一旦錯了,那資料同步就肯定錯了。
市面上很多資料etl資料交換產品都是基於這種思想來做的。
E、 這種思想使用kettle 很容易就可以實現,筆者曾經在自己的部落格中寫過 kettle的文章,https://www.cnblogs.com/laoqing/p/7360673.html
4)、基於時間戳的增量同步
A、首先我們需要一張臨時temp表,用來存取每次讀取的待同步的資料,也就是把每次從原表中根據時間戳讀取到資料先插入到臨時表中,每次在插入前,先清空臨時表的資料
B、我們還需要建立一個時間戳配置表,用於存放每次讀取的處理完的資料的最後的時間戳。
C、每次從原表中讀取資料時,先查詢時間戳配置表,然後就知道了查詢原表時的開始時間戳。
D、根據時間戳讀取到原表的資料,插入到臨時表中,然後再將臨時表中的資料插入到目標表中。
E、從快取表中讀取出資料的最大時間戳,並且更新到時間戳配置表中。快取表的作用就是使用sql獲取每次讀取到的資料的最大的時間戳,當然這些都是完全基於sql語句在kettle中來配置,才需要這樣的一張臨時表。
2、 大資料時代下的資料同步
1)、基於資料庫日誌(比如mysql的binlog)的同步
我們都知道很多資料庫都支援了主從自動同步,尤其是mysql,可以支援多主多從的模式。那麼我們是不是可以利用這種思想呢,答案當然是肯定的,mysql的主從同步的過程是這樣的。
A、master將改變記錄到二進位制日誌(binary log)中(這些記錄叫做二進位制日誌事件,binary log events,可以通過show binlog events進行檢視);
B、slave將master的binary log events拷貝到它的中繼日誌(relay log);
C、slave重做中繼日誌中的事件,將改變反映它自己的資料。
阿里巴巴開源的canal就完美的使用這種方式,canal 偽裝了一個Slave 去喝Master進行同步。
A、 canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議
B、 mysql master收到dump請求,開始推送binary log給slave(也就是canal)
C、 canal解析binary log物件(原始為byte流)
另外canal 在設計時,特別設計了 client-server 模式,互動協議使用 protobuf 3.0 , client 端可採用不同語言實現不同的消費邏輯。
canal java 客戶端: https://github.com/alibaba/canal/wiki/ClientExample
canal c# 客戶端: https://github.com/dotnetcore/CanalSharp
canal go客戶端: https://github.com/CanalClient/canal-go
canal php客戶端: https://github.com/xingwenge/canal-php、
github的地址:https://github.com/alibaba/canal/
D、在使用canal時,mysql需要開啟binlog,並且binlog-format必須為row,可以在mysql的my.cnf檔案中增加如下配置
log-bin=E:/mysql5.5/bin_log/mysql-bin.log
binlog-format=ROW
server-id=123、
E、 部署canal的服務端,配置canal.properties檔案,然後 啟動 bin/startup.sh 或bin/startup.bat
#設定要監聽的mysql伺服器的地址和埠
canal.instance.master.address = 127.0.0.1:3306
#設定一個可訪問mysql的使用者名稱和密碼並具有相應的許可權,本示例使用者名稱、密碼都為canal
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#連線的資料庫
canal.instance.defaultDatabaseName =test
#訂閱例項中所有的資料庫和表
canal.instance.filter.regex = .*\\..*
#連線canal的埠
canal.port= 11111
#監聽到的資料變更傳送的佇列
canal.destinations= example
F、 客戶端開發,在maven中引入canal的依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.21</version>
</dependency>
程式碼示例:
package com.example; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; public class CanalClientExample { public static void main(String[] args) { while (true) { //連線canal CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal"); connector.connect(); //訂閱 監控的 資料庫.表 connector.subscribe("demo_db.user_tab"); //一次取10條 Message msg = connector.getWithoutAck(10); long batchId = msg.getId(); int size = msg.getEntries().size(); if (batchId < 0 || size == 0) { System.out.println("沒有訊息,休眠5秒"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { // CanalEntry.RowChange row = null; for (CanalEntry.Entry entry : msg.getEntries()) { try { row = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = row.getRowDatasList(); for (CanalEntry.RowData rowdata : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList(); Map<String, Object> dataMap = transforListToMap(afterColumnsList); if (row.getEventType() == CanalEntry.EventType.INSERT) { //具體業務操作 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.UPDATE) { //具體業務操作 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.DELETE) { List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { if ("id".equals(column.getName())) { //具體業務操作 System.out.println("刪除的id:" + column.getValue()); } } } else { System.out.println("其他操作型別不做處理"); } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } //確認訊息 connector.ack(batchId); } } } public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) { Map map = new HashMap(); if (afterColumnsList != null && afterColumnsList.size() > 0) { for (CanalEntry.Column column : afterColumnsList) { map.put(column.getName(), column.getValue()); } } return map; } }
2)、基於BulkLoad的資料同步,比如從hive同步資料到hbase
我們有兩種方式可以實現,
A、 使用spark任務,通過HQl讀取資料,然後再通過hbase的Api插入到hbase中。
但是這種做法,效率很低,而且大批量的資料同時插入Hbase,對Hbase的效能影響很大。
在大資料量的情況下,使用BulkLoad可以快速匯入,BulkLoad主要是借用了hbase的儲存設計思想,因為hbase本質是儲存在hdfs上的一個資料夾,然後底層是以一個個的Hfile存在的。HFile的形式存在。Hfile的路徑格式一般是這樣的:
/hbase/data/default(預設是這個,如果hbase的表沒有指定名稱空間的話,如果指定了,這個就是名稱空間的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>
B、 BulkLoad實現的原理就是按照HFile格式儲存資料到HDFS上,生成Hfile可以使用hadoop的MapReduce來實現。如果不是hive中的資料,比如外部的資料,那麼我們可以將外部的資料生成檔案,然後上傳到hdfs中,組裝RowKey,然後將封裝後的資料在回寫到HDFS上,以HFile的形式儲存到HDFS指定的目錄中。
當然我們也可以不事先生成hfile,可以使用spark任務直接從hive中讀取資料轉換成RDD,然後使用HbaseContext的自動生成Hfile檔案,部分關鍵程式碼如下:
… //將DataFrame轉換bulkload需要的RDD格式 val rddnew = datahiveDF.rdd.map(row => { val rowKey = row.getAs[String](rowKeyField) fields.map(field => { val fieldValue = row.getAs[String](field) (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue)))) }) }).flatMap(array => { (array) }) … //使用HBaseContext的bulkload生成HFile檔案 hbaseContext.bulkLoad[Put](rddnew.map(record => { val put = new Put(record._1) record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) put }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload") val conn = ConnectionFactory.createConnection(hBaseConf) val hbTableName = TableName.valueOf(hBaseTempTable.getBytes()) val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn)) val realTable = conn.getTable(hbTableName) HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator) // bulk load start val loader = new LoadIncrementalHFiles(hBaseConf) val admin = conn.getAdmin() loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator) sc.stop() } … def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = { val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList() import scala.collection.JavaConversions._ for (cells <- put.getFamilyCellMap.entrySet().iterator()) { val family = cells.getKey for (value <- cells.getValue) { val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value)) ret.+=((kfq, CellUtil.cloneValue(value))) } } ret.iterator } } …
C、pg_bulkload的使用
這是一個支援pg庫(PostgreSQL)批量匯入的外掛工具,它的思想也是通過外部檔案載入的方式,這個工具筆者沒有親自去用過,詳細的介紹可以參考:https://my.oschina.net/u/3317105/blog/852785 pg_bulkload專案的地址:http://pgfoundry.org/projects/pgbulkload/
3)、基於sqoop的全量匯入
Sqoop 是hadoop生態中的一個工具,專門用於外部資料匯入進入到hdfs中,外部資料匯出時,支援很多常見的關係型資料庫,也是在大資料中常用的一個數據匯出匯入的交換工具。
Sqoop從外部匯入資料的流程圖如下:
Sqoop將hdfs中的資料匯出的流程如下:
本質都是用了大資料的資料分散式處理來快速的匯入和匯出資料。
&n