1. 程式人生 > >Apache flume+Kafka獲取實時日誌資訊

Apache flume+Kafka獲取實時日誌資訊

Flume簡介以及安裝

  1. Flume是一個分散式的對海量日誌進行採集,聚合和傳輸的系統。Flume系統分為三個元件,分別是source,sink,channel:source表明資料的來源,可能來自檔案,Avro等,channel作為source和sink的橋樑,作為資料的臨時儲存地,channal是一個完整的事務,這一點保證了資料在收發的時候的一致性,支援的型別有: JDBC channel , File System channel , Memort channel等;sink表明資料的去向,可以把資料再次轉發到HDFS,或者Kafka等。

  2. 本文使用的版本是1.8.0(目前最新)的版本,可以到

    官網 進行下載。

  3. 解壓、安裝、配置

sudo tar -zxvf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin
sudo vim conf/kafka.conf #這個檔案剛開始並不存在,要新建

kafka.conf的具體內容:

 

# 分別對應三種基礎元件,起的別名 kafka是在啟動flume的時候,指定的agent的名字
kafka.sources = src
kafka.sinks = sk
kafka.channels = chl

# 表明需要收集的資料來自avro,此處配置會啟動avro server
kafka.sources.src.type = avro
kafka.sources.src.bind = localhost
kafka.sources.src.port = 44446

# Flume 收集的資料轉發到Kafka的關鍵配置
kafka.sinks.sk.type = org.apache.flume.sink.kafka.KafkaSink
kafka.sinks.sk.kafka.bootstrap.servers = localhost:9092  #指定kafka叢集的地址
kafka.sinks.sk.partition.key=0
kafka.sinks.sk.partitioner.class=org.apache.flume.plugins.SinglePartition
kafka.sinks.sk.serializer.class=kafka.serializer.StringEncoder
kafka.sinks.sk.request.required.acks=0
kafka.sinks.sk.max.message.size=1000000
kafka.sinks.sk.producer.type=sync
kafka.sinks.sk.topic=log  #指定kafka的topic

# Use a channel which buffers events in memory
kafka.channels.chl.type = memory
kafka.channels.chl.capacity = 1000
kafka.channels.chl.transactionCapacity = 100

# Bind the source and sink to the channel
kafka.sources.src.channels = chl
kafka.sinks.sk.channel = chl

Flume服務啟動:

 

bin/flume-ng agent --conf conf --conf-file conf/kafka.conf --name kafka -Dflume.root.logger=INFO,LOGFILE

ps:上述命令也可以通過nohup方式啟動
至此,flume已經啟動完成了!
接下來我們請出另一個主角 Kafka
同樣是下載,安裝,配置的步驟:

 

wget http://mirrors.shuosc.org/apache/kafka/1.0.0/kafka_2.12-1.0.0.tgz
tar -zxf kafka_2.12-1.0.0.tgz
cd kafka_2.12-1.0.0
vim config/server.properties

下面是config/server.properties的配置和說明:

 

#指定broker的id,數字,但不能過大
broker.id=0
#指定服務的監聽埠,預設是9092
port=9092
#這個地方需要特別注意,在程式碼中使用的時候,需要完全複製這個地方的配置,如localhost:9092,但是如果是分散式的,localhost顯然是不符合要求的,最好寫成當前機器的ipv4的地址,這裡寫localhost方便單機測試和開發
listeners=PLAINTEXT://localhost:9092
#同上,具體含義看官方原始檔中的說明
advertised.listeners=PLAINTEXT://localhost:9092
#指定zk的地址
zookeeper.connect=localhost:2181

配置基本上是可以了,然後開始run

 

bin/kafka-server-start config/server.properties

注意: 在啟動kafka之前需要先啟動zk,可以從官網專門下載一個zk,或者使用kafka自帶的zk,都可以。

 

 

Java程式碼實現日誌列印並被Flume採集

接下來就需要在程式碼實現日誌的列印,通過flume的採集,然後傳送到kafka,其實flume採集傳送到kafka通過上述配置就已經完成了,現在就做第一件事:
我個人採用spring boot進行實現的,在pom中進行如下配置:

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
        <exclusions><!--把logback忽略,使用log4j-->
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <!--flume對log4j的支援,使用avro的關鍵-->
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.5.0</version>
    </dependency>

然後在src/main/resources下面,加入log配置檔案:log4j.properties

 

# set log levels
log4j.rootLogger=INFO, stdout, file, flume
log4j.logger.per.flume=INFO

#flume#
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
##重點在這裡!!這個地方就是在flume配置的avro的server地址,程式中產生的log都會通過avro的方式,被flume所採集
log4j.appender.flume.Hostname=localhost
log4j.appender.flume.Port=44446

#stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

 #file 
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.Threshold=INFO
log4j.appender.file.File=/tmp/logs/real-log.log
log4j.appender.file.Append=true
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

關於這個專案的簡單程式碼,已經在我的gitee上了,這個demo主要是通過列印日誌,flume 收集並publish 到 kafka指定的topic上,然後通過kafka consume接受到,後續會通過storm進行實時計算和處理