1. 程式人生 > >Flume RegexHbaseEventSerializer自定義rowKey

Flume RegexHbaseEventSerializer自定義rowKey

上篇Flume談到setwritewal出錯的問題,通過註釋了3行程式碼。但是由於rowkey預設是自動產生的,產生的規則通過原始碼可以看出,規則是:
String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());

如果要自定義rowkey,修改原始碼是唯一的辦法,RegexHbaseEventSerializer.java就是我們要修改的檔案。我們可以新建一個類來繼承,原始檔案不要去修改。不過今天我測試的時候是直接修改原始檔的。

我的需要的rowkey是:B13612145#1529637655#3#1530085147015#54

String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());

前3個欄位都在檔名中,這就意味著,我必須解析檔名獲得這3個欄位,那麼配置檔案必須新增header:

a1.sources.r1.type = spooldir  
a1.sources.r1.spoolDir = /data/flume/r1/data
a1.sources.r1.batchSize = 100
#a1.sources.r1.fileHeader = true
a1.sources.r1.basenameHeader = true
a1.sources.r1.channels = c1  

OK,來整理一下思路,我需要解析檔名來獲取3個欄位作為rowkey組成部分,那麼配置需要新增header,然後把原始碼自動生成rowkey的規則替換成我們自己的規則,就是這麼簡單。

1. 新建解析檔名的方法:

	public String splitFileName() {
		for (Map.Entry<String, String> entry : headers.entrySet()) {
			return entry.getValue();
		}

		return null;
	}

既然是要解析檔名,很顯然要知道怎麼獲取檔名,從程式碼可以知道headers.entrySet就是獲取header的方法,因為我就一個header,所以一次迴圈就return結果。

2.  替換預設rowkey生成規則

	protected byte[] getRowKey(Calendar cal) {
		/*
		 * NOTE: This key generation strategy has the following properties:
		 * 
		 * 1) Within a single JVM, the same row key will never be duplicated. 2)
		 * Amongst any two JVM's operating at different time periods (according
		 * to their respective clocks), the same row key will never be
		 * duplicated. 3) Amongst any two JVM's operating concurrently
		 * (according to their respective clocks), the odds of duplicating a
		 * row-key are non-zero but infinitesimal. This would require
		 * simultaneous collision in (a) the timestamp (b) the respective nonce
		 * and (c) the random string. The string is necessary since (a) and (b)
		 * could collide if a fleet of Flume agents are restarted in tandem.
		 * 
		 * Row-key uniqueness is important because conflicting row-keys will
		 * cause data loss.
		 */

		this.fileName = splitFileName();
		this.machineNo = fileName.split("_")[1];
		this.fileTimeStamp = fileName.split("_")[2];
		this.fileNo = fileName.split("_")[3].split("\\.")[0];
		String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());

		//String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());
		return rowKey.getBytes(charset);
	}

註釋掉的那行就是預設的規則,新的是我自己要的規則。

就這樣完成了,打個包替換之前的包,消費一個檔案來測試,結果正如我們所期望的:

 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,7,93,0                                                            
 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147016#55   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     
 B13612145#1529637655#3#1530085147017#56   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147017#56   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147017#56   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,11,92,0                                                           
 B13612145#1529637655#3#1530085147017#56   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147017#56   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     
 B13612145#1529637655#3#1530085147018#57   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147018#57   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147018#57   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,7,93,0                                                            
 B13612145#1529637655#3#1530085147018#57   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147018#57   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     
 B13612145#1529637655#3#1530085147018#58   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147018#58   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147018#58   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=5,8,93,0                                                            
 B13612145#1529637655#3#1530085147018#58   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147018#58   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     
 B13612145#1529637655#3#1530085147019#59   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                
 B13612145#1529637655#3#1530085147019#59   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                
 B13612145#1529637655#3#1530085147019#59   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,8,93,0                                                            
 B13612145#1529637655#3#1530085147019#59   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 
 B13612145#1529637655#3#1530085147019#59   column=cf:ext_toolno, timestamp=1530085147229, value=30         


今天測試的時候碰到2個問題:

1. 消費檔案有幾次出現檔名已經修改為.COMPLETE,但是我HBASE資料沒有任何增加,而且沒有報任何錯誤,。給我的感覺就是沒有消費。測試了幾次,都是如此,很是困惑,後來突然想起來之前有人提到過如果一個很大的檔案需要放到spooldir目錄會發生錯誤,因為檔案一進去就會消費,但是檔案又在拷貝過程。後來我改成先把原始檔名新增.COMPLETE,拷貝完成之後,再修改檔名去掉.COMPLETE.

2. 時間衝突

rowkey的規則裡有時間,我有一個檔案60行資料,消費之後只有48條,因為之前我同過spark 消費也出現過這個問題,因此很容易知道這是因為rowkey衝突了,導致資料覆蓋了,因此把原始檔的nonce.getAndIncrement()加到ROWKEY即可。

簡單說就是迴圈的過程cal.getTimeInMillis()這個玩意會可能重複,很多人覺得微秒級別不應該出現重複,事實上我碰到過2次,因此現在對通過時間作為rowkey格外小心。