1. 程式人生 > >BloomFilter在Hudi中的應用

BloomFilter在Hudi中的應用

Bloom Filter在Hudi中的應用

介紹

Bloom Filter可以用於檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都遠遠超過一般的演算法,主要缺點是存在一定的誤判率:當其判斷元素存在時,實際上元素可能並不存在。而當判定不存在時,則元素一定不存在,Bloom Filter在對精確度要求不太嚴格的大資料量場景下運用十分廣泛。

引入

為何要引入Bloom Filter?這是Hudi為加快資料upsert採用的一種解決方案,即判斷record是否已經在檔案中存在,若存在,則更新,若不存在,則插入。對於upsert顯然無法容忍出現誤判,否則可能會出現應該插入和變成了更新的錯誤,那麼Hudi是如何解決誤判問題的呢?一種簡單辦法是當Bloom Filter判斷該元素存在時,再去檔案裡二次確認該元素是否真的存在;而當Bloom Filter判斷該元素不存在時,則無需讀檔案,通過二次確認的方法來規避Bloom Filter的誤判問題,實際上這也是Hudi採取的方案,值得一提的是,現在Delta暫時還不支援Bloom Filter,其判斷一條記錄是否存在是直接通過一次全表join來實現,效率比較低下。接下來我們來分析Bloom Filter在Hudi中的應用。

流程

Hudi從上游系統(Kafka、DFS等)消費一批資料後,會根據使用者配置的寫入模式(insert、upsert、bulkinsert)寫入Hudi資料集。而當配置為upsert時,意味著需要將資料插入更新至Hudi資料集,而第一步是需要標記哪些記錄已經存在,哪些記錄不存在,然後,對於存在的記錄進行更新,不存在記錄進行插入。

HoodieWriteClient中提供了對應三種寫入模式的方法(#insert、#upsert、#bulkinsert),對於使用了Bloom Filter的#upsert方法而言,其核心原始碼如下

public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
    ...
    // perform index loop up to get existing location of records
    JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
    ...
    return upsertRecordsInternal(taggedRecords, commitTime, table, true);
}

可以看到首先利用索引給記錄打標籤,然後再進行更新,下面主要分析打標籤的過程。

對於索引,Hudi提供了四種索引方式的實現:HBaseIndexHoodieBloomIndexHoodieGlobalBloomIndexInMemoryHashIndex,預設使用HoodieBloomIndex。其中HoodieGlobalBloomIndex與HoodieBloomIndex的區別是前者會讀取所有分割槽檔案,而後者只讀取記錄所存在的分割槽下的檔案。下面以HoodieBloomIndex為例進行分析。

HoodieBloomIndex#tagLocation核心程式碼如下

public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
      HoodieTable<T> hoodieTable) {

    // Step 0: cache the input record RDD
    if (config.getBloomIndexUseCaching()) {
      recordRDD.persist(config.getBloomIndexInputStorageLevel());
    }

    // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey)
    JavaPairRDD<String, String> partitionRecordKeyPairRDD =
        recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));

    // Lookup indexes for all the partition/recordkey pair
    JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD =
        lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);

    // Cache the result, for subsequent stages.
    if (config.getBloomIndexUseCaching()) {
      keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
    }

    // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys
    // Cost: 4 sec.
    JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD);

    if (config.getBloomIndexUseCaching()) {
      recordRDD.unpersist(); // unpersist the input Record RDD
      keyFilenamePairRDD.unpersist();
    }

    return taggedRecordRDD;
  }

該過程會快取記錄以便優化資料的載入。首先從記錄中解析出對應的分割槽路徑 -> key,接著檢視索引,然後將位置資訊(存在於哪個檔案)回推到記錄中。

HoodieBloomIndex#lookup核心程式碼如下

private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
      JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc,
      final HoodieTable hoodieTable) {
    // Obtain records per partition, in the incoming records
    Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
    List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

    // Step 2: Load all involved files as <Partition, filename> pairs
    List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
        loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable);
    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
        fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));

    // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id,
    // that contains it.
    Map<String, Long> comparisonsPerFileGroup =
        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
    int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
    int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
    return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
        comparisonsPerFileGroup);
  }

該方法首先會計算出每個分割槽有多少條記錄和影響的分割槽有哪些,然後載入影響的分割槽的檔案,最後計算並行度後,開始找記錄真正存在的檔案。

對於#loadInvolvedFiles方法而言,其會查詢指定分割槽分割槽下所有的資料檔案(parquet格式),並且如果開啟了hoodie.bloom.index.prune.by.ranges,還會讀取檔案中的最小key和最大key(為加速後續的查詢)。

HoodieBloomIndex#findMatchingFilesForRecordKeys核心程式碼如下

JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
      JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
      Map<String, Long> fileGroupToComparisons) {
    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
        explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);

    if (config.useBloomIndexBucketizedChecking()) {
      Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
          config.getBloomIndexKeysPerBucket());

      fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
          .repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2);
    } else {
      fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
    }

    return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
        .flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
        .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
            .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
                new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
            .collect(Collectors.toList()).iterator());
  }

該方法首先會查詢記錄需要進行比對的檔案,然後再查詢的記錄的位置資訊。

其中,對於#explodeRecordRDDWithFileComparisons方法而言,其會藉助樹/連結串列結構構造的檔案過濾器來加速記錄對應檔案的查詢(每個record可能會對應多個檔案)。

而使用Bloom Filter的核心邏輯承載在HoodieBloomIndexCheckFunction,HoodieBloomIndexCheckFunction$LazyKeyCheckIterator該迭代器完成了記錄對應檔案的實際查詢過程,查詢的核心邏輯在computeNext`中,其核心程式碼如下

protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {

      List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
      try {
        // process one file in each go.
        while (inputItr.hasNext()) {
          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
          String fileId = currentTuple._1;
          String partitionPath = currentTuple._2.getPartitionPath();
          String recordKey = currentTuple._2.getRecordKey();
          Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

          // lazily init state
          if (keyLookupHandle == null) {
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
          }

          // if continue on current file
          if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
            keyLookupHandle.addKey(recordKey);
          } else {
            // do the actual checking of file & break out
            ret.add(keyLookupHandle.getLookupResult());
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
            keyLookupHandle.addKey(recordKey);
            break;
          }
        }

        // handle case, where we ran out of input, close pending work, update return val
        if (!inputItr.hasNext()) {
          ret.add(keyLookupHandle.getLookupResult());
        }
      } catch (Throwable e) {
        if (e instanceof HoodieException) {
          throw e;
        }
        throw new HoodieIndexException("Error checking bloom filter index. ", e);
      }

      return ret;
    }

該方法每次迭代只會處理一個檔案,每次處理時都會生成HoodieKeyLookupHandle,然後會新增recordKey,處理完後再獲取查詢結果。

其中HoodieKeyLookupHandle#addKey方法核心程式碼如下

public void addKey(String recordKey) {
    // check record key against bloom filter of current file & add to possible keys if needed
    if (bloomFilter.mightContain(recordKey)) {
      ...
      candidateRecordKeys.add(recordKey);
    }
    totalKeysChecked++;
  }

可以看到,這裡使用到了Bloom Filter來判斷該記錄是否存在,如果存在,則加入到候選佇列中,等待進一步判斷;若不存在,則無需額外處理,其中Bloom Filter會在建立HoodieKeyLookupHandle例項時初始化(從指定檔案中讀取Bloom Filter)。

HoodieKeyLookupHandle#getLookupResult方法核心程式碼如下

public KeyLookupResult getLookupResult() {
    ...
    HoodieDataFile dataFile = getLatestDataFile();
    List<String> matchingKeys =
        checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
    ...
    return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
        dataFile.getCommitTime(), matchingKeys);
  }

該方法首先獲取指定分割槽下的最新資料檔案,然後判斷資料檔案存在哪些recordKey,並將其封裝進KeyLookupResult後返回。其中#checkCandidatesAgainstFile會讀取檔案中所有的recordKey,判斷是否存在於candidateRecordKeys,這便完成了進一步確認。

到這裡即完成了record存在於哪些檔案的所有查詢,查詢完後會進行進一步處理,後續再給出分析。

總結

Hudi引入Bloom Filter是為了加速upsert過程,並將其存入parquet資料檔案中的Footer中,在讀取檔案時會從Footer中讀取該BloomFilter。在利用Bloom Filter來判斷記錄是否存在時,會採用二次確認的方式規避Bloom Filter的誤判問題。

相關推薦

dropzone拖動文件上傳在thinkphp5應用一個實例

php dropzone參考:Dropzone的使用方法點擊查看dropzone中文文檔後臺用的INSPINIA框架的模板,裏面有,dropzone.jsdropzone是一個可以拖文件上傳的js.拖進去,就上傳了。我在頁面上,寫了一個保存已經上傳的文件的image3,image4.這就需要改寫一下dropz

MFC ocx IE應用相關問題

如果 選項卡 原因 -h spa 問題 tom 刷新 cls 一、IE中調用控件時碰到過的問題 1、控件發消息讓IE退出 ::PostMessage(m_hwnd, WM_DESTROY, 0, 0);      //m_hwnd為ie的窗口句柄 ::PostMes

Java 容器在實際web項目應用

有用 找到 style view ram 知識 arc 的確 例子 前言:在java開發中我們離不開集合數組等,在java中有個專有名詞:“容器” ,下面會結合Thinking in Java的知識和實際開發中業務場景講述一下容器在Web項目中的用法。可結合圖片代碼了解Ja

(轉)基於MVC4+EasyUI的Web開發框架經驗總結(6)--在頁面應用下拉列表的處理

ica new web開發 don ext images 如果 bob 獲取 http://www.cnblogs.com/wuhuacong/p/3840321.html 在很多Web界面中,我們都可以看到很多下拉列表的元素,有些是固定的,有些是動態的;有些是字典內容,

php應用memcached

txt 主機 memcached nbsp configure pool lib php5 共享 PHP連接Memcached 先安裝php的memcache擴展 # wget http://ip/data/attachment/forum/memcache-2.2.3.

Java 數據類型在實際開發應用二枚舉

項目 arraylist font 編譯器 tid null left join 基本 size   在實際編程中,往往存在著這樣的“數據集”,它們的數值在程序中是穩定的,而且“數據集”中的元素是有限的。在JDK1.5之前,人們用接口來描述這一種數據類型。 1.5以後引入枚

在C#代碼應用Log4Net(三)Log4Net配置文件的解釋

images rdquo files read 出現 插入 tof stat 日誌 <log4net> <!-- 錯誤日誌類--> <logger name="logerror"> <level value

深度學習在 CTR 應用

核心 融合 輸出 -s 情況 ... 能夠 rec 數據 歡迎大家前往騰訊雲技術社區,獲取更多騰訊海量技術實踐幹貨哦~ 作者:高航 一. Wide&&Deep 模型 首先給出Wide && Deep [1] 網絡結構: 本質上是線性模

谷歌playstore應用下載量已超150億次

.cn odi dex odm www wot mdi mdm odex peerjs%E5%AE%9E%E7%8E%B0%E6%B5%8F%E8%A7%88%E5%99%A8%E5%AF%B9%E7%AD%89%E8%BF%9E%E6%8E%A5%E7%9A%84js%E

Spring.net介紹及MVC應用

text name default 轉變 業務層 核心 解耦 inf star Spring.net兩大核心內容: IOC(控制反轉) 傳統的面相對象思維模式是對象A依賴對象B,對象B的實例化和調用都在對象A中發生,一旦對象B中發生變化,對象A也要隨之變化,這樣使得

SQL Server 2005/2008/2012應用分布式分區視圖

lag tails soft 9.png .aspx 並且 例如 () 根據   自2000版本起,SQL Server企業版中引入分布式分區視圖,允許你為分布在不同的SQL 實例的兩個或多個水平分區表創建視圖。   簡要步驟如下:根據Check約束中定義的一組值把大表分

高性能緩存系統Memcached在ASP.NET MVC應用

index req 緩存系統 help add nts .... ont p s 首先下載windows平臺下的memcached,然後安裝。安裝完之後就是啟動memcached服務了,你可以在cmd下用dos命令輸入,也可以在計算機管理->服務->memcac

linux應用程序的安裝與管理

模式 練習 fig 分享圖片 ctr watermark 選擇 str size 在基本的系統安裝完成之後,為了進一步提高服務器的易用性,我們就需要為其安裝一些用於服務器管理及桌面環境的應用程序。我們這裏選擇的是代碼包編譯的方式安裝zhcon中文虛擬控制臺。 這裏

Parquet性能測試之項目實踐應用測試

count 測試結果 直接 存儲 1.2 只讀 求最大值 文件存儲 效率 因為從事大數據方面的工作,經常在操作過程中數據存儲占空間過大,讀取速率過慢等問題,我開始對parquet格式存儲進行了研究,下面是自己的一些見解(使用的表都是項目中的,大家理解為寬表即可): 一、Sp

Pythonparamiko模塊在linux運維應用

python linux 運維 python的paramiko模塊可以實現ssh客戶端的功能,使用起來也比較簡單。但是當服務器非常多的時候,每臺服務器上執行完全相同的簡單操作,也會花費大量的時間。 下載模塊:paramiko.tar.gz使用tar解壓後在cd到目錄下最後使用rpm安裝 rpm -ivh

Android應用安裝分析

generate upgrade 版本 線程 title 回調函數 ebe children 應用商店 #1 安裝方式 1 安裝系統APK和預制APK時,通過PMS的構造函數中安裝,即第一次開機時安裝應用,沒有安裝界面。 2 網絡下載安裝,通過應用商店等,即調用Packa

IIS 之 在IIS7、IIS7.5應用程序池最優配置方案

定期 target 註冊 enable 間隔 images pre .net 微軟   找到Web站點對應的應用程序池,“應用程序池” → 找到對應的“應用程序池” → 右鍵“高級設置...”    一、一般優化方案   1、基本設置   [1] 隊列長度: 默認值100

報表模板 — 在項目管理應用數據報表分析

結構 學校 表設計 整體 資源 模板 font padding https 項目管理是在項目活動中運用專門的知識、技能、工具和方法,使項目能在有限資源下,實現或超過設定的需求和期望的過程,是對成功地達成一系列目標相關的活動的整體監測和管控。無論是在大型工程、軟件開發、系統制

在 js 應用 訂閱釋出模式(subscrib/public)

什麼是釋出-訂閱者模式 我們在使用釋出-訂閱者模式之前,先了解什麼是釋出-訂閱者模式。簡單來說,釋出訂閱者模式就是一種一對多的依賴關係。多個訂閱者(一般是註冊的函式)同時監聽同一個資料物件,當這個資料物件發生變化的時候會執行一個釋出事件,通過這個釋出事件會通知到所有的訂閱者,使它

composer laravel應用

1:composer安裝好laravel框架進行裝包操作 先更新作曲家命令作曲家更新 在composer.json中加入國內映象源 “repositories” : { “packagist” : { “type” :“composer” , “url” :“