
SparkStreaming (10): Advanced Data Source, Flume Push Approach (Production)

Reference: http://spark.apache.org/docs/2.1.0/streaming-flume-integration.html

1. Environment

Spark 2.1.0

Flume 1.6.0

 

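2. Flume configuration file flume_push_streaming.conf

(1) Flume forwards data from the server to a port on the local Windows machine, where the Spark Streaming receiver listens.

(2) 192.168.57.1 is the IP address of the local Windows machine.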

simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = bigdata.ibeifeng.com
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = 192.168.57.1
simple-agent.sinks.avro-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel = memory-channel
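
The hostname and port of the Avro sink are the address the Spark receiver has to listen on, so it can help to read them from the configuration rather than retype them. The following is a minimal sketch, assuming the configuration text is already available as a string. `FlumeSinkTarget` and `sinkTarget` are hypothetical names for illustration and are not part of Flume or Spark. It relies on Flume configuration files using the Java properties format.

```scala
import java.io.StringReader
import java.util.Properties
import scala.util.Try

// Hypothetical helper, not part of Flume or Spark.
object FlumeSinkTarget {
  // Reads the Avro sink's target host and port from Flume configuration text.
  // The default agent and sink names are the ones used in flume_push_streaming.conf.
  def sinkTarget(confText: String,
                 agent: String = "simple-agent",
                 sink: String = "avro-sink"): Option[(String, Int)] = {
    val props = new Properties()
    props.load(new StringReader(confText))
    val prefix = s"$agent.sinks.$sink"
    for {
      host    <- Option(props.getProperty(s"$prefix.hostname"))
      port    <- Option(props.getProperty(s"$prefix.port"))
      portNum <- Try(port.trim.toInt).toOption
    } yield (host.trim, portNum)
  }
}
```

With the configuration above, `FlumeSinkTarget.sinkTarget(confText)` would yield the pair of `192.168.57.1` and `41414`; a missing or non-numeric port gives `None`.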

3. Scala code

(1) Dependency

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

(2) Code

package _0918MukeSpark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming integration with Flume, first approach (push)
  */
object FlumePushWordCount_product {
  def main(args: Array[String]): Unit = {

    // Production use: the receiver's hostname and port come from the command line
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCount_product <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Integrate Spark Streaming with Flume (push approach).
    // Local test variant with a hard-coded address:
    // val flumeStream = FlumeUtils.createStream(ssc, "0.0.0.0", 41414)

    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    flumeStream.map(x=>new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()

  }

}
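
The per-batch transformation (decode each event body, split into words, count) can be tried out without Spark or Flume. The following is a minimal sketch under two assumptions that differ from the job above: the body is decoded explicitly as UTF-8 rather than with the platform default charset, and lines are split on any whitespace rather than on a single space. `WordCountLogic`, `decodeBody` and `countWords` are hypothetical names used only for illustration.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical stand-alone version of the word-count logic; not part of Spark.
object WordCountLogic {
  // Decodes the readable bytes of the buffer as UTF-8 (an assumption) and trims
  // the result. duplicate() leaves the caller's buffer position untouched.
  def decodeBody(body: ByteBuffer): String =
    StandardCharsets.UTF_8.decode(body.duplicate()).toString.trim

  // Splits each line on whitespace and counts how often each word occurs.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, hits) => word -> hits.size }
}
```

Decoding through the buffer itself, instead of calling `array()`, only reads the bytes between the buffer's position and limit, which matters if the backing array is larger than the event body.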

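4. Testing

(1) Start the FlumePushWordCount_product program

           Click the drop-down triangle to the left of the Run button -> Edit Configurations -> Program arguments, and enter: 0.0.0.0 41414

(2) Start Flume (the streaming application must already be running and listening so that the Avro sink can connect)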

bin/flume-ng agent  \
--name simple-agent   \
--conf conf    \
--conf-file conf/flume_push_streaming.conf  \
-Dflume.root.logger=INFO,console

(3) Send data with telnet

	# telnet bigdata.ibeifeng.com 44444
	Trying 192.168.31.3...
	Connected to bigdata.ibeifeng.com.
	Escape character is '^]'.
	fe
	OK
	sef

(Tested successfully.)
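
For repeatable tests, the telnet step can also be scripted. The following is a minimal sketch, assuming the netcat source acknowledges every line with a one-line reply, as the `OK` in the telnet session above suggests. `NetcatSender` and `sendLines` are hypothetical names, and only the standard `java.net.Socket` API is used.

```scala
import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter}
import java.net.Socket
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical test client; not part of Flume or Spark.
object NetcatSender {
  // Sends each line to a TCP endpoint and collects the one-line reply to each.
  // Assumes the server answers every line (an empty string is recorded if the
  // connection closes before a reply arrives).
  def sendLines(host: String, port: Int, lines: Seq[String]): List[String] = {
    val socket = new Socket(host, port)
    try {
      val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, UTF_8), true)
      val in  = new BufferedReader(new InputStreamReader(socket.getInputStream, UTF_8))
      // toList forces evaluation before the socket is closed.
      lines.toList.map { line =>
        out.print(line + "\n")
        out.flush()
        Option(in.readLine()).getOrElse("")
      }
    } finally {
      socket.close()
    }
  }
}
```

With the setup in this post, a call such as `NetcatSender.sendLines("bigdata.ibeifeng.com", 44444, Seq("fe", "sef"))` would play the role of the telnet session.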