
Flume Resume-from-Checkpoint (斷點續傳): A Deep Dive

Method 1: Use a complex tail command in the exec source
I found this article through a Baidu search: https://my.oschina.net/leejun2005/blog/288136

The idea is to record the line number while tailing, and on the next run resume from the recorded position, along these lines:

agent1.sources.avro-source1.command = /usr/local/bin/tail  -n +$(tail -n1 /home/storm/tmp/n) --max-unchanged-stats=600 -F  /home/storm/tmp/id.txt | awk 'ARGIND==1{i=$0;next}{i++; if($0~/檔案已截斷/)i=0; print i >> "/home/storm/tmp/n";print $1"---"i}' /home/storm/tmp/n -
Note the following:
(1) When the file is rotated, the checkpoint "pointer" must be updated in sync.
(2) Files need to be tracked by file name.
(3) After flume dies, the checkpoint "pointer" has to keep accumulating across the restart.
(4) If the file happens to be rotated while flume is down, there is a risk of losing data; you can only monitor and restart quickly, or add logic that checks the file size and resets the pointer.
(5) Mind your tail version; update the coreutils package to the latest.
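Unpacked, the pipeline above reads like this (a commented restatement of the same command, nothing added; ARGIND is a gawk extension, and the quoted pattern is tail's localized "file truncated" notice):

/usr/local/bin/tail -n +$(tail -n1 /home/storm/tmp/n) --max-unchanged-stats=600 -F /home/storm/tmp/id.txt \
| awk '
    ARGIND==1 { i = $0; next }          # first argument (/home/storm/tmp/n): seed i with the saved line number
    {
      i++                               # second argument ("-", the piped tail output): count every line
      if ($0 ~ /檔案已截斷/) i = 0       # tail reported the file was truncated, so restart the count
      print i >> "/home/storm/tmp/n"    # append the new checkpoint
      print $1 "---" i                  # and echo field 1 with the running count
    }' /home/storm/tmp/n -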

With that in mind:
[[email protected] ~]$ cat data.txt 

Jan 18 22:25:55 192.168.101.254 [email protected] syslog-ng[69]: STATS: dropped 0
Jan 31 10:24:27 192.168.101.254 [email protected] kernel: External interface ADSL is down
Jan 31 10:24:31 192.168.101.254 [email protected] system: ||||IPSec event unroute-client on tunnel to gz
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
Feb 21 10:24:32 192.168.101.254 [email protected] kernel:
Feb 21 10:24:33 192.168.101.254 [email protected] kernel:
[[email protected] ~]$ cat n
0

[[email protected] ~]$ echo "Feb 21 10:24:33 192.168.101.254 [email protected] kernel: " >> data.txt
[[email protected] ~]$ cat n
1
2
3
4
5
6
7

[[email protected] conf]$ cat hehe.sh
/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf -f /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console
tail -n +0 -F /home/hadoop/data.txt |awk 'ARNGIND==1{next}{i++;print i > "/home/hadoop/n"}' /home/hadoop/data.txt
(At first I worried that the second command would run at the same time as the first, in which case the n file could not record the line count after the flume process died; fortunately, testing showed that the second command only runs after the first one finishes.)
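This is ordinary shell semantics: lines in a script run one after another, and flume-ng occupies the foreground until the agent exits, so the counter starts only once the agent is gone. Had I wanted both running at the same time, the first command would need to be backgrounded with &; a hypothetical variant (the simplified counter here counts the piped stream directly instead of re-reading the file):

/home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/bin/flume-ng agent -c /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf -f /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/hbase_simple.conf -n a1 -Dflume.root.logger=INFO,console &
tail -n +0 -F /home/hadoop/data.txt | awk '{i++; print i > "/home/hadoop/n"; fflush("")}'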

[[email protected] conf]$ cat hbase_simple.conf 

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -n +$(tail -n -1 /home/hadoop/n) -F /home/hadoop/data.txt

# Describe the sink
a1.sinks.k1.type = hbase
a1.sinks.k1.table = messages
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = com.tcloud.flume.AsyncHbaseLogEventSerializer

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
[[email protected] conf]$ sh hehe.sh
But:
the command tail -n +$(tail -n -1 /home/hadoop/n) -F /home/hadoop/data.txt works in Linux yet fails in flume. Could the $(...) be the reason?
In Linux:
[[email protected] conf]$ tail -n +$(tail -n -1 /home/hadoop/n) -F /home/hadoop/data.txt
Jan 18 22:25:55 192.168.101.254 [email protected] syslog-ng[69]: STATS: dropped 0
Jan 31 10:24:27 192.168.101.254 [email protected] kernel: External interface ADSL is down
Jan 31 10:24:31 192.168.101.254 [email protected] system: ||||IPSec event unroute-client on tunnel to gz
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
Feb 21 10:24:32 192.168.101.254 [email protected] kernel: 
In flume the sink starts fine, but the source pulls no data.

With no better option I changed my approach: every time the flume process dies from a failure, look at the last number in the n file, which is the line where flume stopped, then manually edit a1.sources.r1.command in hbase_simple.conf:
a1.sources.r1.command = tail -n +0 -F /home/hadoop/data.txt   (the argument after -n + is the number on the last line of the n file, plus one)
For example, if the last line of the n file is 7, then before restarting flume change a1.sources.r1.command in hbase_simple.conf to tail -n +8 -F /home/hadoop/data.txt.
It works, but it feels terribly clumsy, even as a stopgap.
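At the very least the manual edit can be scripted. A rough restart wrapper (same paths as above; the sed pattern is illustrative and assumes the command in hbase_simple.conf is already in the numeric tail -n +N form shown just above):

# resume line = last recorded checkpoint + 1
start=$(( $(tail -n1 /home/hadoop/n) + 1 ))
# patch the tail offset in the config, then restart the agent
sed -i "s#^a1\.sources\.r1\.command = tail -n +[0-9]*#a1.sources.r1.command = tail -n +${start}#" /home/hadoop/apache-flume-1.6.0-cdh5.5.2-bin/conf/hbase_simple.conf
sh hehe.sh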

Hmm, I then realized that this also rules out listing two files in a1.sources.r1.command: with two files the counts cannot be kept separate and simply accumulate (a per-file variant is sketched after the transcript below):
[[email protected] ~]$ tail -n +0 -F /home/hadoop/data.txt /home/hadoop/data2.txt |awk 'ARNGIND==1{next}{i++;print i > "/home/hadoop/n"}' /home/hadoop/data.txt /home/hadoop/data2.txt
[[email protected] ~]$ cat n
1
2
3
4
5
6
7
8
9
10
11
12
[[email protected] ~]$ tail -n +0 -F /home/hadoop/data.txt /home/hadoop/data2.txt

==> /home/hadoop/data.txt <==
Jan 18 22:25:55 192.168.101.254 [email protected] syslog-ng[69]: STATS: dropped 0
Jan 31 10:24:27 192.168.101.254 [email protected] kernel: External interface ADSL is down
Jan 31 10:24:31 192.168.101.254 [email protected] system: ||||IPSec event unroute-client on tunnel to gz
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
Feb 21 10:24:32 192.168.101.254 [email protected] kernel: 
Feb 21 10:24:33 192.168.101.254 [email protected] kernel: 
Feb 21 10:24:33 192.168.101.254 [email protected] kernel: 

==> /home/hadoop/data2.txt <==
Feb 01 05:54:55 192.168.101.254 [email protected] trafficlogger: empty map for 1:4097 in classnames
Jan 23 20:07:00 192.168.101.254 [email protected] trafficlogger: empty map for 1:4097 in classnames
Jan 29 06:29:39 h107 rsyslogd-2007: action 'action 17' suspended, next retry is Sun Jan 29 06:30:09 2017 [try http://www.rsyslog.com/e/2007 ]
Jan 29 20:07:01 192.168.101.254 [email protected] trafficlogger: empty map for 1:4097 in classnames
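If separate counts per file were really needed, one option is to key the counter on the "==> file <==" headers that tail prints when it switches between files. A rough gawk sketch (untested; tail also inserts a blank line before each subsequent header, so this version crudely drops empty lines, which would also drop genuinely empty log lines):

tail -n +1 -F /home/hadoop/data.txt /home/hadoop/data2.txt | awk '
    /^==> .* <==$/ { f = $2; next }                          # tail header: remember which file follows
    $0 == ""       { next }                                  # drop the blank separator tail emits
    f != ""        { c[f]++; print c[f] > (f ".n"); fflush("") }'

Each file's running count then lands in its own checkpoint file, /home/hadoop/data.txt.n and /home/hadoop/data2.txt.n.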
Reference: http://www.cnblogs.com/zhzhang/p/5778836.html

Later I found that under Linux this command actually works quite well; it would be excellent if flume could run it, but alas flume cannot:
a1.sources.r1.command = tail -n +$(tail -n1 /home/hadoop/n) -F /home/hadoop/data.txt 2>&1 | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/home/hadoop/n";fflush("")}' /home/hadoop/n -
Also, on the very first start of the flume process the n file must be set to 1; if it is set to 0, the second start does not read from the line count where the first run ended.

Later I asked in a chat group, and someone said that flume cannot necessarily interpret shell this complex: the exec source is just a Java program, not a shell terminal. Even a plain tail -f merely hands flume a data stream. Look at the exec source's code, they said, and you will probably find it just grabs the data stream from the command.
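That diagnosis matches the Flume user guide: the exec source starts the command directly rather than through a shell, so constructs like $(...) and | are never interpreted. The exec source does, however, have a shell property (it is in the Flume 1.6 user guide) for exactly this situation, so a configuration along these lines should make the pipeline above usable (I have not re-tested it here):

a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command = tail -n +$(tail -n1 /home/hadoop/n) -F /home/hadoop/data.txt 2>&1 | awk 'ARGIND==1{i=$0;next}{i++;if($0~/^tail/){i=0};print $0;print i >> "/home/hadoop/n";fflush("")}' /home/hadoop/n -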

Method 2: Write your own source
Reference (github): https://github.com/cwtree/flume-filemonitor-source

First create the corresponding table in hbase:
hbase(main):175:0> create 'messages','cf','host'

flume configuration:
[[email protected] conf]$ cat chiwei.conf 

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = org.apache.flume.chiwei.filemonitor.FileMonitorSource
a1.sources.r1.channels = c1
a1.sources.r1.file = /home/hadoop/messages
a1.sources.r1.positionDir = /home/hadoop

a1.sinks.k1.type = hbase
a1.sinks.k1.table = messages
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = com.tcloud.flume.AsyncHbaseLogEventSerializer

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
This source watches the given file for changes and periodically records the position written so far, in bytes, into a position.log file. If the process dies, then on the next start it automatically resumes feeding the downstream from the last byte position recorded in position.log.
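The mechanism is easy to mimic on the command line. A conceptual sketch of byte-based resume (it assumes position.log holds a plain byte count on its last line, which is my assumption for illustration, not necessarily the source's actual file format):

# read the last recorded byte offset, defaulting to 0 on the first run
offset=$(tail -n1 /home/hadoop/position.log 2>/dev/null || echo 0)
# resume one byte after the recorded offset and keep following the file
tail -c +$(( ${offset:-0} + 1 )) -F /home/hadoop/messages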

This approach has a drawback:
if exactly one log line arrives between the moment the process dies unexpectedly and the moment it is restarted, I can handle it on the sink side by matching the blank line; but if several lines arrive in that window, that trick is helpless and the data in between is lost. Once the process is running, line-by-line input is fine; the failure mode is only when several lines are ingested at once. My feeling was that the source's code should be modified to read the data line by line rather than all in one chunk.

Reproducing the problem:

Jan 31 10:24:36 192.168.101.254 [email protected] kernel: External interface ADSL is downn
Jan 31 10:24:37 192.168.101.254 [email protected] kernel: External interface ADSL is downn
Jan 31 10:24:38 192.168.101.254 [email protected] kernel: External interface ADSL is downn
Jan 31 10:24:39 192.168.101.254 [email protected] kernel: External interface ADSL is downn (several lines entered at once: importing into hbase failed)

null
Jan 31 10:24:40 192.168.101.254 [email protected] kernel: External interface ADSL is downn (one line entered at a time: importing into hbase succeeded)

[email protected]
20170131102440
Jan 31 10:24:41 192.168.101.254 [email protected] kernel: External interface ADSL is downn

[email protected]
20170131102441
Solution:
In AsyncHbaseLogEventSerializer I added array traversal, splitting the event body into individual lines and looping over them, instead of matching blank lines with a regex; that finally solved the problem.
12/12/13 18:13:04 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
Jan 18 22:25:55 192.168.101.254 [email protected] syslog-ng[69]: STATS: dropped 0
Jan 31 10:24:27 192.168.101.254 [email protected] kernel: External interface ADSL is down
Jan 31 10:24:31 192.168.101.254 [email protected] system: ||||IPSec event unroute-client on tunnel to gz
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
Jan 31 10:24:27 192.168.101.254 [email protected] kernel: External interface ADSL is down
Jan 31 10:24:28 192.168.101.254 [email protected] kernel: External interface ADSL is down

Jan 18 22:25:55 192.168.101.254 [email protected] syslog-ng[69]: STATS: dropped 0
[email protected]
20170118222555
Jan 31 10:24:27 192.168.101.254 [email protected] kernel: External interface ADSL is down
[email protected]
20170131102427
Jan 31 10:24:31 192.168.101.254 [email protected] system: ||||IPSec event unroute-client on tunnel to gz
[email protected]
20170131102431
Feb  2 06:26:01 h107 rsyslogd0: action 'action 17' resumed (module 'builtin:ompipe') [try http://www.rsyslog.com/e/0 ]
[email protected]
20170202062601
Feb 20 06:25:04 h107 rsyslogd: [origin software="rsyslogd" swVersion="8.4.2" x-pid="22204" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
[email protected]
20170220062504
Jan 31 10:24:27 192.168.101.254 [email protected] kernel: External interface ADSL is down
[email protected]
20170131102427
Jan 31 10:24:28 192.168.101.254 [email protected] kernel: External interface ADSL is down
[email protected]
20170131102428
Jan 31 10:24:29 192.168.101.254 [email protected] kernel: External interface ADSL is down

Jan 31 10:24:29 192.168.101.254 [email protected] kernel: External interface ADSL is down
[email protected]
20170131102429
The code has been uploaded to http://download.csdn.net/download/m0_37739193/10154916

Summary:

The two methods above date from the days when old versions of flume had no resume capability, and they were bold attempts; but they were individual efforts of limited polish, so both have real flaws. Then flume 1.7 shipped the TaildirSource component (see my other article: http://blog.csdn.net/m0_37739193/article/details/72962192), which finally delivers proper resume-from-checkpoint, and it is bound to be better: it is maintained by a team with serious contributors and was surely released only after repeated testing and study. The two methods above are offered to open up ideas and serve as a reference; I still think the thinking behind them is sound. Finally, now that a good ready-made tool exists, there is no need to build your own wheel, and the wheel you build is unlikely to be better than theirs.
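For reference, a minimal TaildirSource setup looks like this (flume 1.7+; paths follow the earlier examples):

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/data.txt
a1.sources.r1.channels = c1

The byte offsets are kept as JSON in positionFile, so the agent can restart and resume where it left off, without any of the tail gymnastics above.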