1. 程式人生 > >Hive 整合 Hudi 實踐(含程式碼)| 可能是全網最詳細的資料湖系列

Hive 整合 Hudi 實踐(含程式碼)| 可能是全網最詳細的資料湖系列

公眾號後臺越來越多人問關於資料湖相關的內容,看來大家對新技術還是很感興趣的。關於資料湖的資料網路上還是比較少的,特別是實踐系列,對於新技術來說,基礎的入門文件還是很有必要的,所以這一篇希望能夠幫助到想使用Hudi的同學入門。 **本篇的Hudi使用的是孵化版本 0.5.2;其他依賴 Spark-2.4.4,Hive-1.1.0** # Hudi 伺服器環境準備 ``` wget https://github.com/apache/hudi/archive/release-0.5.2-incubating.tar.gz tar zxvf release-0.5.2-incubating.tar.gz cd release-0.5.2-incubating mvn clean package -DskipTests -DskipITs cp ./hudi-hadoop-mr/target/hudi-hadoop-mr-0.5.2-incubating.jar $HIVE_HOME/lib/ ``` 拷貝依賴包到 Hive 路徑是為了 Hive 能夠正常讀到 Hudi 的資料,至此伺服器環境準備完畢。 # 用 Spark 寫一段資料 一切準備完畢先寫一段資料到 Hudi 裡,首先資料來源 ods.ods_user_event 的表結構為: ``` CREATE TABLE ods.ods_user_event( uuid STRING, name STRING, addr STRING, update_time STRING, date STRING) stored as parquet; ``` 然後是 Maven 的依賴,詳細程式碼關注公眾號【老懞大資料】回覆 hudi 後即可獲取。 ``` org.apache.hudi hudi-spark_2.11 0.5.2-incubating org.apache.hudi hudi-common 0.5.2-incubating ``` **程式碼邏輯:** 1. 初始化 SparkSession,配置相關配置項 2. 構建 DataFrame,大家可以自由發揮,這裡的案例是從Hive讀資料構建。 3. DataFrame寫入Hudi,這一塊說到底就是把資料寫入 HDFS 路徑下,但是需要一堆配置,這些配置就體現了 Hudi 的特性: - **DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY**:指定唯一id的列名 - **DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY**:指定更新時間,該欄位數值大的資料會覆蓋小的 - **DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KE**Y:指定分割槽列,和Hive的分割槽概念類似 - **HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH**:設定當分割槽變更時,當前資料的分割槽目錄是否變更 - **HoodieIndexConfig.INDEX_TYPE_PROP**:設定索引型別目前有 **HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM** 四種索引 上述例子中,選擇了 HoodieGlobalBloomIndex(全域性索引),會在所有分割槽內查詢指定的 recordKey。而 HoodieBloomIndex 只在指定的分割槽內查詢。 ``` def main(args: Array[String]): Unit = { val sss = SparkSession.builder.appName("hudi") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("hive.metastore.uris", "thrift://ip:port") .enableHiveSupport().getOrCreate() val sql = "select * from ods.ods_user_event" val df: DataFrame = sss.sql(sql) df.write.format("org.apache.hudi") .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "recordKey") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "update_time") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "date") .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) .option("hoodie.insert.shuffle.parallelism", "10") .option("hoodie.upsert.shuffle.parallelism", "10") .option(HoodieWriteConfig.TABLE_NAME, "ods.ods_user_event_hudi") .mode(SaveMode.Append) .save("/user/hudi/lake/ods.db/ods_user_event_hudi") } ``` 執行成功後會有如下結果,因為我們是按照date分割槽,每一天的資料會生成一個資料夾和Hive類似。 ``` [hadoop@hadoop31 ~]# hdfs dfs -ls /user/hudi/lake/ods.db/ods_user_event_hudi/ Found 4 items drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200501 drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200502 drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200503 drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200504 ``` 另外,注意 recordKey 必須唯一,不然資料會被覆蓋,且值不能為 null,否則會有以下報錯。 ``` Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "user_uid" cannot be null or empty. ``` # Hive 建立外部表讀資料 上一步中 Spark 將資料寫到了 hudi,想要通過Hive訪問到這塊資料,就需要建立一個Hive外部表了,因為 Hudi 配置了分割槽,所以為了能讀到所有的資料,咱們的外部表也得分割槽,分割槽欄位名可隨意配置。 ``` CREATE TABLE ods.ods_user_event_hudi( uuid STRING, name STRING, addr STRING, update_time STRING, date STRING) PARTITIONED BY ( `dt` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION '/user/hudi/lake/ods.db/ods_user_event_hudi' ``` 至此,直接讀資料肯定是空的,因為我們建立的是個分割槽表,所以還需要指定分割槽 ``` alter table ods.ods_user_event_hudi add if not exists partition(dt='20200504') location '/user/hudi/lake/ods.db/ods_user_event_hudi/20200504' ``` 那麼這個時候問題來了,一年有365個分割槽,要一個一個建立手動建立分割槽嗎? 抱歉我也沒發現更好的辦法,只能送你個簡單的指令碼了。 ``` #!/bin/bash start_date=20190101 end_date=20200520 start=`date -d "$start_date" "+%s"` end=`date -d "$end_date" "+%s"` for((i=start;i<=end;i+=86400)); do dt=$(date -d "@$i" "+%Y%m%d") hive -e "alter table ods.ods_user_event_hudi add if not exists partition(dt='${dt}') location '/user/hudi/lake/ods.db/ods_user_event_hudi/${dt}'; " done ``` # 後記 最後,執行 **select * from ods.ods_user_event_hudi** 要是沒有資料你來找我。另外值得注意的是,如果此時直接用 Hive 將資料 insert into ods.ods_user_event_hudi,雖然資料會寫入到 hudi 的目錄下,但是相同的 recordKey 是不會覆蓋原有資料的。 下一篇詳細寫 Spark 操作 Hudi 的相關內容,敬請期待。本篇詳細程式碼關注公眾號【老懞大資料】回覆 hudi 後即可獲取。 **推薦閱讀** [3000字長文教你大資料該怎麼學!](https://www.cnblogs.com/uncledata/p/12886978.html) [選方向?大資料的職位你瞭解多少](https://www.cnblogs.com/uncledata/p/1291265