
Big Data Explorations (9): Real-Time Log Stream Processing in Practice with log4j + Flume + Kafka + Spark Streaming


1. Real-Time Stream Processing

1.1 Real-Time Computing

This is similar to a real-time system (one that must respond to requests within strict time limits). In stock trading, for example, market data changes by the moment, and decisions often have to be made within seconds or even milliseconds. Put simply, real-time computing means a task must be computed within a very short unit of time, and that computation usually happens over and over again.

1.2 Stream Computing

Stream computing usually means data flows through the system continuously and the system keeps computing over it without stopping. There is not necessarily any strict latency constraint: a long time may pass between data entering the system and a result being produced. Examples include system log data and the daily user-visit and browsing data of an e-commerce site.

1.3 Real-Time Stream Computing

Combining real-time computing with streaming data gives real-time stream computing, which is what big data usually calls real-time stream processing: data is produced continuously, and at the same time there are strict limits on how long the computation may take. For example, product recommendations on e-commerce sites change right after you click on an item, i.e. they are computed and shown to you in real time. Another example is your phone plan: when your credit or data is about to run out, a data package is recommended to you in real time.

2. Real-Time Stream Processing in Practice

This example follows the imooc hands-on video course "Spark Streaming Real-Time Stream Processing Project in Practice"; it is worth studying if you are interested.

2.1 A Continuous Stream of Data

Here log4j is used to simulate a continuous stream of log data: just start a process that keeps printing log lines. A simple configuration looks like this:

log4j.rootLogger=INFO,stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

Then write a small log-printing program, as follows:

import org.apache.log4j.Logger;

public class LoggerGenerator {
    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws InterruptedException {
        int index = 0;
        while (true) {
            // emit one log line per second, forever
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }
    }
}

2.2 Collecting the Data in Real Time

Flume is used to collect the log data in real time. To hook it up to log4j, the log4j configuration file needs the following settings:

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n


log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = wds
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

The Flume agent itself is configured as follows:

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=logger-sink

# define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414

# define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.logger-sink.type = logger

agent1.sources.avro-source.channels=logger-channel
agent1.sinks.logger-sink.channel=logger-channel

A logger sink is used here for now, simply to verify that data can actually be collected. When building a project like this, do not try to get everything working in one shot; test each step as you go, so that errors are easy to locate and do not pile up.

Start Flume:

 flume-ng agent \
--name agent1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
-Dflume.root.logger=INFO,console 

Note that this may fail with java.lang.ClassNotFoundException: org.apache.flume.clients.log4jappender.Log4jAppender. The Log4jAppender has to be on the classpath of the log-generating application; adding the following dependency fixes the problem:

  <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.6.0</version>
  </dependency>

Now start the log-printing program, and you can see that Flume has begun collecting data:

2018-12-07 21:39:03,204 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236744447, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 30                      value : 0 }
2018-12-07 21:39:03,609 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236745545, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 31                      value : 1 }
2018-12-07 21:39:04,611 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236746548, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 32                      value : 2 }
2018-12-07 21:39:05,614 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236747550, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 33                      value : 3 }
2018-12-07 21:39:06,617 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236748554, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 34                      value : 4 }
2018-12-07 21:39:07,620 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236749556, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 35                      value : 5 }
2018-12-07 21:39:08,626 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236750560, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 36                      value : 6 }
2018-12-07 21:39:09,632 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236751568, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 37                      value : 7 }
2018-12-07 21:39:10,636 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236752572, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 38                      value : 8 }
2018-12-07 21:39:11,638 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236753575, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 39                      value : 9 }
2018-12-07 21:39:12,640 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236754577, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 31 30                   value : 10 }

2.3 Buffering the Data in a Message Queue

The data collected by Flume is now written into Kafka, which acts as a buffer; Flume plays the role of the producer. To connect Flume to Kafka, change the Flume configuration to the following:

 agent1.sources=avro-source
 agent1.channels=logger-channel
 agent1.sinks=kafka-sink
 
 # define source
 agent1.sources.avro-source.type=avro
 agent1.sources.avro-source.bind=0.0.0.0
 agent1.sources.avro-source.port=41414

 # define channel
 agent1.channels.logger-channel.type=memory

 #define sink
 agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
 agent1.sinks.kafka-sink.brokerList = wds:9092
 agent1.sinks.kafka-sink.topic = streamingtopic
 agent1.sinks.kafka-sink.batchSize = 20
 agent1.sinks.kafka-sink.requiredAcks = 1
 
 agent1.sources.avro-source.channels=logger-channel
 agent1.sinks.kafka-sink.channel=logger-channel

Kafka depends on ZooKeeper, so start ZooKeeper first:

$ZK_HOME/bin/zkServer.sh start

Start Kafka, pointing it at its configuration file:

 bin/kafka-server-start.sh config/server.properties

Create the topic:

kafka-topics.sh --create --zookeeper wds:2181 --replication-factor 1 --partitions 1 --topic streamingtopic
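
As an optional sanity check (not part of the original walkthrough), you can also produce a few test messages by hand before wiring Flume in, using the console producer against the broker configured above; anything typed here should show up in the console consumer started in the next step:

kafka-console-producer.sh --broker-list wds:9092 --topic streamingtopic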

Start a console consumer to verify that messages can be consumed:

kafka-console-consumer.sh --zookeeper wds:2181  --topic streamingtopic

Messages appear in the consumer output in batches of 20 (determined by agent1.sinks.kafka-sink.batchSize = 20 in the Flume configuration), which shows the pipeline is connected:

$ kafka-console-consumer.sh --zookeeper wds:2181 --topic streamingtopic
value : 0
value : 1
value : 2
value : 3
value : 4
value : 5
value : 6
value : 7
value : 8
value : 9
value : 10
value : 11
value : 12
value : 13
value : 14
value : 15
value : 16
value : 17
value : 18
value : 19

2.4 Processing the Data in Real Time

Spark Streaming consumes the messages from Kafka. The receiver-based approach is used here; the code below is a minimal example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Connect Kafka to Spark Streaming via the receiver-based API
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // Each record is a (key, value) pair; the key (_1) is not used here.
    // count() needs an output operation, otherwise the job has nothing to run.
    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
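
The class is called KafkaReceiverWordCount, but the body above only counts the records in each batch. If an actual per-word count is wanted, the count line could be replaced with something like the sketch below (assuming whitespace-separated log lines; this variant is an illustration, not from the original post):

    // Sketch: word count per 5-second batch (assumption: messages are whitespace-separated)
    val words = messages.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()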

Note that in this example both the log generator and the Spark Streaming job run locally. What changes in a production environment?

1) Package the log generator into a jar and run it by pointing at the LoggerGenerator class.
2) Flume and Kafka are used exactly as before.
3) The Spark Streaming job is also packaged into a jar and submitted to the cluster with spark-submit, running in local/yarn/standalone/mesos mode.

The jar is built with mvn assembly:assembly -Dmaven.test.skip=true so that the Kafka-related jars are bundled in; dependencies that are not needed in the fat jar are given provided scope (see the sketch after this paragraph).
When packaging, move LoggerGenerator out of the test sources and into a package under the main java sources so it can be run directly.
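
A minimal pom.xml fragment for this kind of packaging might look as follows; the plugin configuration and the provided-scope example are illustrative assumptions, not taken from the original project:

  <build>
    <plugins>
      <!-- produces xxx-jar-with-dependencies.jar when running `mvn assembly:assembly` -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <!-- dependencies already available on the cluster can be kept out of the fat jar -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.2</version>
    <scope>provided</scope>
  </dependency>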

Once the jar is built, run the log generator on the server:

java -cp spark-test-1.0-jar-with-dependencies.jar com.wds.streaming.LoggerGenerator

The -cp flag adds xxx.jar to the classpath, so the Java class loader looks up matching classes inside it; this way no main class has to be declared at packaging time, which is convenient. Then submit the Spark job with spark-submit:

spark-submit \
--class com.wds.streaming.KafkaStreamingApp \
--master local[2] \
--name KafkaStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.2 \
/home/hadoop/hadoop/lib/spark-test-1.0-jar-with-dependencies.jar wds:2181 test streamingtopic 2

The log generator output looks like this:

$ java -cp spark-test-1.0-jar-with-dependencies.jar com.wds.streaming.LoggerGenerator
log4j:ERROR Could not find value for key log4j.appender.flume.layout
2018-12-07 22:32:27,447 [main] [org.apache.flume.api.NettyAvroRpcClient] [WARN] - Using default maxIOWorkers
2018-12-07 22:32:28,681 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 0
2018-12-07 22:32:29,751 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 1
2018-12-07 22:32:30,753 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 2
2018-12-07 22:32:31,755 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 3
2018-12-07 22:32:32,758 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 4
2018-12-07 22:32:33,760 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 5
2018-12-07 22:32:34,762 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 6
2018-12-07 22:32:35,764 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 7
2018-12-07 22:32:36,765 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 8
2018-12-07 22:32:37,767 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 9
2018-12-07 22:32:38,770 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 10
2018-12-07 22:32:39,772 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 11
2018-12-07 22:32:40,775 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 12
2018-12-07 22:32:41,777 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 13
2018-12-07 22:32:42,779 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 14
2018-12-07 22:32:43,782 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 15
2018-12-07 22:32:44,784 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 16

The Spark Streaming output looks like this:

18/12/07 22:32:50 INFO executor.Executor: Running task 0.0 in stage 12.0 (TID 10)
18/12/07 22:32:50 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 2 blocks
18/12/07 22:32:50 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
18/12/07 22:32:50 INFO executor.Executor: Finished task 0.0 in stage 12.0 (TID 10). 1705 bytes result sent to driver
18/12/07 22:32:50 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 12.0 (TID 10) in 6 ms on localhost (executor driver) (1/1)
18/12/07 22:32:50 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool 
18/12/07 22:32:50 INFO scheduler.DAGScheduler: ResultStage 12 (print at KafkaStreamingApp.scala:23) finished in 0.006 s
18/12/07 22:32:50 INFO scheduler.DAGScheduler: Job 6 finished: print at KafkaStreamingApp.scala:23, took 0.014111 s
-------------------------------------------
Time: 1544239970000 ms
-------------------------------------------
20

With that, the whole pipeline is connected end to end and the test succeeds.

3. References

  1. Spark Streaming Real-Time Stream Processing Project in Practice (imooc video course)
  2. Official documentation for Flume, Kafka, and Spark