1. 程式人生 > >使用DataX進行OTS例項間資料遷移

使用DataX進行OTS例項間資料遷移

 表格儲存是 NoSQL 的資料儲存服務,是基於雲端計算技術構建的一個分散式結構化和半結構化資料的儲存和管理服務。表格儲存的資料模型以二維表為中心。表有行和列的概念,但是與傳統資料庫不一樣,表格儲存的表是稀疏的,每一行可以有不同的列,可以動態增加或者減少屬性列,建表時不需要為表的屬性列定義嚴格的 schema。

1. 概述

 OTS的資料遷移可以使用DataX完成全量資料遷移。但由於部分資料表的資料量較大,無法在指定的視窗內完成全量遷移,且目前DataX只能針對主鍵值進行範圍查詢,暫不支援按照屬性列範圍抽取資料。所以可以按如下兩種方式實現全量+增量的資料遷移:

  1. 分割槽鍵包含範圍資訊(如時間資訊、自增ID),則以指定range為切分點,分批次遷移;
  2. 分割槽鍵不包含範圍資訊,則可以採用在應用側雙寫的模式將資料分批次遷移,寫入目標環境同一張業務表。利用OTS的主鍵唯一性,選擇對重複資料執行覆蓋原有行的策略來保證資料唯一性;
    本文以應用側調整為雙寫模式為例,詳細說明OTS資料遷移、校驗過程。其中OTS資料遷移流程具體如下所示:

108

1) 預遷移階段:雙寫模式中的大表全量遷移。
2) 正式遷移階段:雙寫模式中的增量表全量遷移、其餘小表的全量遷移。

2. 預遷移階段

2.1 準備工作

 為保證新老環境的資料一致性,需要在開始資料遷移前,對目標環境的OTS資料表進行資料清空操作,Delete操作是通過Datax工具直接刪除表內資料,無需重新建表。具體操作如下:

2.1.1 配置datax任務

 在使用datax執行資料清空前,需配置對應資料表使用datax執行delete任務所需的json檔案。在清空資料的配置中,reader與writer均配置目標端的連線資訊,且資料寫入模式配置DeleteRow即可,具體內容如下。

{
    "job": {
        "setting": {
            "speed": {
                "channel": "5"
            }
        },
        "content": [{
            "reader": {
                "name": "otsreader",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yyy.com/",
                    "accessId": "dest_accessId",
                    "accessKey": "dest_accessKey",
                    "instanceName": " dest_instanceName",
                    "table": " tablename ",
                    "column": [{
                            "name": "xxxxx"
                        },
                        {
                            "name": "xxxxx"
                        }
                    ],
                    "range": {
                        "begin": [{
                            "type": "INF_MIN"
                        }],
                        "end": [{
                            "type": "INF_MAX"
                        }]
                    }
                }
            },
            "writer": {
                "name": "otswriter",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yun.yyy.com/",
                    "accessId": "dest_accessId",
                    "accessKey": "dest_accessKey",
                    "instanceName": " dest_instanceName",
                    "table": " tablename ",
                    "primaryKey": [{
                        "name": "xxxxx",
                        "type": "string"
                    }],
                    "column": [{
                            "name": "xxxxx",
                            "type": "string"
                        },
                        {
                            "name": "xxxxx",
                            "type": "string"
                        }
                    ],
                    "writeMode": "DeleteRow"
                }
            }
        }]
    }
}

2.1.2 執行datax任務

  1. 登入datax所在ECS後,進入datax所在路徑
  2. 在對應的工具機分別執行del_pre.sh指令碼,即可開始目標環境對應表的資料清空,具體命令如下:
sh del_pre.sh

del_pre.sh指令碼內容如下:

#!/bin/bash
nohup python datax.py del_table_1.json --jvm="-Xms16G -Xmx16G" > del_table_1.log &

2.2 資料遷移

 在不停服務的情況下把源環境內資料量較大的資料表全部遷移到目標環境內對應的資料表。

2.2.1 配置datax任務

 在datax對資料表配置相應的json檔案,遷移配置的具體內容如下:
需注意,由於OTS本身是NoSQL系統,在遷移資料的配置中,必須配置所有的屬性列,否則會缺失對應屬性列的值。

{
    "job": {
        "setting": {
            "speed": {
                "channel": "5"
            }
        },
        "content": [{
            "reader": {
                "name": "otsreader",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yyy.com/",
                    "accessId": "src_accessId",
                    "accessKey": "src_ accessKey ",
                    "instanceName": " src_instanceName",
                    "table": "tablename",
                    "column": [{
                            "name": "xxxxx"
                        },
                        {
                            "name": "xxxxx"
                        }
                    ],
                    "range": {
                        "begin": [{
                            "type": "INF_MIN"
                        }],
                        "end": [{
                            "type": "INF_MAX"
                        }]
                    }
                }
            },
            "writer": {
                "name": "otswriter",
                "parameter": {
                    "endpoint": "http://xxx.vpc.ots.yun.zzz.com/",
                    "accessId": "dest_accessId",
                    "accessKey": "dest_accessKey",
                    "instanceName": " dest_instanceName",
                    "table": " tablename ",
                    "primaryKey": [{
                        "name": "xxxxx",
                        "type": "string"
                    }],
                    "column": [{
                            "name": "xxxxx",
                            "type": "string"
                        },
                        {
                            "name": "xxxxx",
                            "type": "string"
                        }
                    ],
                    "writeMode": "PutRow"
                }
            }
        }]
    }
}

2.2.2 執行datax任務

  1. 登入datax所在ECS後,進入datax所在路徑
  2. 在對應的工具機分別執行pre_transfer.sh指令碼,即可開始專有域OTS到專有云OTS的資料遷移,具體命令如下:
sh pre_transfer.sh
  • pre_transfer.sh指令碼內容如下:
#!/bin/bash
nohup python datax.py table_1.json --jvm="-Xms16G -Xmx16G" >table_1.log &

3. 正式遷移階段

3.1 OTS資料靜默

 OTS的資料靜默主要是通過觀察對應表的資料是否存在變化來判斷,校驗方式主要包括行數統計、內容統計。

3.1.1 行數統計

 因OTS本身不提供count介面,所以採用在hive建立OTS外部表的方式,讀取OTS資料並計算對應資料表的行數,具體操作如下:

  1. 建立外部表
     啟動hive,建立上述資料表對應的外部表;為提高統計效率,外部表可以只讀取OTS的主鍵列,建表語句示例如下:
CREATE EXTERNAL TABLE t_oilcard_expenses_old
(h_card_no string)
STORED BY 'com.aliyun.openservices.tablestore.hive.TableStoreStorageHandler'
WITH SERDEPROPERTIES(
"tablestore.columns.mapping"="card_no")
TBLPROPERTIES ("tablestore.endpoint"="$endpoint ","tablestore.instance"="instanceName","tablestore.access_key_id"="ak","tablestore.access_key_secret"="sk","tablestore.table.name"="tableName");
  1. 進入指令碼所在路徑
    登入Hadoop叢集master所在ECS,進入hive所在目錄;
  2. 執行行數統計
    執行pre_all_count.sh指令碼,即可開始源環境內OTS對應表的行數統計;
nohup sh pre_all_count.sh >pre_all_count.log &
  • pre_all_count.sh指令碼內容如下:
#!/bin/bash
  ./bin/hive -e "use ots;select count(h_card_no) from tableName;" >table.rs &

 連續執行兩次行數統計,若兩次統計結果一致則說明資料已經靜默,資料寫入以停止;

3.1.2 內容統計

由於部分資料表分割槽鍵對應的值比較單一,導致資料全部儲存在同一個分割槽。若採用hive統計行數會耗時太久,所以對於這個表使用datax將OTS資料匯入oss的方式進行內容統計,具體操作如下:

  1. 進入指令碼所在路徑
    登入上述表格對應的ECS,進入datax所在路徑;
  2. 執行內容校驗
    1) 執行check_table.sh指令碼,即可將源環境內OTS資料表匯出到OSS;
sh check_table.sh
  • check_table.sh指令碼內容如下:
#!/bin/bash
nohup python datax.py table.json --jvm="-Xms8G -Xmx8G">ots2oss01.log &

2) 獲取OSS object的ETAG值,寫入對應檔案table_check01.rs
連續執行兩次內容統計,對比兩次匯出object的ETAG值,若結果一致則說明資料已經靜默,資料寫入以停止;

3.2 OTS資料遷移

3.2.1 準備工作

 為保證遷移後新老環境資料一致,防止目標環境因測試產生遺留髒資料,在進行資料遷移前,需要將目標環境的OTS的其餘全量表進行資料清空。資料清空方式主要有drop、delete,兩者的區別如下:

操作方式 優勢 劣勢 建議
Drop 耗時短 需重新建表
新表分割槽數需重新擴充套件
分割槽數為1的資料表建議選擇drop
Delete 分割槽數保留 耗時較長 需要保留分割槽數提升遷移速率的表建議選擇delete
3.2.1.1 Drop表操作
  1. 登入OTS圖形化客戶端所在工具機,使用如下資訊連線指定OTS例項,並進行對應表的drop操作;
AK: dest_accessId
SK: dest_accessKey
InstanceName: InstanceName
Endpoint:endpoint
  1. 確認刪除後,再在客戶端重新建立對應的資料;

3.2.1.2 Delete表操作

 Delete操作是通過Datax工具直接刪除表內資料,無需重新建表。Datax所需的配置檔案參考2.1.1所示。

  1. 登入datax所在ECS後,進入datax所在路徑
  2. 在對應的工具機分別執行delete指令碼,即可開始目標環境OTS的對應表的資料清空,具體命令如下:
sh del_table_01.sh
  • del_table_01.sh指令碼內容如下:
#!/bin/bash
nohup python datax.py del_table.json --jvm="-Xms16G -Xmx16G">del_table.log &

3.2.2 資料遷移

 在源環境停止服務的情況下把雙寫模式中的增量表全量遷移以及其餘小表全部遷移到目標環境內對應的資料表。
具體操作如下:

3.2.2.1 配置datax任務

 在datax對上述資料表配置相應的json檔案,遷移配置的具體內容參考2.2.1,在遷移資料的配置中,需要列全所有的屬性列。

3.2.2.2 執行datax任務
  1. 登入datax所在ECS後,進入datax所在路徑
  2. 在對應的工具機分別執行transfer.sh指令碼,即可開始專有域OTS到專有云OTS的資料遷移,具體命令如下:
sh transfer.sh
  • transfer.sh指令碼內容如下:
#!/bin/bash
nohup python datax.py Table.json  >Table.log &

3.3 OTS資料校驗

 新老環境OTS的資料校驗方式均包括行數統計、內容統計,具體如下。

3.3.1 源環境資料統計

 源環境OTS資料表的資料量統計依據資料靜默期間最後一次的統計結果即可。

3.3.2 目標環境資料統計

3.3.2.1 行數統計

 因OTS本身不提供count介面,且目標環境ODPS支援建立OTS外部表,所以採用在ODPS建立OTS外部表的方式,讀取OTS資料並計算對應資料表的行數,具體操作如下:

  1. 建立外部表
    登入odpscmd,建立上述資料表對應的外部表;
  2. 進入指令碼所在路徑
    登入odpscmd工具所在ECS,進入odps所在路徑;
  3. 執行行數統計
    執行newots_count.sh指令碼,即可進行目標環境內OTS對應表的行數統計;
nohup sh newots_count.sh >newots_count.log &
  • newots_count.sh指令碼內容如下:
#!/bin/bash
./bin/odpscmd -e "select count(h_card_no) from tableName;" >table.rs &
3.3.2.2 內容統計

由於源環境的部分資料表採用內容統計的方式進行資料校驗,為了方便對比資料是否一致,所以目標環境也採用內容統計的方式,具體操作參考3.1.2。