1. 程式人生 > >生產者-消費者模型在Hudi中的應用

生產者-消費者模型在Hudi中的應用

介紹

生產者-消費者模型用於解耦生產者與消費者,平衡兩者之間的能力不平衡,該模型廣泛應用於各個系統中,Hudi也使用了該模型控制對記錄的處理,即記錄會被生產者生產至佇列中,然後由消費者從佇列中消費,更具體一點,對於更新操作,生產者會將檔案中老的記錄放入佇列中等待消費者消費,消費後交由HoodieMergeHandle處理;對於插入操作,生產者會將新記錄放入佇列中等待消費者消費,消費後交由HandleCreateHandle處理。

入口

前面的文章中提到過無論是HoodieCopyOnWriteTable#handleUpdate處理更新時直接生成了一個SparkBoundedInMemoryExecutor

物件,還是HoodieCopyOnWriteTable#handleInsert處理插入時生成了一個CopyOnWriteLazyInsertIterable物件,再迭代時呼叫該物件的CopyOnWriteLazyInsertIterable#computeNext方法生成SparkBoundedInMemoryExecutor物件。最後兩者均會呼叫SparkBoundedInMemoryExecutor#execute開始記錄的處理,該方法核心程式碼如下

  public E execute() {
    try {
      ExecutorCompletionService<Boolean> producerService = startProducers();
      Future<E> future = startConsumer();
      // Wait for consumer to be done
      return future.get();
    } catch (Exception e) {
      throw new HoodieException(e);
    }
  }

該方法會啟動所有生產者和單個消費者進行處理。

Hudi定義了BoundedInMemoryQueueProducer介面表示生產者,其子類實現如下

  • FunctionBasedQueueProducer,基於Function來生產記錄,在合併日誌log檔案和資料parquet檔案時使用,以便提供RealTimeView
  • IteratorBasedQueueProducer,基於迭代器來生產記錄,在插入更新時使用。

定義了BoundedInMemoryQueueConsumer類表示消費者,其主要子類實現如下

  • CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要處理CopyOnWrite
    表型別時的插入。
    • MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要處理MergeOnRead

表型別時的插入,其為CopyOnWriteInsertHandler的子類。

  • CopyOnWriteLazyInsertIterable$UpdateHandler,主要處理CopyOnWrite表型別時的更新。

整個生產消費相關的類繼承結構非常清晰。

對於生產者的啟動,startProducers方法核心程式碼如下

  public ExecutorCompletionService<Boolean> startProducers() {
    // Latch to control when and which producer thread will close the queue
    final CountDownLatch latch = new CountDownLatch(producers.size());
    final ExecutorCompletionService<Boolean> completionService =
        new ExecutorCompletionService<Boolean>(executorService);
    producers.stream().map(producer -> {
      return completionService.submit(() -> {
        try {
          preExecute();
          producer.produce(queue);
        } catch (Exception e) {
          logger.error("error producing records", e);
          queue.markAsFailed(e);
          throw e;
        } finally {
          synchronized (latch) {
            latch.countDown();
            if (latch.getCount() == 0) {
              // Mark production as done so that consumer will be able to exit
              queue.close();
            }
          }
        }
        return true;
      });
    }).collect(Collectors.toList());
    return completionService;
  }

該方法使用CountDownLatch來協調生產者執行緒與消費者執行緒的退出動作,然後呼叫produce方法開始生產,對於插入更新時的IteratorBasedQueueProducer而言,其核心程式碼如下

  public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
    ...
    while (inputIterator.hasNext()) {
      queue.insertRecord(inputIterator.next());
    }
    ...
  }

可以看到只要迭代器還有記錄(可能為插入時的新記錄或者更新時的舊記錄),就會往佇列中不斷寫入。

對於消費者的啟動,startConsumer方法的核心程式碼如下

  private Future<E> startConsumer() {
    return consumer.map(consumer -> {
      return executorService.submit(() -> {
        ...
        preExecute();
        try {
          E result = consumer.consume(queue);
          return result;
        } catch (Exception e) {
          queue.markAsFailed(e);
          throw e;
        }
      });
    }).orElse(CompletableFuture.completedFuture(null));
  }

消費時會先進行執行前的準備,然後開始消費,其中consume方法的核心程式碼如下

  public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
    Iterator<I> iterator = queue.iterator();

    while (iterator.hasNext()) {
      consumeOneRecord(iterator.next());
    }

    // Notifies done
    finish();

    return getResult();
  }

可以看到只要佇列中還有記錄,就可以獲取該記錄,然後呼叫不同BoundedInMemoryQueueConsumer子類的consumeOneRecord進行更新插入處理。

值得一提的是Hudi對佇列進行了流控,生產者不能無限制地將記錄寫入佇列中,佇列快取的大小由使用者配置,佇列能放入記錄的條數由取樣的記錄大小和佇列快取大小控制。

在生產時,會呼叫BoundedInMemoryQueue#insertRecord將記錄寫入佇列,其核心程式碼如下

  public void insertRecord(I t) throws Exception {
    ...
    rateLimiter.acquire();
    // We are retrieving insert value in the record queueing thread to offload computation
    // around schema validation
    // and record creation to it.
    final O payload = transformFunction.apply(t);
    adjustBufferSizeIfNeeded(payload);
    queue.put(Option.of(payload));
  }

首先獲取一個許可(Semaphore),未成功獲取會被阻塞直至成功獲取,然後獲取記錄的負載以便調整佇列,然後放入內部佇列(LinkedBlockingQueue)中,其中adjustBufferSizeIfNeeded方法的核心程式碼如下

  private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
    if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
      return;
    }

    final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
    final long newAvgRecordSizeInBytes =
        Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
    final int newRateLimit =
        (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));

    // If there is any change in number of records to cache then we will either release (if it increased) or acquire
    // (if it decreased) to adjust rate limiting to newly computed value.
    if (newRateLimit > currentRateLimit) {
      rateLimiter.release(newRateLimit - currentRateLimit);
    } else if (newRateLimit < currentRateLimit) {
      rateLimiter.acquire(currentRateLimit - newRateLimit);
    }
    currentRateLimit = newRateLimit;
    avgRecordSizeInBytes = newAvgRecordSizeInBytes;
    numSamples++;
  }

首先看是否已經達到取樣頻率,然後計算新的記錄平均大小和限流速率,如果新的限流速率大於當前速率,則可釋放一些許可(供阻塞的生產者獲取後繼續生產),否則需要獲取(回收)一些許可(許可變少後生產速率自然就降低了)。該操作可根據取樣的記錄大小動態調節速率,不至於在記錄負載太大和記錄負載太小時,放入同等個數,從而起到動態調節作用。

在消費時,會呼叫BoundedInMemoryQueue#readNextRecord讀取記錄,其核心程式碼如下

  private Option<O> readNextRecord() {
    ...
    rateLimiter.release();
    Option<O> newRecord = Option.empty();
    while (expectMoreRecords()) {
      try {
        throwExceptionIfFailed();
        newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
        if (newRecord != null) {
          break;
        }
      } catch (InterruptedException e) {
        throw new HoodieException(e);
      }
    }
    ...

    if (newRecord != null && newRecord.isPresent()) {
      return newRecord;
    } else {
      // We are done reading all the records from internal iterator.
      this.isReadDone.set(true);
      return Option.empty();
    }
  }

可以看到首先會釋放一個許可,然後判斷是否還可以讀取記錄(還在生產或者停止生產但佇列不為空都可讀取),然後從內部佇列獲取記錄或返回。

上述便是生產者-消費者在Hudi中應用的分析。

總結

Hudi採用了生產者-消費者模型來控制記錄的處理,與傳統多生產者-多消費者模型不同的是,Hudi現在只支援多生產者-單消費者模型,單消費者意味著Hudi暫時不支援檔案的併發寫入。而對於生產消費的佇列的實現,Hudi並未僅僅只是基於LinkedBlockingQueue,而是採用了更精細化的速率控制,保證速率會隨著記錄負載大小的變化和配置的佇列快取大小而動態變化,這也降低了系統發生OOM的概率。

相關推薦

no