
Integrating Spark Streaming 2.3.2 with Flume

Introduction: the official documentation for integrating Spark Streaming with Flume describes two approaches, push and pull.

Flume is one of Spark Streaming's advanced data sources.
The official Spark Streaming + Flume integration guide is the reference for everything below.
If you are not familiar with Flume, see my basic Flume tutorial for an introduction.
The example code for this article is in my Gitee (碼雲) repository.

一、Overview

Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming so that Spark Streaming receives data from Flume. There are two approaches.

Note: as of Spark 2.3.0, Flume support is deprecated.
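
Both approaches use the spark-streaming-flume connector. As a minimal sketch (not in the original post), assuming an sbt build with Scala 2.11 and Spark 2.3.2, the dependency declaration would look roughly like this; adjust the coordinates to your own build tool and versions:

// build.sbt -- sketch only, assuming Scala 2.11 and Spark 2.3.2
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"       % "2.3.2" % "provided",
  "org.apache.spark" %% "spark-streaming-flume" % "2.3.2"
)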

二、Flume-style Push-based Approach

Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, so that Flume can push data to it.

Because this is a push model, the Spark Streaming application must be started first, so that the receiver is scheduled and listening on the chosen port before Flume starts pushing data.

Local development test

1) Flume configuration. This one is set up for local testing; how to run it on Spark is covered later.

cd $FLUME_HOME/conf, then edit the file: vim flume-push-streaming.conf

flume-push-streaming.sources = netcat-source
flume-push-streaming.sinks = avro-sink
flume-push-streaming.channels = memory-channel

flume-push-streaming.sources.netcat-source.type = netcat
flume-push-streaming.sources.netcat-source.bind = hadoop000
flume-push-streaming.sources.netcat-source.port = 44444

flume-push-streaming.sinks.avro-sink.type = avro
flume-push-streaming.sinks.avro-sink.hostname = 192.168.31.31
flume-push-streaming.sinks.avro-sink.port = 44445

flume-push-streaming.channels.memory-channel.type = memory

flume-push-streaming.sources.netcat-source.channels = memory-channel
flume-push-streaming.sinks.avro-sink.channel = memory-channel

Note: hadoop000 is the hostname of my Linux machine, and 192.168.31.31 is the IP of my Windows machine (where the Spark Streaming application runs during local testing).

2) Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePushWordCountTest {

  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCountTest <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCountTest")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
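
As a side note, FlumeUtils.createStream also accepts an explicit StorageLevel argument (the default is MEMORY_AND_DISK_SER_2). A minimal sketch, not part of the original example:

// Sketch only: the same stream as above, but keeping received events in memory only.
import org.apache.spark.storage.StorageLevel

val flumeStream = FlumeUtils.createStream(
  ssc, hostname, port.toInt, StorageLevel.MEMORY_ONLY)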

3) Run the Spark Streaming application locally with the arguments 0.0.0.0 44445, because the conf above sinks to port 44445 on my Windows machine.
4) Then run Flume on the Linux server (again, see my basic Flume tutorial if Flume is new to you):

flume-ng agent \
--name flume-push-streaming \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-push-streaming.conf \
-Dflume.root.logger=INFO,console

5) Start telnet hadoop000 44444, type some data, and check the results.
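
For example (hypothetical input and output, not from the original screenshot), typing hello world hello into the telnet session should print a batch along these lines:

-------------------------------------------
Time: 1538188800000 ms
-------------------------------------------
(hello,2)
(world,1)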

Server environment

1) Deploy and run in the server environment (start the jar first, then Flume, the same order as the local run above).

Prerequisite: the test above was local, so the sink pointed at my Windows IP. For the server run, change the sink hostname in the conf file above:

flume-push-streaming.sinks.avro-sink.hostname = hadoop000

2) Package the project: mvn clean package -DskipTests

3) Upload the jar to the server.

4) Submit the jar:

./bin/spark-submit \
--class com.imooc.spark.streaming.flume.FlumePushWordCountTest \
--name FlumePushWordCountTest \
--master local[2] \
--packages org.apache.spark:spark-streaming-flume_2.11:2.3.2 \
/root/lib/spark-sql-1.0-jar-with-dependencies.jar \
hadoop000 44445

Because the packaged jar above does not include the Flume integration dependency, pass it with --packages; Spark downloads it automatically at run time (network access required). Also double-check the jar path.
Tip: the maven-assembly-plugin can build a fat jar that bundles the dependencies you want.

三、Pull-based Approach using a Custom Sink

Unlike the first approach, Flume does not push data directly to Spark Streaming.

Instead, Flume pushes data into a custom sink, where it stays buffered. Spark Streaming uses a reliable Flume receiver and transactions to pull the data from that sink, and a transaction succeeds only after the data has been received and replicated by Spark Streaming. (In other words, we pull the data and process it ourselves.)

Compared with the push approach, this one offers stronger reliability and fault-tolerance guarantees.

1) Flume configuration. Again this is set up for local testing; running it on Spark works exactly as in the push approach. Note that the SparkSink class used below is not bundled with Flume: per the official integration guide, the spark-streaming-flume-sink_2.11 (2.3.2) jar, plus matching scala-library and commons-lang3 jars, must be placed on the Flume agent's classpath (e.g. in $FLUME_HOME/lib) before starting the agent.

cd $FLUME_HOME/conf, then edit the file: vim flume-pull-streaming.conf

flume-pull-streaming.sources = netcat-source
flume-pull-streaming.sinks = spark-sink
flume-pull-streaming.channels = memory-channel

flume-pull-streaming.sources.netcat-source.type = netcat
flume-pull-streaming.sources.netcat-source.bind = hadoop000
flume-pull-streaming.sources.netcat-source.port = 44444

flume-pull-streaming.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
flume-pull-streaming.sinks.spark-sink.hostname = hadoop000
flume-pull-streaming.sinks.spark-sink.port = 44445

flume-pull-streaming.channels.memory-channel.type = memory

flume-pull-streaming.sources.netcat-source.channels = memory-channel
flume-pull-streaming.sinks.spark-sink.channel = memory-channel

Note: hadoop000 is the hostname of my Linux machine.

2) Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePullWordCountTest {

  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCountTest <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCountTest")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")

    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
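
For reference, FlumeUtils.createPollingStream also has an overload that pulls from several SparkSink agents at once. A minimal sketch, assuming a hypothetical second agent on hadoop001 (not part of the original example):

// Sketch only: polling multiple SparkSink agents in a single stream.
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel

val addresses = Seq(
  new InetSocketAddress("hadoop000", 44445),
  new InetSocketAddress("hadoop001", 44445)  // hypothetical second agent
)
val multiAgentStream = FlumeUtils.createPollingStream(
  ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)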

3) Start Flume first, then start the Spark Streaming application (the opposite order from the push approach).

4) Run Flume:

flume-ng agent \
--name flume-pull-streaming \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-pull-streaming.conf \
-Dflume.root.logger=INFO,console

5) Run Spark Streaming locally with the arguments 192.168.31.30 44445, which are the IP of my Linux machine and the SparkSink port.

6) Start telnet hadoop000 44444, type some data, and check the results.