1. 程式人生 > >flume向hdfs中寫入會生成很多小檔案

flume向hdfs中寫入會生成很多小檔案

問題:

flume監控的目錄寫入大檔案的時候不能同步記錄在hdfs中
flume監控的目錄寫入大檔案的時候,同步記錄到hdfs中後變成多個小檔案

flume從kafka中讀取資料下沉到hdfs中會生成很多小檔案
解決辦法:更改flume的配置資訊(主要更改滾動方式),滾動的意思是當flume監控的目錄達到了配置資訊中的某一條滾動方式的時候,會觸發flume提交一個檔案到hdfs中(即在hdfs中生成一個檔案)
flume有三種滾動方式。
1.按照時間
2.按照大小
3.按照count.
如果時間不合適。可以按照大小來滾動,比如70M
rollsize=70M ,當然這裡記得換算單位70*2024

下面的示例是設定rollInterval=60秒的時候滾動一次,60秒同步一次資料到hdfs中,也就是說在60秒之內kafka裡面插入的新資料都會被放在當前這個hdfs檔案,60秒過後將滾動生成一個新的hdfs檔案。

##kafka-source

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 192.168.12.103:6667,192.168.12.102:6667,192.168.12.104:6667
a1.sources.r1.kafka.consumer.group.id = lyl-gid1
a1.sources.r1.kafka.topics = flume_test

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /test/flume/test/
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
# hdfs.rollInterval屬性設定開啟時間,使時間足夠長,獲取足夠多的內容
a1.sinks.k1.hdfs.rollInterval = 60
# hdfs.rollSize屬性設定檔案大小,當檔案達到一定的大小的時候才傳輸(預設1024個位元組)
a1.sinks.k1.hdfs.rollSize = 0
#hdfs.rollCount屬性設定接受的事件數目,當檔案寫滿了給定數量的事件之後才傳輸。
a1.sinks.k1.hdfs.rollCount = 0


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

接下來,再來看看通過設定rollSize大小來控制滾動,如我這裡設定為5kb就滾動

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /test/flume/test/
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
# hdfs.rollInterval屬性設定開啟時間,使時間足夠長,獲取足夠多的內容
a1.sinks.k1.hdfs.rollInterval = 0
# hdfs.rollSize屬性設定檔案大小,當檔案達到一定的大小的時候才傳輸(預設1024個位元組)
a1.sinks.k1.hdfs.rollSize = 5120
#hdfs.rollCount屬性設定接受的事件數目,當檔案寫滿了給定數量的事件之後才傳輸。
a1.sinks.k1.hdfs.rollCount = 0

注意sink.type,如果是memory模式,注意檔案的大小,防止記憶體不足,太大可以設定sink.type = file