SparkStreaming (10): Advanced Data Sources, the Flume Push Approach (Production)
阿新 • Published: 2018-11-08
[Reference: http://spark.apache.org/docs/2.1.0/streaming-flume-integration.html]
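1. Environment
Spark 2.1.0
Flume 1.6.0
2. Flume configuration file flume_push_streaming.conf
(1) Flume's role here is to forward data from the server to a port on the local Windows machine.
(2) The IP 192.168.57.1 is the address of the local Windows machine.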
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = bigdata.ibeifeng.com
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = 192.168.57.1
simple-agent.sinks.avro-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
3. Scala code
(1) Dependency
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
(2) Code
package _0918MukeSpark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * First approach to integrating Spark Streaming with Flume (push-based)
 */
object FlumePushWordCount_product {
  def main(args: Array[String]): Unit = {
    // For production use: read the hostname and port from the command line
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCount_product <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // How to integrate Spark Streaming with Flume:
    // val flumeStream = FlumeUtils.createStream(ssc, "0.0.0.0", 41414)
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    // Decode each event body, split it into words, and count the words per batch
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
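To make the per-batch logic easier to follow without a running cluster, here is a minimal sketch of the same decode, split, and count steps on plain Scala collections. The names WordCountSketch, wordsOf, and countWords are hypothetical and do not appear in the original code. The sketch also decodes as UTF-8 explicitly and drops empty tokens, which the streaming job above does not do.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper object, for illustration only (not part of the original job).
object WordCountSketch {
  // Decode one event body and split it into words, like the map/flatMap steps.
  // Assumption: bodies are UTF-8; empty tokens are dropped (the original keeps them).
  def wordsOf(body: Array[Byte]): Seq[String] =
    new String(body, StandardCharsets.UTF_8).trim.split(" ").filter(_.nonEmpty).toSeq

  // Count words across all bodies of one batch, like map((_, 1)).reduceByKey(_ + _).
  def countWords(bodies: Seq[Array[Byte]]): Map[String, Int] =
    bodies.flatMap(b => wordsOf(b)).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  def main(args: Array[String]): Unit = {
    // Two fake event bodies, with the trailing line endings a netcat source might pass on.
    val batch = Seq("a b a\r\n", "b c\n").map(_.getBytes(StandardCharsets.UTF_8))
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In the streaming job, the same counting happens independently for each 5-second batch, so counts are not accumulated across batches.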
4. Testing
(1) Start the FlumePushWordCount code
Click the drop-down arrow to the left of the Run button -> Edit Configurations -> Program arguments, and enter: 0.0.0.0 41414
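The program above calls port.toInt directly, so a non-numeric second argument would fail with a NumberFormatException. The following is one possible sketch of stricter argument checking. ArgsSketch and parseArgs are hypothetical names, and the 1 to 65535 port range check is an assumption added here, not something the original code does.

```scala
import scala.util.Try

// Hypothetical helper, for illustration only.
object ArgsSketch {
  // Returns Right((hostname, port)) for valid input, or Left(message) otherwise.
  def parseArgs(args: Array[String]): Either[String, (String, Int)] = args match {
    case Array(host, portStr) =>
      Try(portStr.toInt).toOption match {
        case Some(p) if p > 0 && p <= 65535 => Right((host, p))
        case _ => Left(s"Invalid port: $portStr")
      }
    case _ =>
      Left("Usage: FlumePushWordCount_product <hostname> <port>")
  }

  def main(args: Array[String]): Unit =
    parseArgs(args) match {
      case Right((host, port)) => println(s"Would listen on $host:$port")
      case Left(message) =>
        System.err.println(message)
        sys.exit(1)
    }
}
```

With the program arguments from this step, parseArgs(Array("0.0.0.0", "41414")) yields Right(("0.0.0.0", 41414)).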
(2) Start Flume
bin/flume-ng agent \
--name simple-agent \
--conf conf \
--conf-file conf/flume_push_streaming.conf \
-Dflume.root.logger=INFO,console
(3) Enter data via telnet
# telnet bigdata.ibeifeng.com 44444
Trying 192.168.31.3...
Connected to bigdata.ibeifeng.com.
Escape character is '^]'.
fe
OK
sef
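If telnet is not available, the same test data could be sent from a small Scala client. This is only a sketch: NetcatSender and sendLines are hypothetical names, and it assumes the netcat source answers every line with one reply line (the "OK" seen in the telnet session above). If acknowledgements are turned off on the source, the readLine call would block.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.Socket
import java.nio.charset.StandardCharsets

// Hypothetical helper, for illustration only.
object NetcatSender {
  // Sends each line to host:port and collects the one-line reply that follows it.
  // Assumption: the server replies with exactly one line per line received.
  def sendLines(host: String, port: Int, lines: Seq[String]): List[String] = {
    val socket = new Socket(host, port)
    try {
      val out = new PrintWriter(
        new OutputStreamWriter(socket.getOutputStream, StandardCharsets.UTF_8))
      val in = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      lines.toList.map { line =>
        out.print(line + "\n") // explicit "\n" so the terminator is the same on every platform
        out.flush()
        in.readLine()
      }
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Host and port are taken from the netcat source settings in flume_push_streaming.conf.
    sendLines("bigdata.ibeifeng.com", 44444, Seq("fe", "sef")).foreach(println)
  }
}
```

The words sent this way should then show up in the word counts printed by the streaming job for the batch in which they arrive.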