1. 程式人生 > >Spark學習筆記(15)——Spark Streaming 整合 Flume

Spark學習筆記(15)——Spark Streaming 整合 Flume

1 flume 配置檔案

在這裡插入圖片描述

flume-env.sh 裡配置 JAVA_HOME

1.1 flume-pull.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = node1
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1.2 flume-push.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = avro
#這是接收方
a1.sinks.k1.hostname = 192.168.30.1
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2 push 案例

2.1 pom

 <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>1.6.3</version>
 </dependency>

2.2 原始碼

package streamingAndflume

import mystreaming.
LoggerLevels import org.apache.spark.SparkConf import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object FlumePushCount { def main(args: Array[String]): Unit = { LoggerLevels.setStreamingLogLevels() val conf = new SparkConf().setAppName("FlumePush").setMaster("local[2]") val ssc = new StreamingContext(conf,Seconds(5)) //flume 向 spark 傳送資訊,此處IP地址是本機電腦IP val flumeStream = FlumeUtils.createStream(ssc,"192.168.30.1",8888) //flume 中的資料通過event.getBody()才能拿到真正的內容 val words = flumeStream.flatMap(x=>new String(x.event.getBody().array()).split(" ")).map((_,1)) val results = words.reduceByKey(_+_) results.print() ssc.start() ssc.awaitTermination() } }

2.3 啟動 flume

/export/data/flume 放一些 資料

[[email protected] apache-flume-1.6.0-bin]$ bin/flume-ng agent -n a1 -c conf -f conf/flume-push.conf

3 poll 方式

需要為在flume lib 下新增相應的 jar 包
參考連結https://spark.apache.org/docs/latest/streaming-flume-integration.html
在這裡插入圖片描述

3.1 poll原始碼

package streamingAndflume

import java.net.InetSocketAddress

import mystreaming.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePollCount {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()

    val conf = new SparkConf().setAppName("FlumePush").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    //從 flume 中拉取資料,flume的地址,這裡可以傳很多地址
    val address = Seq(new InetSocketAddress("node1",8888))

    val flumeStream = FlumeUtils.createPollingStream(ssc,address,StorageLevel.MEMORY_AND_DISK)
    val words = flumeStream.flatMap(x=>new String(x.event.getBody().array()).split(" ")).map((_,1))
    val results = words.reduceByKey(_+_)

    results.print()

    ssc.start()
    ssc.awaitTermination()

  }
}

3.2 先啟動flume,再啟動程式

[[email protected] apache-flume-1.6.0-bin]$ bin/flume-ng agent -n a1 -c conf -f conf/flume-pull.conf

4 pull 方式在叢集中執行

啟動 Spark 叢集[[email protected] apache-flume-1.6.0-bin]$ /home/hadoop/apps/spark-1.6.3-bin-hadoop2.6/sbin/start-all.sh

4.1 原始碼

原始碼用 maven 打包

package streamingAndflume

import mystreaming.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePushCount {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()

    val host = args(0)
    val port = args(1).toInt

    val conf = new SparkConf().setAppName("FlumePush")
    val ssc = new StreamingContext(conf,Seconds(5))

    //flume 向 spark 傳送資訊,此處IP地址是本機電腦IP
    val flumeStream = FlumeUtils.createStream(ssc,host,port)

    //flume 中的資料通過event.getBody()才能拿到真正的內容
    val words = flumeStream.flatMap(x=>new String(x.event.getBody().array()).split(" ")).map((_,1))

    val results = words.reduceByKey(_+_)

    results.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

4.2 啟動 jar

修改 flume-push.conf

a1.sinks.k1.hostname = node2
[[email protected] ~]$ /home/hadoop/apps/spark-1.6.3-bin-hadoop2.6/bin/spark-submit --master spark://node1:7077 --class streamingAndflume.FlumePushCount /home/hadoop/push.jar node2 8888

4.3 啟動 flume

[[email protected] apache-flume-1.6.0-bin]$ bin/flume-ng agent -n a1 -c conf -f conf/flume-pull.conf