
Spark Streaming (11): Advanced data source, Flume pull mode (production)

1. Environment

(1) Production environment

flume 1.6.0

spark 2.1.0

(2) Download the matching dependencies

Note: all of these JARs must be placed on Flume's classpath (e.g. in the agent's plugins.d/ or lib/ directory), otherwise the Flume agent will fail at runtime. (A pitfall I ran into.)

(i) Custom sink JAR:

 groupId = org.apache.spark
 artifactId = spark-streaming-flume-sink_2.11
 version = 2.1.0

(ii) Scala library JAR:

 groupId = org.scala-lang
 artifactId = scala-library
 version = 2.11.7

(iii) Commons Lang 3 JAR: 

 groupId = org.apache.commons
 artifactId = commons-lang3
 version = 3.5
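The three JARs above go on the Flume agent side. The Spark application side (section 3) additionally needs spark-streaming-flume on its compile classpath; a minimal sbt sketch, with versions assumed to match the environment above:

```scala
// build.sbt (sketch): coordinates taken from the Spark 2.1.0 release
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  // provided: spark-submit supplies the core Spark runtime
  "org.apache.spark" %% "spark-streaming" % "2.1.0" % "provided",
  // contains FlumeUtils.createPollingStream used below
  "org.apache.spark" %% "spark-streaming-flume" % "2.1.0"
)
```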

 

2. Flume configuration file: flume_pull_streaming.conf

simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = bigdata.ibeifeng.com
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname =  bigdata.ibeifeng.com
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel
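In pull mode the agent buffers events in the SparkSink until the Spark application polls them, so the channel must be able to absorb bursts. The memory channel above runs with Flume's default sizing; explicit capacity settings are worth adding in production. The values below are illustrative, not tuned:

```properties
# Optional, illustrative sizing for the memory channel
simple-agent.channels.memory-channel.capacity = 10000
simple-agent.channels.memory-channel.transactionCapacity = 1000
```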

3. Scala code

package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming + Flume integration, method 2 (pull-based).
  */
object FlumePullWordCount_product_server {
  def main(args: Array[String]): Unit = {

    // For production use: hostname and port are passed in, not hard-coded
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount_product_server <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    // Master and app name are supplied by spark-submit in production
    val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("FlumePullWordCount_product")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Pull events from the Flume SparkSink at hostname:port
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    // Decode each event body, split into words, and count per batch
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()

  }

}
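The per-batch transformation in the job above (trim, split on spaces, pair each word with 1, reduce by key) can be sketched on plain Scala collections, with no Spark runtime needed. `WordCountSketch.countWords` is a hypothetical helper for illustration, not part of the job:

```scala
object WordCountSketch {
  // Same shape as the DStream pipeline: split each line into words,
  // pair each word with 1, then sum the counts per word.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines.map(_.trim)
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)   // guard against runs of spaces producing "" tokens
      .map((_, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }
}
```

Feeding it the first telnet line from step (6), `"s d f s "`, yields `s -> 2, d -> 1, f -> 1`, matching what `reduceByKey(_ + _)` produces per batch.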


4. Testing

(1) Package the code into a JAR

(2) Start the Flume agent

	bin/flume-ng agent  \
	--name simple-agent \
	--conf conf \
	--conf-file conf/flume_pull_streaming.conf \
	-Dflume.root.logger=INFO,console

(3) Connect with telnet

telnet bigdata.ibeifeng.com 44444

(4) Start HDFS (the job reports errors if it is not running)

(5) Submit the Spark job

	bin/spark-submit \
	--class Spark.FlumePullWordCount_product_server \
	--master local[2] \
	--packages org.apache.spark:spark-streaming-flume_2.11:2.1.0 \
	/opt/datas/lib/scalaProjectMaven.jar \
	bigdata.ibeifeng.com 41414

(6) Type test input into the telnet session

	OK
	s d f s 
	OK
	sd  fd f
	OK

(Result: the word counts appear in the Spark console. Success!)