1. 程式人生 > >Flume-ng將資料插入hdfs與HBase-0.96.0

Flume-ng將資料插入hdfs與HBase-0.96.0

問題導讀:
1.如何配置分散式flume
2.master與node之間該如何配置,有什麼異同?
3.啟動命令是什麼?
4.flume把資料插入hbase,該如何配置?











一、Flume-ng配置插入hdfs

1)簡介

Flume是一個分散式、可靠、和高可用的海量日誌聚合的系統,支援在系統中定製各類資料傳送方,用於收集資料;同時,Flume提供對資料進行簡單處理,並寫到各種資料接受方(可定製)的能力。

設計目標:
(1) 可靠性
當節點出現故障時,日誌能夠被傳送到其他節點上而不會丟失。Flume提供了三種級別的可靠性保障,從強到弱依次分別為:end-to-end(收到資料agent首先將event寫到磁碟上,當資料傳送成功後,再刪除;如果資料傳送失敗,可以重新發送。),Store on failure(當資料接收方crash時,將資料寫到本地,待恢復後,繼續傳送),Best effort(資料傳送到接收方後,不會進行確認)。

(2) 可擴充套件性
Flume採用了三層架構,分別為agent,collector和storage,每一層均可以水平擴充套件。其中,所有agent和collector由master統一管理,這使得系統容易監控和維護,且master允許有多個(使用ZooKeeper進行管理和負載均衡),這就避免了單點故障問題。

(3) 可管理性
所有agent和colletor由master統一管理,這使得系統便於維護。多master情況,Flume利用ZooKeeper和gossip,保證動態配置資料的一致性。使用者可以在master上檢視各個資料來源或者資料流執行情況,且可以對各個資料來源配置和動態載入。Flume提供了web 和shell script command兩種形式對資料流進行管理。

(4) 功能可擴充套件性
使用者可以根據需要新增自己的agent,collector或者storage。此外,Flume自帶了很多元件,包括各種agent(file, syslog等),collector和storage(File,HDFS,HBase等)。

2)配置


之前配置過Hadoop和hbase,所以需要先將hadoop和hbase啟動,才能將檔案寫入hdfs和hbase。
(hadoop-2.2.0和hbase-0.96.0安裝可以參考:hadoop2.2完全分散式最新高可靠安裝文件hbase 0.96整合到hadoop2.2三個節點全分散式安裝高可靠文件

本次配置環境為兩臺裝有centos 的測試叢集。主機名為master的機器負責收集日誌,主機名為node的機器負責日誌的寫入,本次配置的寫入方式:寫入普通目錄,寫入hdfs

首先
1.下載flume-ng的二進位制壓縮檔案。
地址:http://flume.apache.org/download.html


2.下載好後,解壓檔案。
(1)首先編輯/etc/profile檔案,在其中新增如下幾行:
  1. export FLUME_HOME=/home/aaron/apache-flume-1.4.0-bin
  2. export FLUME_CONF_DIR=$FLUME_HOME/conf
  3. export PATH=$PATH:$FLUME_HOME/bin
複製程式碼

新增好之後記得執行$ souce /etc/profile命令使修改生效。

(2)在master的flume資料夾的conf目錄中,新建一個flume-master.conf檔案,內容如下:

  1. agent.sources = seqGenSrc
  2. agent.channels = memoryChannel
  3. agent.sinks = remoteSink
  4. # For each one of the sources, the type is defined
  5. agent.sources.seqGenSrc.type = exec
  6. agent.sources.seqGenSrc.command = tail -F /home/aaron/test
  7. # The channel can be defined as follows.
  8. agent.sources.seqGenSrc.channels = memoryChannel
  9. # Each sink's type must be defined
  10. agent.sinks.loggerSink.type = logger
  11. #Specify the channel the sink should use
  12. agent.sinks.loggerSink.channel = memoryChannel
  13. # Each channel's type is defined.
  14. agent.channels.memoryChannel.type = memory
  15. # Other config values specific to each type of channel(sink or source)
  16. # can be defined as well
  17. # In this case, it specifies the capacity of the memory channel
  18. agent.channels.memoryChannel.capacity = 100
  19. agent.channels.memoryChannel.keep-alive = 100
  20. agent.sinks.remoteSink.type = avro
  21. agent.sinks.remoteSink.hostname = node
  22. agent.sinks.remoteSink.port = 23004
  23. agent.sinks.remoteSink.channel = memoryChannel
複製程式碼


(3)在node機器上也將/etc/profile檔案新增上面的配置。然後,在conf中新建一個flume-node.conf檔案,修改如下:
  1. agent.sources = seqGenSrc1
  2. agent.channels = memoryChannel
  3. #agent.sinks = fileSink
  4. agent.sinks = <SPANstyle="FONT-FAMILY: Arial, Helvetica, sans-serif">fileSink</SPAN>
  5. # For each one of the sources, the type is defined
  6. agent.sources.seqGenSrc1.type = avro
  7. agent.sources.seqGenSrc1.bind = node
  8. agent.sources.seqGenSrc1.port = 23004
  9. # The channel can be defined as follows.
  10. agent.sources.seqGenSrc1.channels = memoryChannel
  11. # Each sink's type must be defined
  12. agent.sinks.loggerSink.type = logger
  13. #Specify the channel the sink should use
  14. agent.sinks.loggerSink.channel = memoryChannel
  15. # Each channel's type is defined.
  16. agent.channels.memoryChannel.type = memory
  17. # Other config values specific to each type of channel(sink or source)
  18. # can be defined as well
  19. # In this case, it specifies the capacity of the memory channel
  20. agent.channels.memoryChannel.capacity = 100
  21. agent.channels.memoryChannel.keep-alive = 100
  22. agent.sources.flieSink.type = avro
  23. agent.sources.fileSink.channel = memoryChannel
  24. agent.sources.fileSink.sink.directory = /home/aaron/
  25. agent.sources.fileSink.serializer.appendNewline = true
複製程式碼



在master上面執行命令:

  1. $ bin/flume-ng agent --conf ./conf/ -f conf/flume-maste.conf -Dflume.root.logger=DEBUG,console -n agent
複製程式碼



在node上執行命令:
  1. $ bin/flume-ng agent --conf ./conf/ -f conf/flume-node.conf -Dflume.root.logger=DEBUG,console -n agent
複製程式碼



啟動之後,就可以發現兩者之間可以相互通訊,master上面的檔案就能傳送到node上,修改master上的test檔案,在後面追加內容時,node也可以接收到。

如果想要將內容寫入hadoop,可以將node中的flume-node.conf檔案做如下修改:

  1. agent.sinks = k2
  2. agent.sinks.k2.type = hdfs
  3. agent.sinks.k2.channel = memoryChannel
  4. agent.sinks.k2.hdfs.path = hdfs://master:8089/hbase
  5. agent.sinks.k2.hdfs.fileType = DataStream
  6. agent.sinks.k2.hdfs.writeFormat = Text
複製程式碼




其中,hdfs://master:8089/hbase為hadoop的hdfs檔案路徑。




二、Flume-ng將資料插入HBase-0.96.0



首先,修改node中flume資料夾下conf目錄中的flume-node.conf檔案(原配置參考上文),對其做如下修改:

agent.sinks = k1
agent.sinks.k1.type = hbase
agent.sinks.k1.table = hello
agent.sinks.k1.columnFamily = cf
agent.sinks.k1.column = col1
agent.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer
agent.sinks.k1.channel = memoryChannel


過和上文不同的是,這次要想得到成功結果就沒那麼簡單了,由於依賴的版本問題。此處需要將flume的lib資料夾下的protobuf用Hadoop-2.2.0中的2.5.0版本替換,還需要用hadoop-2.2.0中的guava替換flume的lib資料夾下的guava,刪除原來相應的jar檔案。啟動即可生效。 (注意:在最新的版本不要做這些了,有些錯誤詳情看我的另一篇文章)

flume-ng裡面的SimpleHbaseEventSerializer只提供了最簡單的資料插入hbase功能,如果還有其他需要,就得自己寫HbaseEventSerializer類,在apache-flume-1.4.0-src/flume-ng-sinks/flume-ng-hbase-sink/src/main/java中定義自己的類,實現flume中的HbaseEventSerializer介面。一個簡單的例項如下:

publicclass MyHBaseSerializer implements HbaseEventSerializer {
privatestaticfinal String[] COLUMNS = "column1,column2".split(",");
privatestaticfinal String[] PARAMS = "col1,col2".split(",");
privatebyte[] columnFamily = "cf".getBytes();
privatebyte[] content;
@Override
publicvoid configure(Context context) {
}
@Override
publicvoid configure(ComponentConfiguration conf) {
}
@Override
publicvoid initialize(Event event, byte[] columnFamily) {
this.content = event.getBody();
this.columnFamily = columnFamily;
}
@Override
public List<Row> getActions() {
String string = Bytes.toString(content);
String value1 = string.substring(0,string.length()/2);
String value2 = string.substring(string.length()/2, string.length());
Map<String,String> map = Maps.newHashMap();
map.put(PARAMS[0], value1);
map.put(PARAMS[1], value2);
List<Row> actions = new LinkedList<Row>();
String rowKey = String.valueOf(System.currentTimeMillis());
Put put = new Put(Bytes.toBytes(rowKey));
for (int i = 0; i < COLUMNS.length; i++) {
String value = map.get(PARAMS);
if (value == null)
value = "";
put.add(columnFamily, Bytes.toBytes(COLUMNS), Bytes.toBytes(value));
}
actions.add(put);
return actions;
}
@Override
public List<Increment> getIncrements() {
List<Increment> incs = new LinkedList<Increment>();
return incs;
}
@Override
publicvoid close() {
}
}
該類實現的功能是將檔案中的內容按行切分程兩部分,分別插入列名為column1和column2的兩列中,rowKey為當前時間。完成後將flume-ng程式碼重新編譯打包。然後將flume-ng目錄裡面的lib資料夾的相應的jar檔案替換。然後將上文中的agent.sinks.k1.serializer 值改為test..MyHBaseSerializer即可。其中test為包名。