1. 程式人生 > >Flume 長時間執行,Hive Sink 停止工作問題處理

Flume 長時間執行,Hive Sink 停止工作問題處理

1. 問題現象

在使用 Flume 將資料從 Kafka 載入到 hive 的過程中,我們遇到一個問題:每天晚上 Flume 的 Hive Sink 總會報錯,然後停止工作:

15 juil. 2016 21:40:43,008 INFO [hive-hive1-call-runner-0] (org.apache.flume.sink.hive.HiveWriter$2.call:238) - Sending heartbeat on batch TxnIds=[3755...3764] on endPoint = {metaStoreUri=...
15 juil. 2016 22:12:21,001 INFO 

[hive-hive1-call-runner-0] (org.apache.flume.sink.hive.HiveWriter$2.call:231) - Sending heartbeat on batch TxnIds=[3785...3794] on endPoint = {metaStoreUri=...

15 juil. 2016 22:27:56,963 INFO [hive-hive1-call-runner-0] (org.apache.flume.sink.hive.HiveWriter$2.call:231) - Sending heartbeat on batch TxnIds=

[3795...3804] on endPoint = {metaStoreUri=...

這個問題的神奇之處在於,我們第二天重啟 Flume,又能一直正常工作,直到第二天晚上...

2. 問題分析

從搜尋中,我們發現其他人也遇到了同樣的問題:https://issues.apache.org/jira/browse/FLUME-2956 ,而且目前還無解,看來要找現成的解決方案不太可能了,只能從原始碼入手。

在 Flume 的 Hive Sink 原始碼中,我們看到如下問題:

a. 異常捕獲不充分(HiveSink.java:321);另外,在異常處理中還可能丟擲異常()

這裡只捕獲了HiveWriter.Failure,而實際情況中丟擲的異常是從 RuntimeException 繼承來的,也就直接丟擲去了;所以後面的補救措施:

    } catch (HiveWriter.Failure e) {
      // in case of error we close all TxnBatches to start clean next time
      LOG.warn(getName() + " : " + e.getMessage(), e);
      abortAllWriters();
      closeAllWriters();
      throw e;

    }

沒有得到執行。

另外由於 Hive Stream API 為了簡化程式碼,將異常從 RuntimeException 繼承來,導致這裡的開發人員忘了捕獲 abortAllWriters() 和 closeAllWriters() 中可能丟擲的異常。

b. 傳送 heartbeat 的工作在 HiveWriter.flush (HiveWriter.java:190) 中進行,但是這個函式只在剛剛傳送資料的 activeWriters 上呼叫

這個有問題。因為 hive writer 是重複使用的,如果很長一段時間後再來資料,那麼在這段中斷的時間內,伺服器(metastore) 就會關掉這個連線。然後客戶端 (hive Sink) 再次傳送 heartbeat 就會發生我們看到的現象。

3. 問題處理

從以上分析可以看到,我們需要進行的修補工作包括:

a. 在 (HiveSink.java:321) 中捕獲其他異常, 並捕獲 abortAllWriters() 和 closeAllWriters() 中可能丟擲的異常,保證充分丟棄並關閉所有 HiveWriter

b. 在空閒的 HiveWriter 上傳送 heartbeat;並且及時清理空閒的 HiveWriter。

詳細修改請參考 https://github.com/apache/flume/pull/206 。

經測試,以上處理完全修正 Flume Hive Sink 一段時間無資料導致的停止工作問題。