BloomFilter在Hudi中的應用
Bloom Filter在Hudi中的應用
介紹
Bloom Filter可以用於檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都遠遠超過一般的演算法,主要缺點是存在一定的誤判率:當其判斷元素存在時,實際上元素可能並不存在。而當判定不存在時,則元素一定不存在,Bloom Filter在對精確度要求不太嚴格的大資料量場景下運用十分廣泛。
引入
為何要引入Bloom Filter?這是Hudi為加快資料upsert採用的一種解決方案,即判斷record是否已經在檔案中存在,若存在,則更新,若不存在,則插入。對於upsert顯然無法容忍出現誤判,否則可能會出現應該插入和變成了更新的錯誤,那麼Hudi是如何解決誤判問題的呢?一種簡單辦法是當Bloom Filter判斷該元素存在時,再去檔案裡二次確認該元素是否真的存在;而當Bloom Filter判斷該元素不存在時,則無需讀檔案,通過二次確認的方法來規避Bloom Filter的誤判問題,實際上這也是Hudi採取的方案,值得一提的是,現在Delta暫時還不支援Bloom Filter,其判斷一條記錄是否存在是直接通過一次全表join來實現,效率比較低下。接下來我們來分析Bloom Filter在Hudi中的應用。
流程
Hudi從上游系統(Kafka、DFS等)消費一批資料後,會根據使用者配置的寫入模式(insert、upsert、bulkinsert)寫入Hudi資料集。而當配置為upsert時,意味著需要將資料插入更新至Hudi資料集,而第一步是需要標記哪些記錄已經存在,哪些記錄不存在,然後,對於存在的記錄進行更新,不存在記錄進行插入。
在HoodieWriteClient
中提供了對應三種寫入模式的方法(#insert、#upsert、#bulkinsert),對於使用了Bloom Filter的#upsert
方法而言,其核心原始碼如下
public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) { ... // perform index loop up to get existing location of records JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table); ... return upsertRecordsInternal(taggedRecords, commitTime, table, true); }
可以看到首先利用索引
給記錄打標籤,然後再進行更新,下面主要分析打標籤的過程。
對於索引
,Hudi提供了四種索引方式的實現:HBaseIndex
、HoodieBloomIndex
、HoodieGlobalBloomIndex
、InMemoryHashIndex
,預設使用HoodieBloomIndex。其中HoodieGlobalBloomIndex與HoodieBloomIndex的區別是前者會讀取所有分割槽檔案,而後者只讀取記錄所存在的分割槽下的檔案。下面以HoodieBloomIndex為例進行分析。
HoodieBloomIndex#tagLocation
核心程式碼如下
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable) { // Step 0: cache the input record RDD if (config.getBloomIndexUseCaching()) { recordRDD.persist(config.getBloomIndexInputStorageLevel()); } // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey) JavaPairRDD<String, String> partitionRecordKeyPairRDD = recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey())); // Lookup indexes for all the partition/recordkey pair JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD = lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable); // Cache the result, for subsequent stages. if (config.getBloomIndexUseCaching()) { keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER()); } // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys // Cost: 4 sec. JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD); if (config.getBloomIndexUseCaching()) { recordRDD.unpersist(); // unpersist the input Record RDD keyFilenamePairRDD.unpersist(); } return taggedRecordRDD; }
該過程會快取記錄以便優化資料的載入。首先從記錄中解析出對應的分割槽路徑 -> key
,接著檢視索引,然後將位置資訊(存在於哪個檔案)回推到記錄中。
HoodieBloomIndex#lookup
核心程式碼如下
private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc,
final HoodieTable hoodieTable) {
// Obtain records per partition, in the incoming records
Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
// Step 2: Load all involved files as <Partition, filename> pairs
List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable);
final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));
// Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id,
// that contains it.
Map<String, Long> comparisonsPerFileGroup =
computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
comparisonsPerFileGroup);
}
該方法首先會計算出每個分割槽有多少條記錄和影響的分割槽有哪些,然後載入影響的分割槽的檔案,最後計算並行度後,開始找記錄真正存在的檔案。
對於#loadInvolvedFiles
方法而言,其會查詢指定分割槽分割槽下所有的資料檔案(parquet格式),並且如果開啟了hoodie.bloom.index.prune.by.ranges
,還會讀取檔案中的最小key和最大key(為加速後續的查詢)。
HoodieBloomIndex#findMatchingFilesForRecordKeys
核心程式碼如下
JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
Map<String, Long> fileGroupToComparisons) {
JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);
if (config.useBloomIndexBucketizedChecking()) {
Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
config.getBloomIndexKeysPerBucket());
fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
.repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2);
} else {
fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
}
return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
.flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
.flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
.map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
.collect(Collectors.toList()).iterator());
}
該方法首先會查詢記錄需要進行比對的檔案,然後再查詢的記錄的位置資訊。
其中,對於#explodeRecordRDDWithFileComparisons
方法而言,其會藉助樹/連結串列結構構造的檔案過濾器來加速記錄對應檔案的查詢(每個record可能會對應多個檔案)。
而使用Bloom Filter的核心邏輯承載在HoodieBloomIndexCheckFunction
,HoodieBloomIndexCheckFunction$LazyKeyCheckIterator該迭代器完成了記錄對應檔案的實際查詢過程,查詢的核心邏輯在
computeNext`中,其核心程式碼如下
protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
try {
// process one file in each go.
while (inputItr.hasNext()) {
Tuple2<String, HoodieKey> currentTuple = inputItr.next();
String fileId = currentTuple._1;
String partitionPath = currentTuple._2.getPartitionPath();
String recordKey = currentTuple._2.getRecordKey();
Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
// lazily init state
if (keyLookupHandle == null) {
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
}
// if continue on current file
if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
keyLookupHandle.addKey(recordKey);
} else {
// do the actual checking of file & break out
ret.add(keyLookupHandle.getLookupResult());
keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
keyLookupHandle.addKey(recordKey);
break;
}
}
// handle case, where we ran out of input, close pending work, update return val
if (!inputItr.hasNext()) {
ret.add(keyLookupHandle.getLookupResult());
}
} catch (Throwable e) {
if (e instanceof HoodieException) {
throw e;
}
throw new HoodieIndexException("Error checking bloom filter index. ", e);
}
return ret;
}
該方法每次迭代只會處理一個檔案,每次處理時都會生成HoodieKeyLookupHandle
,然後會新增recordKey
,處理完後再獲取查詢結果。
其中HoodieKeyLookupHandle#addKey
方法核心程式碼如下
public void addKey(String recordKey) {
// check record key against bloom filter of current file & add to possible keys if needed
if (bloomFilter.mightContain(recordKey)) {
...
candidateRecordKeys.add(recordKey);
}
totalKeysChecked++;
}
可以看到,這裡使用到了Bloom Filter來判斷該記錄是否存在,如果存在,則加入到候選佇列中,等待進一步判斷;若不存在,則無需額外處理,其中Bloom Filter會在建立HoodieKeyLookupHandle
例項時初始化(從指定檔案中讀取Bloom Filter)。
HoodieKeyLookupHandle#getLookupResult
方法核心程式碼如下
public KeyLookupResult getLookupResult() {
...
HoodieDataFile dataFile = getLatestDataFile();
List<String> matchingKeys =
checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
...
return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
dataFile.getCommitTime(), matchingKeys);
}
該方法首先獲取指定分割槽下的最新資料檔案,然後判斷資料檔案存在哪些recordKey
,並將其封裝進KeyLookupResult
後返回。其中#checkCandidatesAgainstFile
會讀取檔案中所有的recordKey
,判斷是否存在於candidateRecordKeys
,這便完成了進一步確認。
到這裡即完成了record存在於哪些檔案的所有查詢,查詢完後會進行進一步處理,後續再給出分析。
總結
Hudi引入Bloom Filter是為了加速upsert
過程,並將其存入parquet資料檔案中的Footer中,在讀取檔案時會從Footer中讀取該BloomFilter。在利用Bloom Filter來判斷記錄是否存在時,會採用二次確認的方式規避Bloom Filter的誤判問題。
相關推薦
dropzone拖動文件上傳在thinkphp5中應用一個實例
php dropzone參考:Dropzone的使用方法點擊查看dropzone中文文檔後臺用的INSPINIA框架的模板,裏面有,dropzone.jsdropzone是一個可以拖文件上傳的js.拖進去,就上傳了。我在頁面上,寫了一個保存已經上傳的文件的image3,image4.這就需要改寫一下dropz
MFC ocx IE中應用相關問題
如果 選項卡 原因 -h spa 問題 tom 刷新 cls 一、IE中調用控件時碰到過的問題 1、控件發消息讓IE退出 ::PostMessage(m_hwnd, WM_DESTROY, 0, 0); //m_hwnd為ie的窗口句柄 ::PostMes
Java 容器在實際web項目中應用
有用 找到 style view ram 知識 arc 的確 例子 前言:在java開發中我們離不開集合數組等,在java中有個專有名詞:“容器” ,下面會結合Thinking in Java的知識和實際開發中業務場景講述一下容器在Web項目中的用法。可結合圖片代碼了解Ja
(轉)基於MVC4+EasyUI的Web開發框架經驗總結(6)--在頁面中應用下拉列表的處理
ica new web開發 don ext images 如果 bob 獲取 http://www.cnblogs.com/wuhuacong/p/3840321.html 在很多Web界面中,我們都可以看到很多下拉列表的元素,有些是固定的,有些是動態的;有些是字典內容,
php中應用memcached
txt 主機 memcached nbsp configure pool lib php5 共享 PHP連接Memcached 先安裝php的memcache擴展 # wget http://ip/data/attachment/forum/memcache-2.2.3.
Java 數據類型在實際開發中應用二枚舉
項目 arraylist font 編譯器 tid null left join 基本 size 在實際編程中,往往存在著這樣的“數據集”,它們的數值在程序中是穩定的,而且“數據集”中的元素是有限的。在JDK1.5之前,人們用接口來描述這一種數據類型。 1.5以後引入枚
在C#代碼中應用Log4Net(三)Log4Net中配置文件的解釋
images rdquo files read 出現 插入 tof stat 日誌 <log4net> <!-- 錯誤日誌類--> <logger name="logerror"> <level value
深度學習在 CTR 中應用
核心 融合 輸出 -s 情況 ... 能夠 rec 數據 歡迎大家前往騰訊雲技術社區,獲取更多騰訊海量技術實踐幹貨哦~ 作者:高航 一. Wide&&Deep 模型 首先給出Wide && Deep [1] 網絡結構: 本質上是線性模
谷歌playstore中應用下載量已超150億次
.cn odi dex odm www wot mdi mdm odex peerjs%E5%AE%9E%E7%8E%B0%E6%B5%8F%E8%A7%88%E5%99%A8%E5%AF%B9%E7%AD%89%E8%BF%9E%E6%8E%A5%E7%9A%84js%E
Spring.net介紹及MVC中應用
text name default 轉變 業務層 核心 解耦 inf star Spring.net兩大核心內容: IOC(控制反轉) 傳統的面相對象思維模式是對象A依賴對象B,對象B的實例化和調用都在對象A中發生,一旦對象B中發生變化,對象A也要隨之變化,這樣使得
SQL Server 2005/2008/2012中應用分布式分區視圖
lag tails soft 9.png .aspx 並且 例如 () 根據 自2000版本起,SQL Server企業版中引入分布式分區視圖,允許你為分布在不同的SQL 實例的兩個或多個水平分區表創建視圖。 簡要步驟如下:根據Check約束中定義的一組值把大表分
高性能緩存系統Memcached在ASP.NET MVC中應用
index req 緩存系統 help add nts .... ont p s 首先下載windows平臺下的memcached,然後安裝。安裝完之後就是啟動memcached服務了,你可以在cmd下用dos命令輸入,也可以在計算機管理->服務->memcac
linux中應用程序的安裝與管理
模式 練習 fig 分享圖片 ctr watermark 選擇 str size 在基本的系統安裝完成之後,為了進一步提高服務器的易用性,我們就需要為其安裝一些用於服務器管理及桌面環境的應用程序。我們這裏選擇的是代碼包編譯的方式安裝zhcon中文虛擬控制臺。 這裏
Parquet性能測試之項目實踐中應用測試
count 測試結果 直接 存儲 1.2 只讀 求最大值 文件存儲 效率 因為從事大數據方面的工作,經常在操作過程中數據存儲占空間過大,讀取速率過慢等問題,我開始對parquet格式存儲進行了研究,下面是自己的一些見解(使用的表都是項目中的,大家理解為寬表即可): 一、Sp
Python中paramiko模塊在linux運維中應用
python linux 運維 python的paramiko模塊可以實現ssh客戶端的功能,使用起來也比較簡單。但是當服務器非常多的時候,每臺服務器上執行完全相同的簡單操作,也會花費大量的時間。 下載模塊:paramiko.tar.gz使用tar解壓後在cd到目錄下最後使用rpm安裝 rpm -ivh
Android中應用安裝分析
generate upgrade 版本 線程 title 回調函數 ebe children 應用商店 #1 安裝方式 1 安裝系統APK和預制APK時,通過PMS的構造函數中安裝,即第一次開機時安裝應用,沒有安裝界面。 2 網絡下載安裝,通過應用商店等,即調用Packa
IIS 之 在IIS7、IIS7.5中應用程序池最優配置方案
定期 target 註冊 enable 間隔 images pre .net 微軟 找到Web站點對應的應用程序池,“應用程序池” → 找到對應的“應用程序池” → 右鍵“高級設置...” 一、一般優化方案 1、基本設置 [1] 隊列長度: 默認值100
報表模板 — 在項目管理中應用數據報表分析
結構 學校 表設計 整體 資源 模板 font padding https 項目管理是在項目活動中運用專門的知識、技能、工具和方法,使項目能在有限資源下,實現或超過設定的需求和期望的過程,是對成功地達成一系列目標相關的活動的整體監測和管控。無論是在大型工程、軟件開發、系統制
在 js 中應用 訂閱釋出模式(subscrib/public)
什麼是釋出-訂閱者模式 我們在使用釋出-訂閱者模式之前,先了解什麼是釋出-訂閱者模式。簡單來說,釋出訂閱者模式就是一種一對多的依賴關係。多個訂閱者(一般是註冊的函式)同時監聽同一個資料物件,當這個資料物件發生變化的時候會執行一個釋出事件,通過這個釋出事件會通知到所有的訂閱者,使它
composer laravel中應用
1:composer安裝好laravel框架進行裝包操作 先更新作曲家命令作曲家更新 在composer.json中加入國內映象源 “repositories” : { “packagist” : { “type” :“composer” , “url” :“