1. 程式人生 > >基於flume的日誌收集系統配置

基於flume的日誌收集系統配置

大資料系統中通常需要採集的日誌有:

  1. 系統訪問日誌
  2. 使用者點選日誌
  3. 其他業務日誌(比如推薦系統的點選日誌)

在收集日誌的時候,一般分為三層結構:採集層、彙總層和儲存層,而不是直接從採集端將資料傳送到儲存端,這樣的好處有:

  1. 如果儲存端如Hadoop叢集、Kafka等需要停機維護或升級,對部署在應用伺服器上的採集端沒有影響,只需要彙總層做好資料的緩衝,在儲存端恢復正常後繼續寫入資料。
  2. 採集層只負責資料的採集,由彙總層統一維護資料的路由邏輯(比如傳送到hdfs還是kafka?),由於採集端所在的應用伺服器一般數量較多,且會隨著業務的擴充套件而不斷增加,這種方式可以降低日誌採集配置的維護成本,降低大資料應用對業務系統的影響

基於三層結構的flume日誌採集系統架構一般如下圖所示:

說明:

對於採集層agent,一般要求儘快將日誌傳送出去,避免在採集層堆積資料,所以使用memory的channel,sink統一使用avro;對於彙總層agent,要求可以儘量保證資料的緩衝,所以使用file channel,並且儘量調大容量,對於要求實時處理的資料,可以使用SSD的磁碟以提高處理速度,source統一使用avro。

各agent的配置如下:

【Agent-1】:位於採集層,用於收集應用A產生的日誌,這些日誌需要儲存到HDFS中用於離線分析,同時也需要傳送給Kafka用於實時的計算(如使用者點選日誌)。

a1.sources = src_1

a1.channels = ch_m_1

a1.sinks = sink_1 sink_2

# 配置 source:從指定目錄讀取日誌資料

a1.sources.src_1.type = spooldir

a1.sources.src_1.channels = ch_m_1

a1.sources.src_1.spoolDir = /data/nginx/log/user_click/

a1.sources.src_1.includePattern=^.*$

# 日誌資料一般是按照size或者時間切換,應用正在寫入的檔案不能讀取,否則flume會報錯,所以需要把這個檔案排除掉。比如正在寫入的是user.click.log,切換後的是user.click.log.yyyy.mm.dd.hh,則需要把user.click.log檔案排除掉:

a1.sources.src_1.ignorePattern=^.*log$

# 配置 channel

a1.channels.ch_m_1.type = memory

# channel中可以快取的event數量的最大值。可以根據單個event所佔空間和可用的記憶體來評估可以快取的event的最大數量

a1.channels.ch_m_1.capacity = 100000

# 一個事務中可批量接收或傳送的event數量的最大值

a1.channels.ch_m_1.transactionCapacity = 5000

# 配置 sinks:多個sink節點,用於負載均衡

a1.sinks.sink_1.channel = ch_m_1

a1.sinks.sink_1.type = avro

# 彙總層節點地址

a1.sinks.sink_1.hostname = 192.168.1.110

# 彙總層節點avro監聽埠

a1.sinks.sink_1.port = 44446

a1.sinks.sink_2.channel = ch_m_1

a1.sinks.sink_2.type = avro

# 彙總層節點地址

a1.sinks.sink_2.hostname = 192.168.1.111

# 彙總層節點avro監聽埠

a1.sinks.sink_2.port = 44446

# sink端的負載均衡配置

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = sink_1 sink_2

a1.sinkgroups.g1.processor.type = load_balance

# 使用輪詢的方式選擇sink

a1.sinkgroups.g1.processor.selector = round_robin

【Agent-2】:位於採集層,用於收集應用B產生的日誌,這些日誌只需要傳送給Kafka用於實時的計算。

a2.sources = src_1

a2.channels = ch_m_1

a2.sinks = sink_1 sink_2

# 配置 source:從指定目錄讀取日誌資料

a2.sources.src_1.type = spooldir

a2.sources.src_1.channels = ch_m_1

a2.sources.src_1.type = exec

# 使用tail -f 命令監聽日誌的修改,可以近實時的採集到需要的日誌資料

a2.sources.src_1.command = tail -F /data/nginx/log/user_action/action.log

# 配置 channel

a2.channels.ch_m_1.type = memory

# channel中可以快取的event數量的最大值。可以根據單個event所佔空間和可用的記憶體來評估可以快取的event的最大數量

a2.channels.ch_m_1.capacity = 100000

# 一個事務中可批量接收或傳送的event數量的最大值

a2.channels.ch_m_1.transactionCapacity = 5000

# 配置 sinks:多個sink節點,用於負載均衡

a2.sinks.sink_1.channel = ch_m_1

a2.sinks.sink_1.type = avro

# 彙總層節點地址

a2.sinks.sink_1.hostname = 192.168.1.110

# 彙總層節點avro監聽埠,由於處理邏輯不一樣,所以此處彙總層的avro埠與【Agent-1】不同

a2.sinks.sink_1.port = 44447

a2.sinks.sink_2.channel = ch_m_1

a2.sinks.sink_2.type = avro

# 彙總層節點地址

a2.sinks.sink_2.hostname = 192.168.1.111

# 彙總層節點avro監聽埠

a2.sinks.sink_2.port = 44447

# sink端的負載均衡配置

a2.sinkgroups = g1

a2.sinkgroups.g1.sinks = sink_1 sink_2

a2.sinkgroups.g1.processor.type = load_balance

# 使用輪詢的方式選擇sink

a2.sinkgroups.g1.processor.selector = round_robin

【Agent-3】:位於彙總層,用於處理【Agent-1】傳送過來的資料,由於需要同時將資料傳送給HDFS和Kafka,所以配置了兩個channel。

a3.sources = src_1

a3.channels = ch_f_1 ch_f_2

a3.sinks = sink_1 sink_2

# 配置 source: 用於處理【agent-1】發過來的資料,將資料分別傳送到hdfs和kafka,所以需要配置兩個channel

a3.sources.src_1.channels = ch_f_1 ch_f_2

a3.sources.src_1.type = avro

a3.sources.src_1.bind = 192.168.1.110

a3.sources.src_1.port = 44446

# 調整處理的執行緒數,提高處理能力

a3.sources.src_1.threads = 8

# 配置channel

# ch_f_1:對應hdfs sink

a3.channels.ch_f_1.type = file

# 檔案檢查點儲存路徑。File Channel在接收到source的資料後馬上寫入磁碟檔案中,然後通過一個記憶體佇列來儲存已被Source寫入但還未被Sink消費的Event的指標(Event指標指的是Event在磁碟上的資料檔案中的存放位置)。檢查點指的是channel每隔一段時間(checkpointInterval)將記憶體佇列的“快照”持久化到磁碟檔案以避免agent重啟後記憶體佇列資訊的丟失。為了保證“快照”的完整性,在將記憶體佇列持久到磁碟檔案時需要鎖定channel,就是說此過程Source不能寫Channel,Sink也不能讀Channel。

a3.channels.ch_f_1.checkpointDir = /data/flume/channels/user_click_event/checkpoint

# 是否需要備份檢查點。為保證資料的可靠性,一般設定為true

a3.channels.ch_f_1.useDualCheckpoints = true

# 備份檢查點儲存路徑,與checkpointDir使用不一樣的路徑

a3.channels.ch_f_1.backupCheckpointDir = /data/flume/channels/user_click_event/backup

# 檢查點執行間隔,以分鐘為單位

a3.channels.ch_f_1.checkpointInterval = 60000

# 資料儲存路徑

a3.channels.ch_f_1.dataDirs = /data/flume/channels/user_click_event/data

# 未提交的事務數量的最大值,通過調節此數字可以增加channel的吞吐量

a3.channels.ch_f_1.transactionCapacity = 100000

# 可以快取的event數量的最大值

a3.channels.ch_f_1.capacity = 500000

# 一個存放操作的等待時間值,以秒為單位。預設為3秒,在資料量較大的情況下可以適當調大

a3.channels.ch_f_1.keep-alive = 5

# 單個檔案的最大size,以byte為單位,預設為2G。file channel會在單個檔案達到最大值後新建立一個檔案來儲存資料,size過小會導致在資料量大的情況下頻繁的建立檔案,size過大會則會降低檔案讀寫的效率

a3.channels.ch_f_1.maxFileSize = 5368709120

# ch_f_2:對應kafka sink,使用的也是file channel,配置與上面類似,需要注意的是使用不同的儲存路徑

a3.channels.ch_f_2.type = file

a3.channels.ch_f_2.checkpointDir = /data2/flume/channels/user_action_event/checkpoint

a3.channels.ch_f_2.useDualCheckpoints = true

a3.channels.ch_f_2.backupCheckpointDir = /data2/flume/channels/user_action_event/backup

a3.channels.ch_f_2.dataDirs = /data2/flume/channels/user_action_event/data

a3.channels.ch_f_2.transactionCapacity = 100000

a3.channels.ch_f_2.capacity = 500000

a3.channels.ch_f_2.checkpointInterval = 60000

a3.channels.ch_f_2.keep-alive = 5

a3.channels.ch_f_2.maxFileSize = 536870912

# 配置sink

# sink_1:將資料寫入hdfs

a3.sinks.sink_1.channel = ch_1

a3.sinks.sink_1.type = hdfs

a3.sinks.sink_1.hdfs.path = hdfs://master:8020/user/dw/flume/clicklog/%Y-%m-%d

a3.sinks.sink_1.hdfs.filePrefix = logs

a3.sinks.sink_1.hdfs.inUsePrefix = .

# 跟日誌寫入一樣,hdfs的sink一開始會建立一個檔案用於寫入,正在寫入的檔案不能被讀取,為確保資料可以被讀取處理,sink會按照指定的條件對切分檔案,當條件滿足時,將正在寫入的檔案切換成可以讀取的檔案,然後建立另外一個檔案用於寫入。有三種切換方式:按照時間間隔;按照檔案大小;按照寫入的event的數量。使用中可以根據對資料處理的時效性要求進行設定,比如希望更快的讀取到資料則按照時間進行切分,如果對時效性要求不高則可以按照檔案大小切分

a3.sinks.sink_1.hdfs.rollInterval = 30

a3.sinks.sink_1.hdfs.rollSize = 0

a3.sinks.sink_1.hdfs.rollCount = 0

a3.sinks.sink_1.hdfs.batchSize = 1000

# 預設寫入hdfs使用的是sequencefile的檔案型別,使用這種格式需要將寫入格式設定為Text,否則hive或者impala不能正常讀取:

a3.sinks.sink_1.hdfs.writeFormat = Text

# sink_2:將資料寫入kafka

a3.sinks.sink_2.type = org.apache.flume.sink.kafka.KafkaSink

a3.sinks.sink_2.channel = ch_f_1

# 指定kafka的topic:即訊息的主題,以便kafka的消費端進行區分

a3.sinks.sink_2.kafka.topic = topic1

# kafka伺服器地址

a3.sinks.sink_2.kafka.bootstrap.servers = kafka_server01:9092;kafka_server02:9092

# 一個批次中處理的event數量,預設為100。增加此數字可以提高吞吐量,但會降低資訊處理的時效性,對於實時性要求高的場景,建議調低此配置

a3.sinks.sink_2.kafka.flumeBatchSize = 50

# kafka快取時間:在將資料寫入kafka時,等待多久以實現批量的匯入。預設是直接寫入,增大此配置可以增加吞吐量,但是會降低資料同步的時效性

a3.sinks.sink_2.kafka.producer.linger.ms = 1

# 訊息壓縮格式

a3.sinks.sink_2.kafka.producer.compression.type = snappy

【Agent-4】:位於彙總層,用於處理【Agent-2】傳送過來的資料

a4.sources = src_1

a4.channels = ch_f_1

a4.sinks = sink_1

# 配置 source: 用於處理【agent-2】發過來的資料,將資料分別傳送到kafka

a4.sources.src_1.channels = ch_f_1

a4.sources.src_1.type = avro

a4.sources.src_1.bind = 192.168.1.110

a4.sources.src_1.port = 44447

a4.sources.src_1.threads = 8

# 配置channel

a4.channels.ch_f_1.type = file

a4.channels.ch_f_1.checkpointDir = /data/flume/channels/realtime_event/checkpoint

a4.channels.ch_f_1.useDualCheckpoints = true

a4.channels.ch_f_1.backupCheckpointDir = /data/flume/channels/realtime_event/backup

a4.channels.ch_f_1.dataDirs = /data/flume/channels/realtime_event/data

a4.channels.ch_f_1.transactionCapacity = 100000

a4.channels.ch_f_1.capacity = 500000

a4.channels.ch_f_1.checkpointInterval = 60000

a4.channels.ch_f_1.keep-alive = 5

a4.channels.ch_f_1.maxFileSize = 5368709120

# 配置sink 

a4.sinks.sink_2.type = org.apache.flume.sink.kafka.KafkaSink

a4.sinks.sink_2.channel = ch_f_1

a4.sinks.sink_2.kafka.topic = topic2

a4.sinks.sink_2.kafka.bootstrap.servers = kafka_server01:9092;kafka_server02:9092

a4.sinks.sink_2.kafka.flumeBatchSize = 20

a4.sinks.sink_2.kafka.producer.linger.ms = 1

a4.sinks.sink_2.kafka.producer.compression.type = snappy