1. 程式人生 > >flume學習(十一):如何使用Spooling Directory Source

flume學習(十一):如何使用Spooling Directory Source

最近在弄一個信令資料匯聚的事情,主要目的是把FTP上的信令資料匯聚到HDFS上去儲存。 邏輯是這樣的:把FTP伺服器上的檔案下載到一臺主機上,然後SCP到另外一臺主機上的Spooling Directory Source所監控的目錄下面去,sink是hdfs(這裡解釋一下,由於網路環境的因素,另一臺不能訪問到內網的FTP伺服器,所以只能這樣中轉一下)。

嗯,想法不錯,邏輯上看上去也應該沒啥問題,於是就開始吭哧吭哧寫指令碼了。FTP上每個信令資料的每個檔案的大小差不多都有300M左右。SCP到遠端伺服器也沒出現問題,可就是agent老是會掛掉,報這個異常:

2014-11-26 12:30:16,942 ERROR org.apache.flume.source.SpoolDirectorySource: FATAL: Spool Directory source source1: { spoolDir: /var/log/apache/flumeSpool }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.nio.charset.MalformedInputException: Input length = 1
	at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
	at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:195)
	at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:134)
	at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:72)
	at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:91)
	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:241)
	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
然後讓我重啟agent才會把Spooling Directory Source所監控的目錄下面的檔案抽取到HDFS上去,感覺很莫名,網上搜索了一下這個錯誤的原因,很多都是說可能傳輸的檔案字符集的原因,不以為然,因為我反覆測試了一下,如果是字符集的原因,那麼為什麼我重啟一下agent又可以成功的抽取資料了。

於是我想了想是不是由於同時讀寫導致的問題,因為我SCP檔案過去,檔案較大,需要一定的時間,而flume監測到有檔案馬上就開始逐行讀取檔案轉化成EVENT傳送到HDFS上去,這中間肯定存在同時讀寫一個檔案了,然後就產生的這個異常問題?

目前僅僅是猜測,於是我修改了Spooling Directory Source的配置,加了這麼一個配置:

tier1.sources.source1.ignorePattern = ^(.)*\\.tmp$

就是忽略監控目錄下面的.tmp檔案。然後我修改了scp的邏輯,拷貝到另一臺主機上時,先命名為:原檔名.tmp(由於是.tmp檔案,agent不會採集此類檔案),等SCP執行成功之後,在mv這個.tmp檔案,去掉.tmp字尾,這樣agent又會抽取這個檔案的資料了,通過這麼一處理,就巧妙的避免了同時讀寫一個檔案的問題。

指令碼調整好之後,重新執行指令碼,驚喜的發現成功了,這次agent沒有掛掉,大功告成了。

總結:使用Spooling Directory Source的時候,一定要避免同時讀寫一個檔案的情況。採用上面提到的方法就可以巧妙的避開這個問題。