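Log Collection Framework Flume: Using the Source Component
A source is, as the name suggests, where the data comes from.
Purpose: a source collects data and passes it on to a channel.
For example, a source can monitor a file, a port, or a directory for new data or new files, and then pass what it finds to the channel.
The most commonly used source types are listed below: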
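Source type | Description |
---|---|
Avro Source | Receives events over the Avro protocol; built in |
Thrift Source | Receives events over Thrift RPC; built in |
Exec Source | Runs a Unix command and collects data from its standard output, e.g. tail -F |
JMS Source | Reads from a JMS system such as ActiveMQ |
Taildir Source | Monitors a directory or files (supported in Flume 1.8) |
Spooling Directory Source | Monitors a directory for newly added files |
Kafka Source | Reads data from Kafka |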
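Without further ado, here are some hands-on examples. If Flume is not installed yet, a quick search turns up plenty of installation tutorials. This article is based on Flume 1.8.
Exec Source: as mentioned above, the exec source watches a file for changes by running tail -F.
flume-exec.conf configuration: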
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#
# Describe/configure the source
# Use the exec source type
a1.sources.r1.type = exec
# Path of the log file to monitor; change it to your own
a1.sources.r1.command = tail -F /usr/local/installed/tomcat/logs/system_app.log
#
# Describe the sink
# The logger sink prints events to the Flume log
a1.sinks.k1.type = logger
#
# Use a channel which buffers events in memory
# The memory channel holds the data passed down from the upstream source
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Steps:
1. Start the agent
Change into the bin directory under the Flume installation directory and run:
./flume-ng agent -n a1 -c ../conf/ -f ../conf/flume-exec.conf
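Drawback: if the agent dies, the position of the last data transferred is not recorded. After a restart, the source simply sends whatever tail -F outputs, so data can be duplicated or missed.
Taildir Source monitors files for changes and records the position reached after each sync. This lets it resume from where it stopped, so data that has already been read is not read again after a restart.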
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#
# Describe/configure the source
a1.sources.r1.type = TAILDIR
# File in which the read position of each monitored file is saved
a1.sources.r1.positionFile = /opt/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /usr/local/installed/tomcat/logs/system_app.log
a1.sources.r1.batchSize = 100
a1.sources.r1.backoffSleepIncrement = 1000
a1.sources.r1.maxBackoffSleep = 5000
#
# Describe the sink
a1.sinks.k1.type = logger
#
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
To test, write some data to the monitored file and check whether Flume picks it up.
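The test described above can be sketched as a small shell helper. This is only an illustration: the function name append_test_lines and the text of the test lines are made up for this example, and the path in the usage comment is the one from the Taildir configuration above.

```shell
#!/bin/sh
# Append <count> timestamped test lines to a log file so that the
# source monitoring it has new data to pick up.
# Usage: append_test_lines <file> [count]   (count defaults to 3)
append_test_lines() {
    file="$1"
    count="${2:-3}"
    i=1
    while [ "$i" -le "$count" ]; do
        # >> appends, so existing log content is left untouched
        printf '%s flume test line %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$i" >> "$file"
        i=$((i + 1))
    done
}

# Example (hypothetical call, using the path from the configuration above):
# append_test_lines /usr/local/installed/tomcat/logs/system_app.log 5
```

If the source is working, the appended lines should show up in the Flume log, and the offset stored in /opt/flume/taildir_position.json should grow after each batch.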
Spooling Directory Source monitors a directory for new files.
flume-spooling.conf configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/self
# With deletePolicy = immediate, files are deleted once they have been read,
# so the fileSuffix below only matters when deletePolicy is never
a1.sources.r1.deletePolicy = immediate
a1.sources.r1.fileSuffix = completed
a1.sources.r1.batchSize = 100
#
# Describe the sink
a1.sinks.k1.type = logger
#
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Note: only newly added files are picked up. Subdirectories are not monitored, and changes to files that already exist in the directory are not picked up either.
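Because only new files are picked up and later changes to them are ignored, it is safest to finish writing a file somewhere else and then move it into the spooling directory in one step. The helper below is a minimal sketch of that idea. The function name spool_file and the timestamp suffix are assumptions made for this example, not something Flume requires.

```shell
#!/bin/sh
# Move a fully written file from a staging location into the spooling
# directory. mv is a single rename when both paths are on the same
# filesystem, so the source never sees a half-written file.
# Usage: spool_file <finished-file> <spool-dir>
spool_file() {
    src="$1"
    spool_dir="$2"
    # Append a timestamp so the same file name is unlikely to be reused
    dest="$spool_dir/$(basename "$src").$(date +%s)"
    # Print the final path only if the move succeeded
    mv "$src" "$dest" && printf '%s\n' "$dest"
}

# Example (hypothetical staging path, spool directory from the configuration above):
# spool_file /tmp/staging/app.log /usr/local/self
```

Keeping the staging directory on the same filesystem as the spooling directory matters here; across filesystems mv falls back to copy and delete, and the file could be seen before it is complete.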
Avro Source configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 44444
#
# Describe the sink
a1.sinks.k1.type = logger
#
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
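Note: this configuration listens on 127.0.0.1, port 44444, for incoming Avro events. The source expects the Avro RPC protocol, so plain text typed into a telnet session will not arrive as events (telnet is the right tool for the NetCat source instead). To test, send data with an Avro client, such as the avro-client mode of flume-ng, and check whether Flume receives it.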
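One way to send test data to the Avro source is the avro-client mode that ships with flume-ng, which sends each line of a file as one event. The wrapper below is a sketch under a few assumptions: the function name send_via_avro and the FLUME_NG variable are invented for this example, and the default of ./flume-ng assumes the command is run from Flume's bin directory.

```shell
#!/bin/sh
# Path to the flume-ng launcher; override FLUME_NG if it lives elsewhere.
FLUME_NG="${FLUME_NG:-./flume-ng}"

# Send every line of a file to an Avro source as a separate event.
# Usage: send_via_avro <file> [host] [port]
# Host and port default to the values in the configuration above.
send_via_avro() {
    file="$1"
    host="${2:-127.0.0.1}"
    port="${3:-44444}"
    "$FLUME_NG" avro-client -H "$host" -p "$port" -F "$file"
}

# Example (hypothetical input file):
# send_via_avro /tmp/avro-test.txt
```

With the agent running, each line of the file should then appear as an event in the Flume log.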
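The other source types can be tested in the same way.
./flume-ng agent -n a1 -c ../conf/ -f ../conf/flume-exec.conf
Because the agent in this article is started from the bin directory and the location of Flume's own logs has not been changed, the Flume logs are written to bin/logs/ under the installation directory.
When testing, open a second terminal window and watch the log. Since the examples here use the logger sink, running tail -f ./logs/flume.log from the bin directory shows whether the configuration works.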
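As an alternative to tailing the log file, the logger output can be sent straight to the terminal by overriding the root logger when the agent starts. The sketch below wraps the start command from this article. The function name start_agent_console and the FLUME_NG variable are assumptions for this example, and the -D option relies on the log4j setup that Flume 1.8 ships with.

```shell
#!/bin/sh
# Path to the flume-ng launcher; override FLUME_NG if it lives elsewhere.
FLUME_NG="${FLUME_NG:-./flume-ng}"

# Start agent a1 with the given configuration file and print log output,
# including events from the logger sink, to the console.
# Usage: start_agent_console <conf-file>
start_agent_console() {
    conf_file="$1"
    "$FLUME_NG" agent -n a1 -c ../conf/ -f "$conf_file" \
        -Dflume.root.logger=INFO,console
}

# Example (run from the bin directory, as in the rest of this article):
# start_agent_console ../conf/flume-exec.conf
```

This is convenient for quick experiments, since a single window shows both the agent and the events it receives.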
Links:
Introduction to Flume concepts and its model:
Log Collection Framework Flume: Introduction
http://blog.51cto.com/shangdc/2178127
Log Collection Framework Flume: Using the Source Component