1. 程式人生 > >Flume效能優化實踐

Flume效能優化實踐

Flume效能優化實踐

發表評論

最近公司落地Flume日誌採集著實反覆了好久,簡單記錄一下效能優化的核心思路。

初始配置所有batch size、transaction size都是1000,channel的capactiy是10000。

版本一

最初我是按Memory Channel做壓測,Taildir的source採集增量日誌,Memory Channel緩衝資料,Kafka Sink傳送資料。

這裡面的瓶頸是Kafka Sink,因為Kafka Sink是單執行緒同步傳送,網路延遲就會導致吞吐上不去,大概10MB+的一個吞吐就封頂了。

版本二

翻看了官方文件,打算試驗一下sink group來實現多個kafka sink同時傳送,結果效能仍舊10MB+。

分析原理,原來sink group仍舊是個單執行緒sink,相當於多個kafka sink的代理而已,僅僅實現了輪轉負載均衡功能。

一個kafka sink的傳送延遲高,輪轉壓根沒有意義。

版本三

於是琢磨如何實現多執行緒跑多個Kafka Sink,於是仍舊使用1個Memory Channel,配置對應3個Kafka Sink,結果頻寬可以升高到30MB的樣子,但是極不穩定,來回跳躍。

此時發現Memory Channel的填充率接近90%+,應該是因為容量經常塞滿導致的流水線阻塞,通過增加memory channel的capacity到10萬,batch size和transaction size增加到1萬,吞吐提升到60MB~80MB+,填充率小於10%,已經滿足需求。

在transaction size=1000的情況下memory channel被填滿,而transaction size=1萬的情況下memory channel就不會被填滿,其實是通過增加channel批處理的包大小,降低了channel訪問的頻次,解決的是memory channel的鎖瓶頸。

同時,這個優化思路也帶來了問題,更大的memory channel capacity帶來了更大的資料丟失風險,因為宕機時memory channel裡緩衝的資料都會丟失。

版本四

實現多個memory channel輪轉,每個memory channel由一個kafka sink消費。

這樣做目的有2個:

  1. 由多個sink競爭消費1個channel改為各自消費1個channel,鎖瓶頸解決。
  2. 因為鎖瓶頸變小,所以可以仍舊保持較小的channel capacity來保障資料可靠性,比如每個channel容量10000,那麼3個channel丟失3萬,仍舊優於”版本三”。

實現該功能需要自己開發channel selector外掛,實現source流量的輪轉分發,可以翻看我之前寫的部落格。

版本五

同事要求使用file channel,保障佇列中資料的可靠性,但是經過測試發現吞吐只能跑到10MB左右,上述所說優化手段均無效。

更換SSD盤也沒有帶來任何提升,File channel自身填充率極低。

個人懷疑瓶頸在File Channel自身,其事務的提交效率太低,阻塞了source的投遞動作,無論如何增加channel數量也無濟於事,因為source是單執行緒的,輪轉發往多個File Channel的速度仍舊等於單個File Channel速度,導致後續Sink沒有足夠資料消費,吞吐無法提升。

從FileChannel程式碼來看,磁碟讀寫的相關程式碼全部被加鎖處理:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

    synchronized FlumeEventPointer put(ByteBuffer buffer) throws IOException {

      if (encryptor != null) {

        buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));

      }

      Pair<Integer, Integer> pair = write(buffer);

      return new FlumeEventPointer(pair.getLeft(), pair.getRight());

    }

 

    synchronized void take(ByteBuffer buffer) throws IOException {

      if (encryptor != null) {

        buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));

      }

      write(buffer);

    }

 

    synchronized void rollback(ByteBuffer buffer) throws IOException {

      if (encryptor != null) {

        buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));

      }

      write(buffer);

    }

 

    synchronized void commit(ByteBuffer buffer) throws IOException {

      if (encryptor != null) {

        buffer = ByteBuffer.wrap(encryptor.encrypt(buffer.array()));

      }

      write(buffer);

      dirty = true;

      lastCommitPosition = position();

    }

另外,日誌檔案的sync刷盤策略分為兩種選項,一種是每次提交事務都重新整理,另外一個是定時執行緒重新整理(下面是定時執行緒):

 

1

2

3

4

5

6

7

8

9

10

11

        syncExecutor.scheduleWithFixedDelay(new Runnable() {

          @Override

          public void run() {

            try {

              sync();

            } catch (Throwable ex) {

              LOG.error("Data file, " + getFile().toString() + " could not " +

                  "be synced to disk due to an error.", ex);

            }

          }

        }, fsyncInterval, fsyncInterval, TimeUnit.SECONDS);

而這個sync()刷盤操作同樣被鎖保護的,會佔用大量的鎖時間:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

    /**

     * Sync the underlying log file to disk. Expensive call,

     * should be used only on commits. If a sync has already happened after

     * the last commit, this method is a no-op

     *

     * @throws IOException

     * @throws LogFileRetryableIOException - if this log file is closed.

     */

    synchronized void sync() throws IOException {

      if (!fsyncPerTransaction && !dirty) {

        if (LOG.isDebugEnabled()) {

          LOG.debug(

              "No events written to file, " + getFile().toString() +

                  " in last " + fsyncInterval + " or since last commit.");

        }

        return;

      }

      if (!isOpen()) {

        throw new LogFileRetryableIOException("File closed " + file);

      }

      if (lastSyncPosition < lastCommitPosition) {

        getFileChannel().force(false);

        lastSyncPosition = position();

        syncCount++;

        dirty = false;

      }

    }

降低sync()的呼叫頻率,理論上可以降低鎖佔用時間,讓出更多的鎖時間給put與take操作。

flume可以配置這些引數,只是官方文件裡並沒有說明:

 

1

2

3

4

5

  public static final String FSYNC_PER_TXN = "fsyncPerTransaction";

  public static final boolean DEFAULT_FSYNC_PRE_TXN = true;

 

  public static final String FSYNC_INTERVAL = "fsyncInterval";

  public static final int DEFAULT_FSYNC_INTERVAL = 5; // seconds.

預設是每個事務都sync,這樣當然是為了保障資料可靠性,否則也就沒必要用FileChannel了。

我嘗試改成了定時sync(),發現吞吐仍舊無法提升,那麼我繼續猜測問題在於事務的commit部分,也就是Sink做的事情:

 

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

/**

   * Synchronization not required as this method is atomic

   *

   * @param transactionID

   * @param type

   * @throws IOException

   */

  private void commit(long transactionID, short type) throws IOException {

    Preconditions.checkState(open, "Log is closed");

    Commit commit = new Commit(transactionID, WriteOrderOracle.next(), type);

    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(commit);

    int logFileIndex = nextLogWriter(transactionID);

    long usableSpace = logFiles.get(logFileIndex).getUsableSpace();

    long requiredSpace = minimumRequiredSpace + buffer.limit();

    if (usableSpace <= requiredSpace) {

      throw new IOException("Usable space exhausted, only " + usableSpace +

          " bytes remaining, required " + requiredSpace + " bytes");

    }

    boolean error = true;

    try {

      try {

        LogFile.Writer logFileWriter = logFiles.get(logFileIndex);

        // If multiple transactions are committing at the same time,

        // this ensures that the number of actual fsyncs is small and a

        // number of them are grouped together into one.

        logFileWriter.commit(buffer);

        logFileWriter.sync();

        error = false;

      } catch (LogFileRetryableIOException e) {

        if (!open) {

          throw e;

        }

        roll(logFileIndex, buffer);

        LogFile.Writer logFileWriter = logFiles.get(logFileIndex);

        logFileWriter.commit(buffer);

        logFileWriter.sync();

        error = false;

      }

提交事務也只是寫入一條日誌標記對應的事務完結了,這樣宕機重放日誌時就會跳過該事務。

我們發現這個操作總是sync(),雖然這個操作不需要鎖保護的樣子,但是它佔用了sink執行緒的時間,估計吞吐無法提升也離不開它的關係。

關於File Channel瓶頸,有同學有JAVA調優經驗的可以具體給FileChannel加一些除錯日誌,看看到底慢在哪個環節。

我個人會優先選擇使用capacity較小(1萬-10萬)的的memory channel配合多個sink來實現高吞吐,至於對宕機的那點擔心實在沒有必要,因為大多數時候memory channel的填充率不足1%,也就是丟失10萬*0.01=100條而已。