1. 程式人生 > >大資料時代,資料實時同步解決方案的思考—最全的資料同步總結

大資料時代,資料實時同步解決方案的思考—最全的資料同步總結

 

1、 早期關係型資料庫之間的資料同步

1)、全量同步

比如從oracle資料庫中同步一張表的資料到Mysql中,通常的做法就是 分頁查詢源端的表,然後通過 jdbc的batch 方式插入到目標表,這個地方需要注意的是,分頁查詢時,一定要按照主鍵id來排序分頁,避免重複插入。

2)、基於資料檔案匯出和匯入的全量同步,這種同步方式一般只適用於同種資料庫之間的同步,如果是不同的資料庫,這種方式可能會存在問題。

3)、基於觸發器的增量同步

增量同步一般是做實時的同步,早期很多資料同步都是基於關係型資料庫的觸發器trigger來做的。

使用觸發器實時同步資料的步驟:

A、 基於原表創觸發器,觸發器包含insert,modify,delete 三種類型的操作,資料庫的觸發器分Before和After兩種情況,一種是在insert,modify,delete 三種類型的操作發生之前觸發(比如記錄日誌操作,一般是Before),一種是在insert,modify,delete 三種類型的操作之後觸發。

B、 建立增量表,增量表中的欄位和原表中的欄位完全一樣,但是需要多一個操作型別欄位(分表代表insert,modify,delete 三種類型的操作),並且需要一個唯一自增ID,代表資料原表中資料操作的順序,這個自增id非常重要,不然資料同步就會錯亂。

C、 原表中出現insert,modify,delete 三種類型的操作時,通過觸發器自動產生增量資料,插入增量表中。

D、處理增量表中的資料,處理時,一定是按照自增id的順序來處理,這種效率會非常低,沒辦法做批量操作,不然資料會錯亂。  有人可能會說,是不是可以把insert操作合併在一起,modify合併在一起,delete操作合併在一起,然後批量處理,我給的答案是不行,因為資料的增刪改是有順序的,合併後,就沒有順序了,同一條資料的增刪改順序一旦錯了,那資料同步就肯定錯了。

市面上很多資料etl資料交換產品都是基於這種思想來做的。

E、 這種思想使用kettle 很容易就可以實現,筆者曾經在自己的部落格中寫過 kettle的文章,https://www.cnblogs.com/laoqing/p/7360673.html

4)、基於時間戳的增量同步

A、首先我們需要一張臨時temp表,用來存取每次讀取的待同步的資料,也就是把每次從原表中根據時間戳讀取到資料先插入到臨時表中,每次在插入前,先清空臨時表的資料

B、我們還需要建立一個時間戳配置表,用於存放每次讀取的處理完的資料的最後的時間戳。

C、每次從原表中讀取資料時,先查詢時間戳配置表,然後就知道了查詢原表時的開始時間戳。

D、根據時間戳讀取到原表的資料,插入到臨時表中,然後再將臨時表中的資料插入到目標表中。

E、從快取表中讀取出資料的最大時間戳,並且更新到時間戳配置表中。快取表的作用就是使用sql獲取每次讀取到的資料的最大的時間戳,當然這些都是完全基於sql語句在kettle中來配置,才需要這樣的一張臨時表。

2、    大資料時代下的資料同步

1)、基於資料庫日誌(比如mysql的binlog)的同步

我們都知道很多資料庫都支援了主從自動同步,尤其是mysql,可以支援多主多從的模式。那麼我們是不是可以利用這種思想呢,答案當然是肯定的,mysql的主從同步的過程是這樣的。

  A、master將改變記錄到二進位制日誌(binary log)中(這些記錄叫做二進位制日誌事件,binary log events,可以通過show binlog events進行檢視);

  B、slave將master的binary log events拷貝到它的中繼日誌(relay log);

  C、slave重做中繼日誌中的事件,將改變反映它自己的資料。

阿里巴巴開源的canal就完美的使用這種方式,canal 偽裝了一個Slave 去喝Master進行同步。

A、 canal模擬mysql slave的互動協議,偽裝自己為mysql slave,向mysql master傳送dump協議

 

B、 mysql master收到dump請求,開始推送binary log給slave(也就是canal)

C、 canal解析binary log物件(原始為byte流)

另外canal 在設計時,特別設計了 client-server 模式,互動協議使用 protobuf 3.0 , client 端可採用不同語言實現不同的消費邏輯。

canal java 客戶端: https://github.com/alibaba/canal/wiki/ClientExample

canal c# 客戶端: https://github.com/dotnetcore/CanalSharp

canal go客戶端: https://github.com/CanalClient/canal-go

canal php客戶端: https://github.com/xingwenge/canal-php、

github的地址:https://github.com/alibaba/canal/

D、在使用canal時,mysql需要開啟binlog,並且binlog-format必須為row,可以在mysql的my.cnf檔案中增加如下配置

log-bin=E:/mysql5.5/bin_log/mysql-bin.log

binlog-format=ROW

server-id=123、

E、 部署canal的服務端,配置canal.properties檔案,然後 啟動 bin/startup.sh 或bin/startup.bat

#設定要監聽的mysql伺服器的地址和埠

canal.instance.master.address = 127.0.0.1:3306

#設定一個可訪問mysql的使用者名稱和密碼並具有相應的許可權,本示例使用者名稱、密碼都為canal

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

#連線的資料庫

canal.instance.defaultDatabaseName =test

#訂閱例項中所有的資料庫和表

canal.instance.filter.regex = .*\\..*

#連線canal的埠

canal.port= 11111

#監聽到的資料變更傳送的佇列

canal.destinations= example

F、 客戶端開發,在maven中引入canal的依賴

   <dependency>
         <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.0.21</version>
      </dependency>

程式碼示例:

 

package com.example;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

 
public class CanalClientExample {

    public static void main(String[] args) {
        while (true) {
            //連線canal
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
            connector.connect();
            //訂閱 監控的 資料庫.表
            connector.subscribe("demo_db.user_tab");
            //一次取10條
            Message msg = connector.getWithoutAck(10);

            long batchId = msg.getId();
            int size = msg.getEntries().size();
            if (batchId < 0 || size == 0) {
                System.out.println("沒有訊息,休眠5秒");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //
                CanalEntry.RowChange row = null;
                for (CanalEntry.Entry entry : msg.getEntries()) {
                    try {
                        row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDatasList = row.getRowDatasList();
                        for (CanalEntry.RowData rowdata : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList();
                            Map<String, Object> dataMap = transforListToMap(afterColumnsList);
                            if (row.getEventType() == CanalEntry.EventType.INSERT) {
                                //具體業務操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
                                //具體業務操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.DELETE) {
                                List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    if ("id".equals(column.getName())) {
                                        //具體業務操作
                                        System.out.println("刪除的id:" + column.getValue());
                                    }
                                }
                            } else {
                                System.out.println("其他操作型別不做處理");
                            }

                        }

                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                //確認訊息
                connector.ack(batchId);
            }


        }
    }

    public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
        Map map = new HashMap();
        if (afterColumnsList != null && afterColumnsList.size() > 0) {
            for (CanalEntry.Column column : afterColumnsList) {
                map.put(column.getName(), column.getValue());
            }
        }
        return map;
    }


}

  

2)、基於BulkLoad的資料同步,比如從hive同步資料到hbase

 

我們有兩種方式可以實現,

A、 使用spark任務,通過HQl讀取資料,然後再通過hbase的Api插入到hbase中。

但是這種做法,效率很低,而且大批量的資料同時插入Hbase,對Hbase的效能影響很大。

在大資料量的情況下,使用BulkLoad可以快速匯入,BulkLoad主要是借用了hbase的儲存設計思想,因為hbase本質是儲存在hdfs上的一個資料夾,然後底層是以一個個的Hfile存在的。HFile的形式存在。Hfile的路徑格式一般是這樣的:

/hbase/data/default(預設是這個,如果hbase的表沒有指定名稱空間的話,如果指定了,這個就是名稱空間的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>

B、 BulkLoad實現的原理就是按照HFile格式儲存資料到HDFS上,生成Hfile可以使用hadoop的MapReduce來實現。如果不是hive中的資料,比如外部的資料,那麼我們可以將外部的資料生成檔案,然後上傳到hdfs中,組裝RowKey,然後將封裝後的資料在回寫到HDFS上,以HFile的形式儲存到HDFS指定的目錄中。

 

 

當然我們也可以不事先生成hfile,可以使用spark任務直接從hive中讀取資料轉換成RDD,然後使用HbaseContext的自動生成Hfile檔案,部分關鍵程式碼如下:

…
//將DataFrame轉換bulkload需要的RDD格式
    val rddnew = datahiveDF.rdd.map(row => {
      val rowKey = row.getAs[String](rowKeyField)
 
      fields.map(field => {
        val fieldValue = row.getAs[String](field)
        (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
      })
    }).flatMap(array => {
      (array)
    })
…
//使用HBaseContext的bulkload生成HFile檔案
    hbaseContext.bulkLoad[Put](rddnew.map(record => {
      val put = new Put(record._1)
      record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
 
    val conn = ConnectionFactory.createConnection(hBaseConf)
    val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
    val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
    val realTable = conn.getTable(hbTableName)
    HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
 
    // bulk load start
    val loader = new LoadIncrementalHFiles(hBaseConf)
    val admin = conn.getAdmin()
    loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
 
    sc.stop()
  }
…
  def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
    import scala.collection.JavaConversions._
    for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
      val family = cells.getKey
      for (value <- cells.getValue) {
        val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
        ret.+=((kfq, CellUtil.cloneValue(value)))
      }
    }
    ret.iterator
  }
}

…

  

C、pg_bulkload的使用

這是一個支援pg庫(PostgreSQL)批量匯入的外掛工具,它的思想也是通過外部檔案載入的方式,這個工具筆者沒有親自去用過,詳細的介紹可以參考:https://my.oschina.net/u/3317105/blog/852785   pg_bulkload專案的地址:http://pgfoundry.org/projects/pgbulkload/

3)、基於sqoop的全量匯入

Sqoop 是hadoop生態中的一個工具,專門用於外部資料匯入進入到hdfs中,外部資料匯出時,支援很多常見的關係型資料庫,也是在大資料中常用的一個數據匯出匯入的交換工具。

 

 

Sqoop從外部匯入資料的流程圖如下:

Sqoop將hdfs中的資料匯出的流程如下:

本質都是用了大資料的資料分散式處理來快速的匯入和匯出資料。

&n