1. 程式人生 > >kylin cube 構建過程

kylin cube 構建過程

如果 inter enc org 命令使用 efault 合並 註意 row

本文是對 http://kylin.apache.org/docs20/howto/howto_optimize_build.html 的翻譯,以便閱讀。

1. 創建 Hive 中間表(Create Intermediate Flat Hive Table)

這個過程會把 cube 中用到的所有 Hive 表(包括 look at 的表)匯聚成一張表。如果 Cube 是分區的,kylin 會增加時間條件以獲取相應的數據。此過程的輸出大致如下:

hive -e "USE default;
DROP TABLE IF EXISTS kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34;

CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34
(AIRLINE_FLIGHTDATE date,AIRLINE_YEAR int,AIRLINE_QUARTER int,...,AIRLINE_ARRDELAYMINUTES int)
STORED AS SEQUENCEFILE
LOCATION ‘hdfs:///kylin/kylin200instance/kylin-0a8d71e8-df77-495f-b501-03c06f785b6c/kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34‘;

SET dfs.replication=2;
SET hive.exec.compress.output=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=100000000;
SET mapreduce.job.split.metainfo.maxsize=-1;

INSERT OVERWRITE TABLE kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34 SELECT
AIRLINE.FLIGHTDATE
,AIRLINE.YEAR
,AIRLINE.QUARTER
,...
,AIRLINE.ARRDELAYMINUTES
FROM AIRLINE.AIRLINE as AIRLINE
WHERE (AIRLINE.FLIGHTDATE >= ‘1987-10-01‘ AND AIRLINE.FLIGHTDATE < ‘2017-01-01‘);
"

Hive 命令使用的配置文件在 conf/kylin_hive_conf.xml 中。

如果 Cube 的分區列(partition cloumn)和 Hive 的分區列(partition cloumn) 相同,那麽在這一列上進行過濾,會使 Hive 能智能的過濾掉不符合的分區。因此強烈建議使用 Hive 表的分區列(如果是日期格式的話) 作為 Cube 的分區列。對於很大的表來說,這幾乎是必須的。

如果你的 Hive 開啟的文件合並,你應該在 conf/kylin_hive_conf.xml 中禁用它。因為 kylin 會在後面的步驟中會做這件事。

<property
> <name>hive.merge.mapfiles</name> <value>false</value> <description>Disable Hive‘s auto merge</description> </property>

2. 重新分區中間表(Redistribute intermediate table)

在第1步中生成的數據文件有大有小,這會導致後續 MapReduce 的 job 有的很快就完成了,有的要處理很久。kylin 在這一步就是重新分區這些數據。日誌大致如下:

total input rows = 159869711
expected input rows per mapper = 1000000
num reducers for RedistributeFlatHiveTableStep = 160

重新分區的命令:

hive -e "USE default;
SET dfs.replication=2;
SET hive.exec.compress.output=true;
SET hive.auto.convert.join.noconditionaltask=true;
SET hive.auto.convert.join.noconditionaltask.size=100000000;
SET mapreduce.job.split.metainfo.maxsize=-1;
set mapreduce.job.reduces=160;
set hive.merge.mapredfiles=false;

INSERT OVERWRITE TABLE kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34 SELECT * FROM kylin_intermediate_airline_cube_v3610f668a3cdb437e8373c034430f6c34 DISTRIBUTE BY RAND();
"

首先,得到中間表中總共有多少條記錄,kylin 默認情況下,一個文件分配一百萬條數據(通常一百萬條數據不會超過 HDFS 中一個塊的大小)。此例中 159869711 條數據會被分到 160 個文件中,會有 160 個 Mapper 和 160 個 Reduce 任務。如果你想增加並發量,並且你的 Hadoop 集群有充足的資源,你可以在 “conf/kylin.properties”中給“kylin.job.mapreduce.mapper.input.rows” 設置一個較小的值。

然後, Kylin 運行 “INSERT OVERWIRTE TABLE …. DISTRIBUTE BY “ 這個 HiveQL, 把數據分發到各個 reducer 。通常會通過 “DISTRIBUTE BY RAND()” 將數據隨機的分發到各個 reducer 中。

如果你的 Cube 指定了 “shard by” 維度(在 Cube 的 “Advanced setting” 頁),這個維度又是一個高基維(如:user_id),kylin 會告訴 Hive,讓其使用這一列的值進行分發數據,代替上面的 RAND() 方式。這會有利於 Cube 後續的構建。在一些典型的場景中,這將減少 40% 的構建時間。此例中 distribute 的條件是 “DISTRIBUTE BY USER_ID”:

註意:“shard by” 列必須是高基維的列,並且它會出現在大多數的 Cubeid 中,利用它可以獲得每個時間範圍內數據的均勻性,否則會引起數據傾斜。

3. 獲取事實表去重後的值(Extract Fact Table Distinct Columns)

在這一步, kylin 會獲取每個維度的去重後的值,用於數據字典編碼。它也會使用 HyperLogLog 計數器去收集 Cube 的統計信息用以估算每個 cubeid 的行數。

如果你發現 Mapper 比較慢,這通常意味著 Cube 太復雜。

如果 Reducer 發生 OutOfMemory 的錯誤,這意味著 Cubeid 組合過大,或者默認的 YARN 無法分配所需的資源。

如果這一步不能在合理的時間內完成,你應該得新審視你的設計。

4. 建立維度字典(Build Dimension Dictionary)

通過上一步得到值,kylin 在內存中(以後會移到MR)創建這些值的字典。這步通常很快,但是如果這些值有很多,kylin 會報告類似 “Too high cardinality is not suitable for dictionary” 的錯誤。對這些高基維(UHC)請使用 “fixed_length”, “integer” 做為編碼方式。

5. Save Cuboid Statistics and Create HTable

這步很輕量,並且很快。

6. Build Base Cuboid

這一步從中間表創建基礎 cuboid。這是 “by layer” cubing 算法中的第一輪 MR。Mapper 的數量是第2步中的 Reducer 的值。Reducer 的數量通過 Cube 統計信息進行估算(默認每 500MB 的輸出使用一個 reducer)。如果觀察到 reducer 數量較少,可以在kylin.properties 中設置 “kylin.job.mapreduce.default.reduce.input.mb” 成一個較小的值。

7. Build N-Dimension Cuboid

這一步會逐層構建 cube,每一步均使用上一步的結果作為此步的輸入,然後減少一維,聚合成一個子 cuboid。例如:從 cuboid ABCD 中去除A,得到 BCD;去除 B 得到 ACD。

一些 cuboid 可以從多個父 cuboid 中得到,kylin 會使用較小的 cuboid。記住,在設計 cube 的 rowkey 時,應該把高基維放到前面。

通常從 N-D 到 (N/2)-D 的構建是慢的,因為這是由 cuboid 的爆發引起的: N-D 有 1 個 Cuboid, (N-1)-D 有 N個 cuboids, (N-2)-D 有 N*(N-1) 個 cuboids... 經過 (N/2)-D 步後, 構建將會逐漸加快。

8. Build Cube

這一步使用 “by-split” Cubing (也叫 “in-mem” cubing) 算法去構建 Cube,它使用一次 MR 去計算所有的 cuboid,需要更多的內存。每個 Mapper 默認需要3GB,如果你的集群有足夠的資源,可以通過 “conf/kylin_job_conf_inmem.xml” 分配更多的內存,有助於提升性能。

<property>
    <name>mapreduce.map.memory.mb</name>
    <value>6144</value>
</property>

<property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx5632m</value>
</property>

註意: kylin 會自動根據數據分布(從 Cube 的統計信息中獲得)選擇最好的算法。你不必明確指定算法。

9. Convert Cuboid Data to HFile

這一步會啟動一個 MR 的 job,把 Cuboid 文件(sequence file format)轉化成 HBase 的 HFile。Kylin 會計算 HBase 的 Region number,默認每個 Region 5GB 大小。Reducer 的數量隨著 Region 的增加而增加。如果你發現 Reducer 的數量較小、效率較低, 你可以修改 “conf/kylin.properties” 中下面的配置:

kylin.hbase.region.cut=2
kylin.hbase.hfile.size.gb=1

10. Load HFile to HBase Table

這一步利用 HBase 的 API 把 HFile 載入到 region server。這一步會很快。

11. Update Cube Info

數據加 HBase後,kylin 標記 Cube segment 在元數據中已準備完畢。這一步很快。

12. Cleanup

清除 Hive 中的臨時表。這一步如果出錯的話,不會有任何影響。當 kylin 執行 StorageCleanupJob 時,會清除垃圾。

kylin cube 構建過程