1. 程式人生 > >使用Flume+Kafka+SparkStreaming進行實時日誌分析

使用Flume+Kafka+SparkStreaming進行實時日誌分析

每個公司想要進行資料分析或資料探勘,收集日誌、ETL都是第一步的,今天就講一下如何實時地(準實時,每分鐘分析一次)收集日誌,處理日誌,把處理後的記錄存入Hive中,並附上完整實戰程式碼

1. 整體架構

思考一下,正常情況下我們會如何收集並分析日誌呢?

首先,業務日誌會通過Nginx(或者其他方式,我們是使用Nginx寫入日誌)每分鐘寫入到磁碟中,現在我們想要使用Spark分析日誌,就需要先將磁碟中的檔案上傳到HDFS上,然後Spark處理,最後存入Hive表中,如圖所示:

這裡寫圖片描述

我們之前就是使用這種方式每天分析一次日誌,但是這樣有幾個缺點:

首先我們的日誌是通過Nginx每分鐘存成一個檔案,這樣一天的檔案數很多,不利於後續的分析任務,所以先要把一天的所有日誌檔案合併起來

合併起來以後需要把該檔案從磁碟傳到Hdfs上,但是我們的日誌伺服器並不在Hadoop叢集內,所以沒辦法直接傳到Hdfs上,需要首先把檔案從日誌伺服器傳輸到Hadoop叢集所在的伺服器,然後再上傳到Hdfs

最後也是最重要的,滯後一天分析資料已經不能滿足我們新的業務需求了,最好能控制在一個小時的滯後時間

可以看出來我們以前收集分析日誌的方式還是比較原始的,而且比較耗時,很多時間浪費在了網路傳輸上面,如果日誌量大的話還有丟失資料的可能性,所以在此基礎上改進了一下架構:

這裡寫圖片描述

整個過程就是,Flume會實時監控寫入日誌的磁碟,只要有新的日誌寫入,Flume就會將日誌以訊息的形式傳遞給Kafka,然後Spark Streaming實時消費訊息傳入Hive

那麼Flume是什麼呢,它為什麼可以監控一個磁碟檔案呢?簡而言之,Flume是用來收集、匯聚並且移動大量日誌檔案的開源框架,所以很適合這種實時收集日誌並且傳遞日誌的場景

Kafka是一個訊息系統,Flume收集的日誌可以移動到Kafka訊息佇列中,然後就可以被多處消費了,而且可以保證不丟失資料

通過這套架構,收集到的日誌可以及時被Flume發現傳到Kafka,通過Kafka我們可以把日誌用到各個地方,同一份日誌可以存入Hdfs中,也可以離線進行分析,還可以實時計算,而且可以保證安全性,基本可以達到實時的要求

整個流程已經清晰了,下面各個突破,我們開始動手實現整套系統

2. 實戰演練

2.1 安裝Kafka

下載安裝Kafka以及一些基本命令請傳送到這裡: Kafka安裝與簡介

安裝好以後新建名為launcher_click的topic:

bin/kafka-topics.sh --create --zookeeper hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181 --replication-factor 2 --partitions 2 --topic launcher_click

檢視一下該topic:

bin/kafka-topics.sh --describe --zookeeper hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181 --topic launcher_click

這裡寫圖片描述

2.2 安裝Flume

1、下載解壓

wget http://apache.fayea.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
tar -xvf apache-flume-1.7.0-bin.tar.gz

2、修改配置檔案

進入flume目錄,修改conf/flume-env.sh

export JAVA_HOME=/data/install/jdk
export JAVA_OPTS="-Xms1000m -Xmx2000m -Dcom.sun.management.jmxremote"

新增配置檔案:conf/flume_launcherclick.conf

# logser可以看做是flume服務的名稱,每個flume都由sources、channels和sinks三部分組成
# sources可以看做是資料來源頭、channels是中間轉存的渠道、sinks是資料後面的去向
logser.sources = src_launcherclick
logser.sinks = kfk_launcherclick
logser.channels = ch_launcherclick

# source
# 源頭型別是TAILDIR,就可以實時監控以追加形式寫入檔案的日誌
logser.sources.src_launcherclick.type = TAILDIR
# positionFile記錄所有監控的檔案資訊
logser.sources.src_launcherclick.positionFile = /data/install/flume/position/launcherclick/taildir_position.json
# 監控的檔案組
logser.sources.src_launcherclick.filegroups = f1
# 檔案組包含的具體檔案,也就是我們監控的檔案
logser.sources.src_launcherclick.filegroups.f1 = /data/launcher/stat_app/.*

# interceptor
# 寫kafka的topic即可
logser.sources.src_launcherclick.interceptors = i1 i2
logser.sources.src_launcherclick.interceptors.i1.type=static
logser.sources.src_launcherclick.interceptors.i1.key = type
logser.sources.src_launcherclick.interceptors.i1.value = launcher_click
logser.sources.src_launcherclick.interceptors.i2.type=static
logser.sources.src_launcherclick.interceptors.i2.key = topic
logser.sources.src_launcherclick.interceptors.i2.value = launcher_click

# channel
logser.channels.ch_launcherclick.type = memory
logser.channels.ch_launcherclick.capacity = 10000
logser.channels.ch_launcherclick.transactionCapacity = 1000

# kfk sink
# 指定sink型別是Kafka,說明日誌最後要傳送到Kafka
logser.sinks.kfk_launcherclick.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker
logser.sinks.kfk_launcherclick.brokerList = 10.0.0.80:9092,10.0.0.140:9092

# Bind the source and sink to the channel
logser.sources.src_launcherclick.channels = ch_launcherclick
logser.sinks.kfk_launcherclick.channel = ch_launcherclick

3、啟動

nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume_launcherclick.conf --name logser -Dflume.root.logger=INFO,console >> logs/flume_launcherclick.log &

此時Kafka和Flume都已經啟動了,從配置可以看到Flume的監控檔案是/data/launcher/stat_app/.*,所以只要該目錄下檔案內容有增加就會發送到Kafka,大家可以自己追加一些測試日誌到這個目錄的檔案下,然後開一個Kafka Consumer看一下Kafka是否接收到訊息,這裡我們完成SparkStreaming以後再看測試結果

2.3 SparkStreaming程式設計

SparkStreaming是Spark用來處理實時流的,能夠實時到秒級,我們這裡不需要這麼實時,是每分鐘執行一次日誌分析程式,主要程式碼如下:

  def main(args: Array[String]) {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkConf = new SparkConf().setAppName("LauncherStreaming")

    //每60秒一個批次
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    // 從Kafka中讀取資料
    val kafkaStream = KafkaUtils.createStream(
      ssc,
      "hxf:2181,cfg:2181,jqs:2181,jxf:2181,sxtb:2181", // Kafka叢集使用的zookeeper
      "launcher-streaming", // 該消費者使用的group.id
      Map[String, Int]("launcher_click" -> 0, "launcher_click" -> 1), // 日誌在Kafka中的topic及其分割槽
      StorageLevel.MEMORY_AND_DISK_SER).map(_._2) // 獲取日誌內容

    kafkaStream.foreachRDD((rdd: RDD[String], time: Time) => {
      val result = rdd.map(log => parseLog(log)) // 分析處理原始日誌
        .filter(t => StringUtils.isNotBlank(t._1) && StringUtils.isNotBlank(t._2))
      // 存入hdfs
      result.saveAsHadoopFile(HDFS_DIR, classOf[String], classOf[String], classOf[LauncherMultipleTextOutputFormat[String, String]])
    })

    ssc.start()
    // 等待實時流
    ssc.awaitTermination()
  }

然後打包上傳到master執行:

nohup /data/install/spark-2.0.0-bin-hadoop2.7/bin/spark-submit  --master spark://hxf:7077  --executor-memory 1G --total-executor-cores 4   --class com.analysis.main.LauncherStreaming --jars /home/hadoop/jar/kafka-clients-0.10.0.0.jar,/home/hadoop/jar/metrics-core-2.2.0.jar,/home/hadoop/jar/zkclient-0.3.jar,/home/hadoop/jar/spark-streaming-kafka-0-8_2.11-2.0.0.jar,/home/hadoop/jar/kafka_2.11-0.8.2.1.jar  /home/hadoop/jar/SparkLearning.jar  >> /home/hadoop/logs/LauncherDM.log &

然後開始測試,往Flume監控目錄/data/launcher/stat_app/.*寫日誌,原始日誌內容類似下面這樣:

118.120.102.3|1495608541.238|UEsDBBQACAgIACB2uEoAAAAAAAAAAAAAAAABAAAAMGWUbW7bMAyGb6NfnUFRFEWhJ+gBdgBZVjpjjp04brMAO*yY2DKa9Y+B1+DnQ1LCztoITgK4wPGHfNUhmKGUPOn3DyP*zdOxSWM3T33XXMqy9OP7xXTZiTC1xlL0HgMEi+BfHoooBEGKr3fPpYy5jMse4Xzupus4TKkrs4kZOhI51CgWWKxsUQBRPMDr1*w5Hcuc0LiUEFBwdXQxAARXHb3+QXlOfzya0uZWOGwlEwBDwLD5oJBVFHsEEPF2U0EUToyr8k4tg9v8AkRrIcKmxGsU2eqQIM45dKuKFICo5oveEqOjh2JAIITImyIJqBk3JS4qh7Wby*TroxnL9ZKHXrsyWeBQoMXaEgXUKh6mOQ1l7NLc*Hwz8aDpAtndLFJEetkVc6S9V*bg+RFiKMvnTv6ahuGUTmWexqEfi3Elezx0botJrCCQn5jfCzWaqaUOqNpFYO23ckYl5GOlx4rLQuUllh27SsjZyLQTUn4K+3uVczlOi+7uuMzTYLoibeIspk71DtKuJC+7T5qXPg9lLddaZs6+Lolnj7ANW0dBGKOn72m3cbQJI2Kq4*C6Xhz9E5Pzeeg*i2l1IAJtpReILNq6DY4peFjHeO5vffPZd2UyejEJ28Puo0sI*2*5ojvhfNcquWomFMVp02Pz++M6Nach3e6XR5wOlrdSg4T7RkgtQAuC6HYl2sc62i6dUq*om+HWjvdHAPSk8hYkegHraxC8PwPons73XZeozDfXmaRzzzaD2XI4fX0QX*8BUEsHCKeftc48AgAAmQQAAA==

檢視HDFS的對應目錄是否有內容:

這裡寫圖片描述

HDFS儲存的分析後的日誌內容如下:

99000945863664;864698037273329|119.176.140.248|1495594615129|2017-05-24 10:56:55|xiaomi|redmi4x|com.jingdong.app.mall&0ae359b6&1495534579412&1;com.autonavi.minimap&279f562f&1495534597934,1495534616627&2;com.android.contacts&91586932&1495538267103,1495540527138,1495576834653,1495583404117,1495591231535&5

SparkStreaming任務狀態如下:

這裡寫圖片描述

可以看到的確是每分鐘執行一次

Refer