基於flume的日誌收集系統配置
大資料系統中通常需要採集的日誌有:
- 系統訪問日誌
- 使用者點選日誌
- 其他業務日誌(比如推薦系統的點選日誌)
在收集日誌的時候,一般分為三層結構:採集層、彙總層和儲存層,而不是直接從採集端將資料傳送到儲存端,這樣的好處有:
- 如果儲存端如Hadoop叢集、Kafka等需要停機維護或升級,對部署在應用伺服器上的採集端沒有影響,只需要彙總層做好資料的緩衝,在儲存端恢復正常後繼續寫入資料。
- 採集層只負責資料的採集,由彙總層統一維護資料的路由邏輯(比如傳送到hdfs還是kafka?),由於採集端所在的應用伺服器一般數量較多,且會隨著業務的擴充套件而不斷增加,這種方式可以降低日誌採集配置的維護成本,降低大資料應用對業務系統的影響
基於三層結構的flume日誌採集系統架構一般如下圖所示:
說明:
對於採集層agent,一般要求儘快將日誌傳送出去,避免在採集層堆積資料,所以使用memory的channel,sink統一使用avro;對於彙總層agent,要求可以儘量保證資料的緩衝,所以使用file channel,並且儘量調大容量,對於要求實時處理的資料,可以使用SSD的磁碟以提高處理速度,source統一使用avro。
各agent的配置如下:
【Agent-1】:位於採集層,用於收集應用A產生的日誌,這些日誌需要儲存到HDFS中用於離線分析,同時也需要傳送給Kafka用於實時的計算(如使用者點選日誌)。
a1.sources = src_1
a1.channels = ch_m_1
a1.sinks = sink_1 sink_2
# 配置 source:從指定目錄讀取日誌資料
a1.sources.src_1.type = spooldir
a1.sources.src_1.channels = ch_m_1
a1.sources.src_1.spoolDir = /data/nginx/log/user_click/
a1.sources.src_1.includePattern=^.*$
# 日誌資料一般是按照size或者時間切換,應用正在寫入的檔案不能讀取,否則flume會報錯,所以需要把這個檔案排除掉。比如正在寫入的是user.click.log,切換後的是user.click.log.yyyy.mm.dd.hh,則需要把user.click.log檔案排除掉:
a1.sources.src_1.ignorePattern=^.*log$
# 配置 channel
a1.channels.ch_m_1.type = memory
# channel中可以快取的event數量的最大值。可以根據單個event所佔空間和可用的記憶體來評估可以快取的event的最大數量
a1.channels.ch_m_1.capacity = 100000
# 一個事務中可批量接收或傳送的event數量的最大值
a1.channels.ch_m_1.transactionCapacity = 5000
# 配置 sinks:多個sink節點,用於負載均衡
a1.sinks.sink_1.channel = ch_m_1
a1.sinks.sink_1.type = avro
# 彙總層節點地址
a1.sinks.sink_1.hostname = 192.168.1.110
# 彙總層節點avro監聽埠
a1.sinks.sink_1.port = 44446
a1.sinks.sink_2.channel = ch_m_1
a1.sinks.sink_2.type = avro
# 彙總層節點地址
a1.sinks.sink_2.hostname = 192.168.1.111
# 彙總層節點avro監聽埠
a1.sinks.sink_2.port = 44446
# sink端的負載均衡配置
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = sink_1 sink_2
a1.sinkgroups.g1.processor.type = load_balance
# 使用輪詢的方式選擇sink
a1.sinkgroups.g1.processor.selector = round_robin
【Agent-2】:位於採集層,用於收集應用B產生的日誌,這些日誌只需要傳送給Kafka用於實時的計算。
a2.sources = src_1
a2.channels = ch_m_1
a2.sinks = sink_1 sink_2
# 配置 source:從指定目錄讀取日誌資料
a2.sources.src_1.type = spooldir
a2.sources.src_1.channels = ch_m_1
a2.sources.src_1.type = exec
# 使用tail -f 命令監聽日誌的修改,可以近實時的採集到需要的日誌資料
a2.sources.src_1.command = tail -F /data/nginx/log/user_action/action.log
# 配置 channel
a2.channels.ch_m_1.type = memory
# channel中可以快取的event數量的最大值。可以根據單個event所佔空間和可用的記憶體來評估可以快取的event的最大數量
a2.channels.ch_m_1.capacity = 100000
# 一個事務中可批量接收或傳送的event數量的最大值
a2.channels.ch_m_1.transactionCapacity = 5000
# 配置 sinks:多個sink節點,用於負載均衡
a2.sinks.sink_1.channel = ch_m_1
a2.sinks.sink_1.type = avro
# 彙總層節點地址
a2.sinks.sink_1.hostname = 192.168.1.110
# 彙總層節點avro監聽埠,由於處理邏輯不一樣,所以此處彙總層的avro埠與【Agent-1】不同
a2.sinks.sink_1.port = 44447
a2.sinks.sink_2.channel = ch_m_1
a2.sinks.sink_2.type = avro
# 彙總層節點地址
a2.sinks.sink_2.hostname = 192.168.1.111
# 彙總層節點avro監聽埠
a2.sinks.sink_2.port = 44447
# sink端的負載均衡配置
a2.sinkgroups = g1
a2.sinkgroups.g1.sinks = sink_1 sink_2
a2.sinkgroups.g1.processor.type = load_balance
# 使用輪詢的方式選擇sink
a2.sinkgroups.g1.processor.selector = round_robin
【Agent-3】:位於彙總層,用於處理【Agent-1】傳送過來的資料,由於需要同時將資料傳送給HDFS和Kafka,所以配置了兩個channel。
a3.sources = src_1
a3.channels = ch_f_1 ch_f_2
a3.sinks = sink_1 sink_2
# 配置 source: 用於處理【agent-1】發過來的資料,將資料分別傳送到hdfs和kafka,所以需要配置兩個channel
a3.sources.src_1.channels = ch_f_1 ch_f_2
a3.sources.src_1.type = avro
a3.sources.src_1.bind = 192.168.1.110
a3.sources.src_1.port = 44446
# 調整處理的執行緒數,提高處理能力
a3.sources.src_1.threads = 8
# 配置channel
# ch_f_1:對應hdfs sink
a3.channels.ch_f_1.type = file
# 檔案檢查點儲存路徑。File Channel在接收到source的資料後馬上寫入磁碟檔案中,然後通過一個記憶體佇列來儲存已被Source寫入但還未被Sink消費的Event的指標(Event指標指的是Event在磁碟上的資料檔案中的存放位置)。檢查點指的是channel每隔一段時間(checkpointInterval)將記憶體佇列的“快照”持久化到磁碟檔案以避免agent重啟後記憶體佇列資訊的丟失。為了保證“快照”的完整性,在將記憶體佇列持久到磁碟檔案時需要鎖定channel,就是說此過程Source不能寫Channel,Sink也不能讀Channel。
a3.channels.ch_f_1.checkpointDir = /data/flume/channels/user_click_event/checkpoint
# 是否需要備份檢查點。為保證資料的可靠性,一般設定為true
a3.channels.ch_f_1.useDualCheckpoints = true
# 備份檢查點儲存路徑,與checkpointDir使用不一樣的路徑
a3.channels.ch_f_1.backupCheckpointDir = /data/flume/channels/user_click_event/backup
# 檢查點執行間隔,以分鐘為單位
a3.channels.ch_f_1.checkpointInterval = 60000
# 資料儲存路徑
a3.channels.ch_f_1.dataDirs = /data/flume/channels/user_click_event/data
# 未提交的事務數量的最大值,通過調節此數字可以增加channel的吞吐量
a3.channels.ch_f_1.transactionCapacity = 100000
# 可以快取的event數量的最大值
a3.channels.ch_f_1.capacity = 500000
# 一個存放操作的等待時間值,以秒為單位。預設為3秒,在資料量較大的情況下可以適當調大
a3.channels.ch_f_1.keep-alive = 5
# 單個檔案的最大size,以byte為單位,預設為2G。file channel會在單個檔案達到最大值後新建立一個檔案來儲存資料,size過小會導致在資料量大的情況下頻繁的建立檔案,size過大會則會降低檔案讀寫的效率
a3.channels.ch_f_1.maxFileSize = 5368709120
# ch_f_2:對應kafka sink,使用的也是file channel,配置與上面類似,需要注意的是使用不同的儲存路徑
a3.channels.ch_f_2.type = file
a3.channels.ch_f_2.checkpointDir = /data2/flume/channels/user_action_event/checkpoint
a3.channels.ch_f_2.useDualCheckpoints = true
a3.channels.ch_f_2.backupCheckpointDir = /data2/flume/channels/user_action_event/backup
a3.channels.ch_f_2.dataDirs = /data2/flume/channels/user_action_event/data
a3.channels.ch_f_2.transactionCapacity = 100000
a3.channels.ch_f_2.capacity = 500000
a3.channels.ch_f_2.checkpointInterval = 60000
a3.channels.ch_f_2.keep-alive = 5
a3.channels.ch_f_2.maxFileSize = 536870912
# 配置sink
# sink_1:將資料寫入hdfs
a3.sinks.sink_1.channel = ch_1
a3.sinks.sink_1.type = hdfs
a3.sinks.sink_1.hdfs.path = hdfs://master:8020/user/dw/flume/clicklog/%Y-%m-%d
a3.sinks.sink_1.hdfs.filePrefix = logs
a3.sinks.sink_1.hdfs.inUsePrefix = .
# 跟日誌寫入一樣,hdfs的sink一開始會建立一個檔案用於寫入,正在寫入的檔案不能被讀取,為確保資料可以被讀取處理,sink會按照指定的條件對切分檔案,當條件滿足時,將正在寫入的檔案切換成可以讀取的檔案,然後建立另外一個檔案用於寫入。有三種切換方式:按照時間間隔;按照檔案大小;按照寫入的event的數量。使用中可以根據對資料處理的時效性要求進行設定,比如希望更快的讀取到資料則按照時間進行切分,如果對時效性要求不高則可以按照檔案大小切分
a3.sinks.sink_1.hdfs.rollInterval = 30
a3.sinks.sink_1.hdfs.rollSize = 0
a3.sinks.sink_1.hdfs.rollCount = 0
a3.sinks.sink_1.hdfs.batchSize = 1000
# 預設寫入hdfs使用的是sequencefile的檔案型別,使用這種格式需要將寫入格式設定為Text,否則hive或者impala不能正常讀取:
a3.sinks.sink_1.hdfs.writeFormat = Text
# sink_2:將資料寫入kafka
a3.sinks.sink_2.type = org.apache.flume.sink.kafka.KafkaSink
a3.sinks.sink_2.channel = ch_f_1
# 指定kafka的topic:即訊息的主題,以便kafka的消費端進行區分
a3.sinks.sink_2.kafka.topic = topic1
# kafka伺服器地址
a3.sinks.sink_2.kafka.bootstrap.servers = kafka_server01:9092;kafka_server02:9092
# 一個批次中處理的event數量,預設為100。增加此數字可以提高吞吐量,但會降低資訊處理的時效性,對於實時性要求高的場景,建議調低此配置
a3.sinks.sink_2.kafka.flumeBatchSize = 50
# kafka快取時間:在將資料寫入kafka時,等待多久以實現批量的匯入。預設是直接寫入,增大此配置可以增加吞吐量,但是會降低資料同步的時效性
a3.sinks.sink_2.kafka.producer.linger.ms = 1
# 訊息壓縮格式
a3.sinks.sink_2.kafka.producer.compression.type = snappy
【Agent-4】:位於彙總層,用於處理【Agent-2】傳送過來的資料
a4.sources = src_1
a4.channels = ch_f_1
a4.sinks = sink_1
# 配置 source: 用於處理【agent-2】發過來的資料,將資料分別傳送到kafka
a4.sources.src_1.channels = ch_f_1
a4.sources.src_1.type = avro
a4.sources.src_1.bind = 192.168.1.110
a4.sources.src_1.port = 44447
a4.sources.src_1.threads = 8
# 配置channel
a4.channels.ch_f_1.type = file
a4.channels.ch_f_1.checkpointDir = /data/flume/channels/realtime_event/checkpoint
a4.channels.ch_f_1.useDualCheckpoints = true
a4.channels.ch_f_1.backupCheckpointDir = /data/flume/channels/realtime_event/backup
a4.channels.ch_f_1.dataDirs = /data/flume/channels/realtime_event/data
a4.channels.ch_f_1.transactionCapacity = 100000
a4.channels.ch_f_1.capacity = 500000
a4.channels.ch_f_1.checkpointInterval = 60000
a4.channels.ch_f_1.keep-alive = 5
a4.channels.ch_f_1.maxFileSize = 5368709120
# 配置sink
a4.sinks.sink_2.type = org.apache.flume.sink.kafka.KafkaSink
a4.sinks.sink_2.channel = ch_f_1
a4.sinks.sink_2.kafka.topic = topic2
a4.sinks.sink_2.kafka.bootstrap.servers = kafka_server01:9092;kafka_server02:9092
a4.sinks.sink_2.kafka.flumeBatchSize = 20
a4.sinks.sink_2.kafka.producer.linger.ms = 1
a4.sinks.sink_2.kafka.producer.compression.type = snappy