1. 程式人生 > >大型協作框架flume中avro型別的應用場景

大型協作框架flume中avro型別的應用場景

source的型別:

 1、avro source

   偵聽Avro埠並從外部Avro客戶端流接收事件。 當與另一個(上一跳)Flume代理上的內建Avro Sink配對時,它可以建立分層集合拓撲。 

 2、thrif source

        監聽Thrift埠並從外部Thrift客戶端流接收事件。 當與另一(前一跳)Flume代理上的內建ThriftSink配對時,它可以建立分層集合拓撲。 Thrift源可以配置為通過啟用kerberos身份驗證在安全模式下啟動。 agent-principal和agent-keytab是Thrift源用來向kerberos KDC進行身份驗證的屬性。

 3、exec  source

       Exec源在啟動時執行給定的Unix命令,並期望該程序在標準輸出上連續產生資料(除非屬性logStdErr設定為true,否則stderr將被丟棄)。 如果程序由於任何原因退出,源也會退出,並且不會產生進一步的資料。 這意味著諸如cat [named pipe]或tail -F [file]之類的配置將產生期望的結果,其中日期可能不會 - 前兩個命令產生資料流,其中後者產生單個事件並退出。

  4、spooling directory source

       此源允許您通過將要提取的檔案放入磁碟上的“spooling”目錄中來提取資料。此源將監視新檔案的指定目錄,並在新檔案顯示時解析新檔案中的事件。事件解析邏輯是可插入的。在給定檔案被完全讀入通道之後,它被重新命名以指示完成(或可選地被刪除)。
與Exec源不同,該源是可靠的,並且不會錯過資料,即使Flume被重新啟動或被殺死。為了換取這種可靠性,只有不可變,唯一命名的檔案必須放入spooling目錄中。 Flume嘗試檢測這些問題條件,如果違反則會大聲失敗:
如果在放入spooling目錄後寫入檔案,Flume將在其日誌檔案中列印一個錯誤並停止處理。
如果以後重新使用檔名,Flume會在其日誌檔案中列印一個錯誤並停止處理。

  5、kafka source

       Kafka Source是一個從Kafka主題讀取訊息的Apache Kafka消費者。 如果您有多個Kafka源執行,您可以使用相同的Consumer Group配置它們,這樣每個都將為主題讀取一組唯一的分割槽。

下面是筆者在實際應用中用到的一種avro型別resource:

需要配置兩個配置檔案:

1.

2.

----------------------------------- ------------------------ ## define agent source、 channel、 sinks、name a1.sources = s1
a1.channels = c1
a1.sinks = k1
## define sources
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /usr/local/nginx/datalog/access.log
a1.sources.s1.shell = /bin/bash -c

## define channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

## define sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.134.101
a1.sinks.k1.port = 4545

## 關聯 sources和 sinks
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
-------------------------------- ------------------------------------ ## define agent source、 channel、 sinks、name
a2.sources = s2
a2.channels = c2
a2.sinks = k2

## define sources
a2.sources.s2.type = avro
a2.sources.s2.bind = 192.168.134.101
a2.sources.s2.port = 4545

## define channels
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

## define sinks
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.134.101:8020/flume/%Y%m%d
a2.sinks.k2.hdfs.filePrefix = access_log

## 設定目錄按照年/月/日進行回滾
a2.sinks.k2.hdfs.round = true
a2.sinks.k2.hdfs.roundValue = 1
a2.sinks.k2.hdfs.roundUnit = day

## 解決生產很多小檔案,設定檔案回滾的條件
a2.sinks.k2.hdfs.rollInterval = 0
a2.sinks.k2.hdfs.rollSize = 134217728
a2.sinks.k2.hdfs.rollCount = 0

## 這個引數必須設定,不然上面的回滾設定不生效
a2.sinks.k2.hdfs.minBlockReplicas = 1

a2.sinks.k2.hdfs.batchSize = 1000
a2.sinks.k2.hdfs.fileType = DataStream
a2.sinks.k2.hdfs.writeFormat = Text
a2.sinks.k2.hdfs.useLocalTimeStamp = true 

## 關聯 sources和 sinks
a2.sources.s2.channels = c2
a2.sinks.k2.channel = c2
--------------------------------------------------------------------------- 兩個文件分別是flume執行時所需的配置檔案,文件一收集的日誌放到4545埠(該埠為自定義埠),做為文件2的source,文件2收集的資料在存放在hdfs上面。 應用場景: 一共五臺伺服器,每臺機子上面都有flumenginx;這五臺伺服器的nginx伺服器產生的日誌都回桶過文件一的配置收集在一起,第二個配置在這五臺中的任意一臺啟動application,負責收集4545埠的資料(既五臺agent收集到一起的資料),作為source,最後存放在hdfs上面。