1. 程式人生 > >Flume-將資料寫入動態分割槽表

Flume-將資料寫入動態分割槽表

一、 場景描述

實時監控檔案目錄,將目錄中的實時產生的資料檔案檔案內容非動態)寫入動態分割槽,分割槽為3裝置ID/檔案產生日期/檔案產生的時間(h)).檔名格式如下(日期+時間+產品ID.txt

二、 主要存在的難點

由於flume只支援傳入一些簡單的引數變數(時間/日期/檔名等)所以這裡我們如果想動態的識別我們的檔名並直接生成sink路徑及相應檔名有困難

三、 解決方法

這裡我們依然選用hdfs sink來作資料消費但要稍微改變一部分原始碼,來達到我們通過識別檔名來確定輸出路徑的目的,步驟如下:

(1) 下載apache-flume-1.7.0-src原始碼 (官網直接下載)

(2) 開啟cmd編譯原始碼(這裡用的

mvn版本號3.3.9,最好用3.3版本以上的版本,我自己用3.2出啦點問題)

編譯小提示:

maven確實是一個好東西,但是在國內下載官方倉庫的jar卻是個大問題,速度不敢恭維,現在oschina的國內maven映象服務已關閉,無奈之下只能另尋門路。

今天突然發現了阿里雲maven國內映象,修改完以後速度飛一般的感覺。

修改方法:在~/.m2目錄下的settings.xml檔案中,(如果該檔案不存在,則需要從maven/conf目錄下拷貝一份),找到<mirrors>標籤,新增如下子標籤:

<mirror>

      <id>alimaven</id>

      <name>aliyun maven</name>

      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>

      <mirrorOf>central</mirrorOf>        

</mirror>

編譯成功後如下:


(3) 把專案匯入eclipse做修改

下圖為修改後程式碼並且在環境上執行通過並寫入後的程式碼,主要修改flume-hdfs-sink下的HDFSEentSink.java檔案,主要修改process

方法我們通過識別檔名來拼出hdfs的檔案路徑達到動態識別檔名並建立動態hdfs路徑的目的:

 

修改之後直接重新將子專案flume-hdfs-sink打包flume-hdfs-sink-1.7.0.jar,最後直接去環境下替換掉原來的jar包(路徑如下,記得將原jar做備份):

 

到這裡我們對flume一些小改動基本結束。

四、 測試

1建立flume配置檔案簡單的flume配置檔案如下:

#agent1表示代理名稱

agent1.sources=source1

agent1.sinks=sink1

agent1.channels=channel1

#Spooling Directory是監控指定資料夾中新檔案的變化,一旦新檔案出現,就解析該檔案內容,然後寫入到channle。寫入完成後,標記該檔案已完成或者刪除該檔案。

#配置source1

agent1.sources.source1.type=spooldir

agent1.sources.source1.spoolDir=/tracy/flume_test

agent1.sources.source1.basenameHeader=true

agent1.sources.source1.basenameHeaderKey=fileName

agent1.sources.source1.deletePolicy=immediate

#batchSize是針對SourceSink提出的一個概念,它用來限制sourcesinkevent批量處理的

agent1.sources.source1.batchSize=1000

#channelevent的最大數量

a1.channels.c1.capacity = 1000

#每次從sourcechannel或從channelsinkevent最大的吞吐量

a1.channels.c1.transactionCapacity = 100

#channle滿載的情況下30s後丟擲異常

agent1.channels.ch1.keep-alive = 30

agent1.sources.source1.request-timeout=2000

agent1.sources.source1.connect-timeout=3000

agent1.sources.source1.channels=channel1

#加攔截器

#agent1.sources.source1.interceptors = i1

#時間戳攔截器

#agent1.sources.source1.interceptors.i1.type = timestamp

#配置channel1

agent1.channels.channel1.type=file

agent1.channels.channel1.checkpointDir=/tracy/flume_test1

agent1.channels.channel1.dataDirs=/tracy/flume_test2

#配置sink1

agent1.sinks.sink1.type=hdfs

agent1.sinks.sink1.hdfs.path=hdfs://hadoop.nameNode1:9000/user/hive/warehouse/test.db/info_flume_data_dt1/

#DataStream類似於textfile

agent1.sinks.sink1.hdfs.fileType=DataStream

#只寫入eventbody部分

agent1.sinks.sink1.hdfs.writeFormat=TEXT

agent1.sinks.sink1.hdfs.batchSize = 1

agent1.sinks.sink1.hdfs.rollInterval = 1

agent1.sinks.sink1.hdfs.rollcount = 1

agent1.sinks.sink1.hdfs.rollsize = 0

agent1.sinks.sink1.hdfs.inUsePrefix=t

agent1.sinks.sink1.hdfs.inUseSuffix=.temp

agent1.sinks.sink1.channel=channel1

agent1.sinks.sink1.hdfs.filePrefix=%{fileName}

2事先Hive最終要落地分割槽表如下:

DROP TABLE IF EXISTS INFO_FLUME_DATA_DT1;

CREATE TABLE IF NOT EXISTS INFO_FLUME_DATA_DT1(

t_date string comment '時間',

detail string comment '引數'

) PARTITIONED BY (product string,l_date date,houra string)

--clustered by (t_date) sorted by(detail) into 4 buckets

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '\t'

LINES TERMINATED BY '\n'

STORED AS TEXTFILE;

(4) 啟動我們的單節點flume進行測試:

./flume-ng agent -n agent1 -c ../conf -f ../conf/flume-conf-hdfs.properties -Dflume.root.logger=DEBUG,console

(5) 往flume監控的目錄放入檔案:

Flume控制臺輸出如下

 

這裡可以看到我們程式碼中打樁的位置,路徑檔名沒什麼問題再去hdfs路徑下看下檔案是否寫入:

 

沒問題

(6) 檢視hive資料情況:

這裡我們要注意,要想我們在hdfs的資料在hive可見,我們首先要alter一下分割槽:

ALTER TABLE INFO_FLUME_DATA_DT1

ADD PARTITION (product='02TE04-3',l_date='2017-03-13',houra='22');

最後hive表看資料:

 

沒問題

實際情境下我們不可能每天手動去新增分割槽,每天跑個指令碼把分割槽新增好就ok,如下(舉例)

# !/bin/sh

for((i=0;i<24;i++))

do

    beeline -u jdbc:hive2:// --verbose=true -e "ALTER TABLE test.INFO_FLUME_DATA_DT1 ADD PARTITION (product='02TE04-4',l_date='`date +"%Y-%m-%d"`',houra='$i')"

    echo $i

done