1. 程式人生 > >如何在MaxCompute上處理存儲在OSS上的開源格式數據

如何在MaxCompute上處理存儲在OSS上的開源格式數據

雲平臺

前言

MaxCompute作為使用最廣泛的大數據平臺,內部存儲的數據以EB量級計算。巨大的數據存儲量以及大規模計算下高性能數據讀寫的需求,對於MaxCompute提出了各種高要求及挑戰。處在大數據時代,數據的來源多種多樣,開源社區經過十幾年的發展,百花齊放,各種各樣的數據格式不斷的出現。 我們的用戶也在各個場景上,通過各種計算框架,積累了各種不同格式的數據。怎樣將MaxCompute強大的計算能力開放給這些使用開源格式存儲沈澱下來的數據,在MaxCompute上挖掘這些數據中的信息,是MaxCompute團隊希望解決的問題。

MaxCompute 2.0最近推出的非結構化計算框架【公測階段】,旨在從存儲介質和存儲格式兩個維度,打通計算與存儲的通道。 在之前的文章中,我們已經介紹過怎樣在MaxCompute上對存儲在OSS上的文本,音頻,圖像等格式的數據,以及TableStore(OTS)的KV數據進行計算處理。在這裏,則將介紹對於各種流行的開源數據格式(ORC, PARQUET, SEQUENCEFILE, RCFILE, AVRO, TEXTFILE等等),怎樣將其存儲在OSS上面,並通過非結構化框架在MaxCompute進行處理。

本著不重造輪子的原則,對於絕大部分這些開源數據格式的解析工作,在非結構化框架中會直接調用開源社區的實現,並且無縫的與MaxCompute系統做對接。

1. 創建EXTERNAL TABLE來綁定OSS外部數據

MaxCompute非結構化數據框架通過EXTERNAL TABLE的概念來提供MaxCompute與各種數據的聯通,與讀取OSS數據的使用方法類似,對OSS數據進行寫操作,首先要通過CREATE EXTERNAL TABLE語句創建出一個外部表,而在讀取開源數據格式時,創建外表的DDL語句格式如下:

DROP TABLE [IF EXISTS] <external_table>;

CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
[ROW FORMAT SERDE '<serde class>']
STORED AS <file format>
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'

可以看到,這個語法與HIVE的語法是相當接近的,而在這個CREATE EXTERNAL TABLE的ddl語句中,有如下幾點要說明:

  1. 首先要特別說明的是這裏使用的是STORED AS的關鍵字,而不是普通非結構化外表用的STORED BY關鍵字,這也是目前在讀取開源兼容數據時獨有的。

  2. 外部表的<column schemas> 必須與具體OSS上存儲存儲數據的schema相符合。

  3. ROW FORMAT SERDE 並非必選選項,只有在使用一些特殊的格式上,比如TEXTFILE時才需要使用。

  4. STORED AS後面接的是文件格式名字, 比如 ORC/PARQUET/RCFILE/SEQUENCEFILE/TEXTFILE 等等。

  5. 最後還要提到的是,在上面這個例子中,我們在LOCATION上使用了OSS明文AK,這只適用於在用戶對於AK的保密性不敏感情況下使用。 對於數據安全比較敏感的場景,比如在多用戶場景或者彈外集群上,則推薦使用通過STS/RAM體系事先進行鑒權,從而避免使用明文AK。

1.1 範例1: 關聯OSS上存儲的PARQUET數據

現在再來看一個具體的例子,假設我們有一些PARQUET文件存放在一個OSS路徑上,每個文件都是PARQUET格式,存放著schema為16列(4列BINGINT, 4列DOUBLE, 8列STRING)的數據,那麽可以通過如下DDL語句來描述:

CREATE EXTERNAL TABLE tpch_lineitem_parquet
(
  l_orderkey bigint,
  l_partkey bigint,
  l_suppkey bigint,
  l_linenumber bigint,
  l_quantity double,
  l_extendedprice double,
  l_discount double,
  l_tax double,
  l_returnflag string,
  l_linestatus string,
  l_shipdate string,
  l_commitdate string,
  l_receiptdate string,
  l_shipinstruct string,
  l_shipmode string,
  l_comment string
)
STORED AS PARQUET
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';

1.2 範例2:分區表關聯OSS上存儲的TEXTFILE數據

同樣的數據,如果是每行以JSON格式,存儲成OSS上TEXTFILE文件;同時,數據在OSS通過多個目錄組織,這時是可以使用MaxCompute分區表和數據關聯,則可以通過如下DDL語句來描述:

CREATE EXTERNAL TABLE tpch_lineitem_textfile
(
  l_orderkey bigint,
  l_partkey bigint,
  l_suppkey bigint,
  l_linenumber bigint,
  l_quantity double,
  l_extendedprice double,
  l_discount double,
  l_tax double,
  l_returnflag string,
  l_linestatus string,
  l_shipdate string,
  l_commitdate string,
  l_receiptdate string,
  l_shipinstruct string,
  l_shipmode string,
  l_comment string
)
PARTITIONED BY (ds string)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';

如果OSS表目錄下面的子目錄是以Partition Name方式組織,比如:

oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/'
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/'
...

則可以使用以下DDL語句ADD PARTITION:

ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102");
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");

如果OSS分區目錄不是按這種方式組織,或者根本不在表目錄下,比如:

oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/;
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/;
...

則可以使用以下DDL語句ADD PARTITION:

ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/';
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/';
...

2. 讀取以及處理 OSS 上面的開源格式數據

對比上面的兩個範例,可以看出對於不同文件類型,只要簡單修改STORED AS後的格式名。在接下來的例子中,我們將只集中描述對上面PARQUET數據對應的外表(tpch_lineitem_parquet)的處理,如果要處理不同的文件類型,只要在DDL創建外表時指定是PARQUET/ORC/TEXTFILE/RCFILE/TEXTFILE即可,處理數據的語句則是一樣的。

2.1 直接讀取以及處理OSS上面的開源數據

在創建數據外表後,直接對外表就可以進行與普通MaxCompute表的操作,直接對存儲在OSS上的數據進行處理,比如:

SELECT l_returnflag,
    l_linestatus,
    SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
    AVG(l_quantity) AS avg_qty,
    COUNT(*) AS count_order
FROM tpch_lineitem_parquet
WHERE l_shipdate <= '1998-09-02'
GROUP BY
    l_returnflag,
    l_linestatus;

可以看到,在這裏tpch_lineitem_parquet這個外表被當作一個普通的內部表一樣使用。唯一不同的只是在MaxCompute內部計算引擎將從OSS上去讀取對應的PARQUET數據來進行處理。

但是我們應該強調的是,在這裏直接使用外表,每次讀取的時候都需要涉及外部OSS的IO操作,並且MaxCompute系統本身針對內部存儲做的許多高性能優化都用不上了,所以性能上會有所損失。 所以如果是需要對數據進行反復計算以及對計算的高效性比較敏感的場景上,我們推薦下面這種用法:先將數據導入MaxCompute內部,再進行計算。

註意,上面例子中的tpch_lineitem_textfile表,因為使用了ROW FORMAT + STORED AS,需要手動設置flag(只使用STORED AS,odps.sql.hive.compatible默認為TRUE),再進行讀取,否則會有報錯。

SELECT * FROM tpch_lineitem_textfile LIMIT 1;
FAILED: ODPS-0123131:User defined function exception - Traceback:
com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper

--需要手動設置hive兼容flag
set odps.sql.hive.compatible=true;
SELECT * FROM tpch_lineitem_textfile LIMIT 1;
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
| l_orderkey | l_partkey  | l_suppkey  | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax      | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment |
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
| 5640000001 | 174458698  | 9458733    | 1            | 14.0       | 23071.58        | 0.08       | 0.06       | N            | O            | 1998-01-26 | 1997-11-16   | 1998-02-18    | TAKE BACK RETURN | SHIP       | cuses nag silently. quick |
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+

2.2 將OSS上的開源數據導入MaxCompute,再進行計算

  • 首先創建一個與外部表schema一樣的內部表tpch_lineitem_internal,然後將OSS上的開源數據導入MaxCompute內部表,以cFile格式存儲在MaxCompute內部:

CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet;

INSERT OVERWRITE TABLE tpch_lineitem_internal
SELECT * FROM tpch_lineitem_parquet;

  • 直接就可以對內部表進行同樣的操作:

SELECT l_returnflag,
    l_linestatus,
    SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
    AVG(l_quantity) AS avg_qty,
    COUNT(*) AS count_order
FROM tpch_lineitem_internal
WHERE l_shipdate <= '1998-09-02'
GROUP BY
    l_returnflag,
    l_linestatus;

通過這樣子將數據先導入系統的情況下,對同樣數據的計算就會更高效得多。

4. 結語

開源的種種數據格式往往由各種數據處理生態產生,而MaxCompute非結構化數據處理框架通過實現計算與存儲的互聯,希望打通阿裏雲核心計算平臺與各種數據的通路。在這個基礎上,各種各樣依賴於不同數據格式的應用,將能在MaxCompute計算平臺上實現,後繼我們會對一些具體的這種應用,比如基因計算等,再做一些具體的case study以及介紹。我們也歡迎有對開源數據進行處理分析的更多應用,能在MaxCompute強大計算能力的基礎上開花結果。

原文鏈接


如何在MaxCompute上處理存儲在OSS上的開源格式數據