如何使 Hive 對 DynamoDB 進行併發讀取?
問題背景
DynamoDB 建立了類似如下的表:
{ "accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", // Primary key. "date": "2018-12-12", "when": "2018-12-12 17:22:15", "context": "something" }
這個表用來記錄訪問記錄,以 accessid (隨機字串) 作為 Primary key. Date 記錄當條記錄的生成日期。
我們希望通過 Hive 每天將前一天的資料從 DynamoDB 備份到 S3 上,具體方法可參考文件[1].
hive> INSERT OVERWRITE TABLE s3_table
Dynamodb表裡存放了很多天的資料,執行以上操作會消耗很長時間,如何能加快速度?比如讓 Hive 進行併發讀取?
測試使用的 AWS EMR 版本為 emr-5.20.0 (Amazon 2.8.5, Ganglia 3.7.2, Hive 2.3.4, Hue 4.3.0, Mahout 0.13.0, Pig 0.17.0, Tez 0.9.1). emr-dynamodb connector 的版本是 4.7.
$ sudo rpm -qa | grep emr-ddb emr-ddb-hadoop-4.7.0-1.amzn1.noarch emr-ddb-hive-4.7.0-1.amzn1.noarch emr-ddb-4.7.0-1.amzn1.noarch
DynamoDB 的 Query 和 Scan 操作
DynamoDB 是個 NoSQL 資料庫。在 Hive 操作中,如果 WHERE 後面的條件是 Primary Key,比如:
hive > SELECT * FROM ddb_table WHERE accessid="c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9";
這時,Hive 會進行 Query 操作,能直接拿到結果。可以理解為 O(1) 的操作。
而如果 WHERE 後面的條件只是個普通的 Attribute, 那 Hive 需要呼叫 Scan 操作,掃描整個表的條目,過濾出符合條件的條件。可以理解為 O(n) 的操作。比如:
hive> INSERT OVERWRITE TABLE s3_table
Hive 需要讀取整個表,對比每個條目的 date 是否為"2018-12-12". 因此,這個操作需要讀取大量資料,消耗大量 DynamoDB 的 RCU.
Scan 操作是可以分 segment 的[2]。在這種情況下,如果 Hive 能在不同節點同時讀取不同的 segment,這樣可以加快讀取速度。
Query 操作能直接讀取 Key 對應的 Value, 沒有並行的需要。
Hive 對 DynamoDB 的併發讀取
按照上面說法,如果能讓 Hive 啟動多個 container 掃描 dynamodb table 的不同 segment, 可以起到併發加速的效果。
Hive 查詢 DynamoDB 的動作實際上由 emr-dynamodb-connector 來實現。它會根據 DynamoDB 當前的 RCU 設定,來決定分多少個 segment 同時讀取。一般來說,每 200 RCU 會啟動一個 container。所以,如果我們把 DynamoDB 的 RCU 設定為 6000 (在 DynamoDB 側設定), Hive 會同時啟動 30 個 container 來讀取資料。
hive> INSERT OVERWRITE TABLE s3_table > SELECT * FROM ddb_table; ---------------------------------------------------------------------------------------------- VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED ---------------------------------------------------------------------------------------------- Map 1 .......... container SUCCEEDED 30 30 0 0 0 0 ----------------------------------------------------------------------------------------------
具體邏輯可參考 emr-dynamodb-connector 相關程式碼[3].
使用合適的 DynamoDB Key 來用 Query 代替 Scan
儘管併發可以提高速度,但是當 DynamoDB table 增長後,scan 的耗時將會越來越長,而且加大 RCU 會產生不少的費用。
如果可以優化 DynamoDB Table 的設計,將 Scan 的操作轉換為 Query,速度能極大地提升,且能節省費用。
考慮到 Hive 需要用 date 進行查詢,建立 Global Secondary Index 或許是個方法[4]。簡單地說,就是建立除了 Primary Key 之外的索引,使得查詢這個 GSI 的時候,也能做到 O(1) 的效果。比如,在上述例子中,可以將 date 設定為 GSI. 這樣,使用 date 作為查詢條件,也可以呼叫 query 操作,不需要 scan. 可惜,現在的 emr-dynamodb-connector 還不支援 GSI,Hive 還會傻傻地去掃描整個 Table.[5]
為了繞過這個問題,我們可以把 Date 設定成 Primary Key, 把 when 設定成 sort key (因為一個 key 只能有一個 value, 所以要用一個組合 key 來避免覆蓋),示例資料:
{ "accessid": "c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9", //<-- Global Secondary Index Partition Key. "date": "2018-12-19", //<-- Primary Key "context": "8c5d5b6c-1f50-4fc6-9462-c3dc01efe54a", "when": "2018-12-19 0 0426c743-0d1a-44ef-8bd8-f9765387c981", //<-- Sort Key, 後面加入隨機字串以避免key重複 }
如此一來,使用 Hive 以 date 作為條件進行查詢的時候,就會呼叫 query,加快速度並節省費用。
hive> INSERT OVERWRITE TABLE ddb_table_1 > SELECT * FROM ddb_tkio_ds WHERE ds="2018-12-18"; ---------------------------------------------------------------------------------------------- VERTICES MODE STATUS TOTAL COMPLETED RUNNING PENDING FAILED KILLED ---------------------------------------------------------------------------------------------- Map 1 .......... container SUCCEEDED 1 1 0 0 0 0 ---------------------------------------------------------------------------------------------- VERTICES: 01/01 [==========================>>] 100% ELAPSED TIME: 10.50 s ----------------------------------------------------------------------------------------------
幾萬條資料,只用了10秒就搞定。
修改 Table 後,應用程式需要相應修改來通過 GSI 提取資料。使用 GSI 需要指定 index-name。例如:
response = table.query( IndexName='accessid-index', ##<--- KeyConditionExpression=Key(''accessid").eq("c63b88a3-1503-4c2c-a7c2-3a1ffde7bff9") )
具體可參考[6].
後續
前面提到,如果要使 Hive 併發進行 Scan,需要手動調大 RCU。長期調大 RCU 可能會產生大量費用。
目前,emr-dynamodb-connector 正在新增對 DynamoDB on-demand capacity 的支援[7]。如果這個功能最後能實現,使用者可以把 DynamoDB table 設定成 On-Demand 模式,按實際的讀寫數量來收費。Hive 在進行 scan 的時候,也會自動地併發。
參考文件
[1] https://docs.amazonaws.cn/en_us/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.html
[2] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.ParallelScan
[3] https://github.com/awslabs/emr-dynamodb-connector/blob/47a1a9f752b71767f981b284b522f6edb50c1370/emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/read/AbstractDynamoDBInputFormat.java#L98
[4] https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GSI.html
[5] https://github.com/awslabs/emr-dynamodb-connector/issues/87
[6] https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query
[7] https://github.com/awslabs/emr-dynamodb-connector/issues/86