1. 程式人生 > >hdfs 小檔案合併方案(附程式碼)

hdfs 小檔案合併方案(附程式碼)

背景:
presto計算落地出現了大量的小檔案,目前暫時沒有發現可以通過引數優化解決,所以開發了小檔案合併工具

工具架構如下
image

工具主要分為三部分:

collector

負責將合併規則推送到redis佇列,合併規則物件定義如下,

public class FileCombineRuleDto {

    private int fileSize;//預設單位mb
    private String fileDir;//檔案路徑
    private String type;//檔案型別:parquet,orc
    private int startTime;//開始時間20180101
    private int endTime;//結束時間15
}

目前工具支援orc和parquet兩種格式檔案合併,這也是目前使用最廣泛的資料格式,同時支援定義檔案大小,這裡需要注意的是,如果最後一個檔案大小小於fileSize/2,那麼這部分資料會合併到上一個檔案中,這樣做的目的是為了避免再次生成小檔案;這樣做的效果就是會出現合併的檔案大於fileSize的情況[最後一個檔案]

同時也支援合併指定時間區間的資料,startTime開始時間,資料格式:20180101,endTime:15[假定今天為20181029],那麼資料的合併區間為:20180101 - 20181014

collector對應實現的入口為FileDirCollector.java,這部分使用者可以自定義實現,因為功能比較簡單,就是將FileCombineRuleDto轉換成json字串扔到redis佇列中

訊息佇列

這裡使用的是redis,起初本想將這部分整合到mysql中去,畢竟任務是有狀態的[原諒筆者太懶,一切從簡]

worker

這個是工具的核心,也就是上圖中的Node1,Node2,Node3;有多少個Node取決於你在多少個節點上啟動JVM程序,程式入口為

com.fan.merge.AppMain

這些Node同時消費Reids佇列中的訊息,根據訊息規則執行合併檔案;這裡需要注意的是合併的檔案會寫在資料的原始目錄,也就是小檔案所在的目錄;檔案合併之前會將資料備份一份到指定目錄/user/combine;檔案合併完後,會將原始目錄中的小檔案刪除;
問題:
1.如果其他使用者在目錄合併期間讀取此資料,可能會出現資料多讀的情況
2.目前沒有對資料備份目錄進行操作,比如備份多久的資料,畢竟合併資料不可能做到百分百執行成功,所以想做備份一份還是比較重要的,刪除策略這個筆者還沒時間去實現

  • 配置檔案如下
vi config.properties

redis.host=192.168.120.4
redis.port=6379

#filesize=128m
#檔案最後修改時間在20180101之後的會進行合併
start.time=20180101
#距離當前時間多久之前的資料(例如今天是20181015,結合start.time,10代表: 20180101~20181005之間的資料)
end.time=10

#儲存使用者傳遞目錄引數的Key
file.dir.key=FILEDIRKEY

#佇列名
quene.name=FILECOMBINEQUENE

#並行度
default.parallelism=3

#任務佇列大小
task.quene.size=10
tmp.dir=/user/combine


#佇列阻塞係數[取值0.0~1.0],預設為0.8,執行緒池執行緒數poolsize = cpu cores/(1-0.8)
blockage.coefficient = 0.5

再來看下每個Node具體元件
每個Node三部分:push thread,queue,thread pools
為什麼這裡又使用了queue[LinkedBlockingDeque],起初是直接消費redis中佇列的訊息,這樣有個問題,就是會出現大量的任務堆積在thread pools中佇列中,這裡執行緒池使用的是

ExecutorService service = Executors.newFixedThreadPool(poolSize);

執行緒池使用的連結串列無界佇列,消費redis中的佇列中的資料很快,然而真正去執行合併任務的執行緒是非常耗時,所有等待執行的任務都會存在佇列中,這樣做的話jvm程序一掛所有的任務都沒有了,所以這裡加了一個緩衝佇列,控制從redis消費訊息的速度,佇列大小由下面引數控制

task.quene.size=10

push thread 從redis中消費資料,獲得需要合併的目錄以及合併規則,並將訊息推送到queue中,如果queue中資料滿了,則push thread [Produce2Quene.java]就開始sleep,直到queue有剩餘空間存放訊息;接下來就是主執行緒將task提交到執行緒中去執行

到這裡,整體設計就就完成了,程式碼寫的比較倉促,難免有些bug,其是主要的合併主體邏輯實現為:

com.fan.merge.transform.OrcCombine
com.fan.merge.transform.ParquetCombine

大家可以圍繞這兩個實現類定製自己的合併規則

說一下自定義實現的一點小建議【這也是我沒有實現的】:

  • 任務狀態儲存,任務start,running,peding,success
  • 失敗任務重跑規則設計,重試次數
  • kerberos認證登陸

苦於csdn下東西都需要積分,積分要的不多[5分],所以多多理解,哈哈
https://download.csdn.net/download/woloqun/10750290

程式碼寫得倉促,有問題留言