1. 程式人生 > >JSON資料從MongoDB遷移到MaxCompute最佳實踐

JSON資料從MongoDB遷移到MaxCompute最佳實踐

資料及賬號準備

首先您需要將資料上傳至您的MongoDB資料庫。本例中使用阿里雲的 雲資料庫 MongoDB 版,網路型別為VPC(需申請公網地址,否則無法與DataWorks預設資源組互通),測試資料如下。

{
    "store": {
        "book": [
             {
                "category": "reference",
                "author": "Nigel Rees",
                "title": "Sayings of the Century"
, "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, { "category": "fiction", "author"
: "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 } ], "bicycle": { "color": "red", "price": 19.95 } }, "expensive": 10 }
登入MongoDB的DMS控制檯,本例中使用的資料庫為  admin
,集合為  userlog,您可以在查詢視窗使用 db.userlog.find().limit(10)命令檢視已上傳好的資料,如下圖所示。 
154320427332875_zh-CN.png 
此外,需提前在資料庫內新建使用者,用於DataWorks新增資料來源。本例中使用命令 db.createUser({user:"bookuser",pwd:"123456",roles:["root"]}),新建使用者名稱為  bookuser,密碼為  123456,許可權為 root

使用DataWorks提取資料到MaxCompute

  1. 新增MongoDB資料來源 進入DataWorks 資料整合控制檯,新增 MongoDB型別資料來源。 
    154320427332876_zh-CN.png
    具體引數如下所示,測試資料來源連通性通過即可點選完成。由於本文中MongoDB處於VPC環境下,因此  資料來源型別需選擇  有公網IP。 
    154320427332877_zh-CN.png
    訪問地址及埠號可通過在 MongoDB管理控制檯點選例項名稱獲取,如下圖所示。 
    154320427332878_zh-CN.png
  2. 新建資料同步任務 在DataWorks上新建 資料同步型別節點。 
    154320427332879_zh-CN.png
    新建的同時,在DataWorks新建一個 建表任務,用於存放JSON資料,本例中新建表名為mqdata。 
    154320427331544_zh-CN.png
    表引數可以通過圖形化介面完成。本例中mqdata表僅有一列,型別為string,列名為MQ data。 
    154320427331545_zh-CN.png
    完成上述新建後,您可以在圖形化介面進行資料同步任務引數的初步配置,如下圖所示。選擇目標資料來源名稱為odps_first,選擇目標表為剛建立的mqdata。資料來源型別為MongoDB,選擇我們剛建立的資料來源mongodb_userlog。完成上述配置後,  點選轉換為指令碼,跳轉到指令碼模式。 
    154320427332880_zh-CN.png
    指令碼模式程式碼示例如下。
    
    {
        "type": "job",
        "steps": [
            {
                "stepType": "mongodb",
                "parameter": {
                    "datasource": "mongodb_userlog",
     //資料來源名稱
                    "column": [
                        {
                            "name": "store.bicycle.color", //JSON欄位路徑,本例中提取color值
                            "type": "document.document.string" //本欄目的欄位數需和name一致。假如您選取的JSON欄位為一級欄位,如本例中的expensive,則直接填寫string即可。
                        }
                    ],
                    "collectionName //集合名稱": "userlog"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "isCompress": false,
                    "truncate": true,
                    "datasource": "odps_first",
                    "column": [
         //MaxCompute表列名                 "mqdata"
                    ],
                    "emptyAsNull": false,
                    "table": "mqdata"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "concurrent": 2,
                "throttle": false,
                "dmu": 1
            }
        }
    }
    完成上述配置後,點選執行接即可。執行成功日誌示例如下所示。 
    154320427331550_zh-CN.png

結果驗證

在您的 業務流程中新建一個ODPS SQL節點。 
154320427331551_zh-CN.png 
您可以輸入  SELECT * from mqdata;語句,檢視當前mqdata表中資料。當然這一步您也可以直接在 MaxCompute客戶端中輸入命令執行。 
154320427432881_zh-CN.png