
Getting Data from Flume in Spark Streaming (single machine, push and poll modes)

Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting large volumes of log data. Flume lets you plug custom data senders into a logging system to collect data, and it can also perform simple processing on the data and write it out to a variety of (customizable) receivers.

1. The first approach: reading data via push.

First, install Flume 1.8.0 on a virtual machine, then edit /etc/profile (vi /etc/profile) and add the following configuration:

# flume-1.8.0 config
export FLUME_HOME=/home/hadoop/app/flume

export PATH=$FLUME_HOME/bin:$PATH

Go into Flume's conf directory, run cp flume-env.sh.template flume-env.sh, and add the following setting:

export JAVA_HOME=/home/hadoop/app/jdk

Create a new file named flume-push.conf with the following configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = avro
# This is the receiver, i.e. the host where the Spark Streaming program runs
a1.sinks.k1.hostname = 192.168.119.1
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Write the Spark program:

package cn.allengao.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * class_name: 
 * package: 
 * describe: read data via push
 * creat_user: Allen Gao
 * creat_date: 2018/2/12
 * creat_time: 11:38
 **/
object FlumePushWordCount {

  def main(args: Array[String]) {
//    val host = args(0)
  //  val port = args(1).toInt
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("FlumeWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // push mode: Flume sends data to Spark
//    val flumeStream = FlumeUtils.createStream(ssc, host, port)
    val flumeStream = FlumeUtils.createStream(ssc, "192.168.119.1", 8888)
    // the actual payload of a Flume event is obtained via event.getBody()
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))

    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
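The program calls LoggerLevels.setStreamingLogLevels(), a small helper that is not shown in the original post. A minimal sketch of what it might look like, assuming it mirrors the StreamingExamples utility from the official Spark examples and the log4j 1.x API bundled with Spark:

package cn.allengao.stream

import org.apache.log4j.{Level, Logger}

/**
 * Minimal sketch (assumption) of the logging helper used by both examples.
 */
object LoggerLevels {
  def setStreamingLogLevels(): Unit = {
    // Only take over logging if the user has not configured log4j themselves.
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}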

In the Flume home directory, run bin/flume-ng agent -n a1 -c conf -f conf/flume-push.conf to start the Flume agent, then run the Spark program. Note that in push mode the avro sink pushes to the Spark receiver, so the Spark Streaming program should be listening on 192.168.119.1:8888; the agent will keep retrying the connection until it is.

Go into the directory that Flume watches for data and create a file aaa.log with the following content:

hello tom hello jerry

If the connection works, you should see output like this in IDEA:

-------------------------------------------
Time: 1518400235000 ms
-------------------------------------------
(tom,1)
(hello,2)
(jerry,1)

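Both the push example above and the poll example below depend on Spark's separate Flume integration module, which is not part of spark-core. A build.sbt sketch, assuming Spark 2.2.0 on Scala 2.11 (the versions are assumptions; adjust them to your own build, and note the module was deprecated in Spark 2.3 and removed in Spark 3.x):

// Versions below are assumptions; use whatever matches your cluster.
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "2.2.0",
  "org.apache.spark" %% "spark-streaming"       % "2.2.0",
  "org.apache.spark" %% "spark-streaming-flume" % "2.2.0"
)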
2. The second approach: reading data via poll.

Go into Flume's conf directory and create a new file named flume-poll.conf with the following configuration. Note that the org.apache.spark.streaming.flume.sink.SparkSink class used below does not ship with Flume itself; the spark-streaming-flume-sink jar (together with matching scala-library and commons-lang3 jars) has to be placed in Flume's lib directory first.

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
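# In poll mode the address below is where the Flume agent itself listens; Spark pulls events from it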
a1.sinks.k1.hostname = 192.168.119.51
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Write the Spark program:

package cn.allengao.stream

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * class_name: 
 * package: 
 * describe: read data via poll; this is the more commonly used approach.
 * creat_user: Allen Gao
 * creat_date: 2018/2/12
 * creat_time: 11:41
 **/
object FlumePollWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // pull data from Flume (the Flume agent's address); more than one address can be listed, as shown in the sketch after this program.
    val address = Seq(new InetSocketAddress("192.168.119.51", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_,1))
    val results = words.reduceByKey(_+_)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
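As the comment above notes, createPollingStream can pull from several Flume agents at once. A hypothetical sketch, replacing the address and stream lines inside FlumePollWordCount (the second host is made up for illustration):

    // Hypothetical: pull from two Flume agents at once.
    val addresses = Seq(
      new InetSocketAddress("192.168.119.51", 8888),
      new InetSocketAddress("192.168.119.52", 8888)
    )
    val flumeStream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK)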

In the Flume home directory, run bin/flume-ng agent -n a1 -c conf -f conf/flume-poll.conf to start the Flume agent, then run the Spark program.

Copy a file into the directory Flume watches, for example: cp /home/hadoop/files/wc/word1.log /home/hadoop/data/flume/a.txt.

如果連線成功,可以在idea中看到如下執行結果:

-------------------------------------------
Time: 1518405530000 ms
-------------------------------------------
(scala,1)
(tom,2)
(hello,5)
(java,1)
(jerry,1)

One thing to watch out for: Flume may be running normally, the data file may be picked up normally, and the Spark program may be running normally, yet no data shows up in IDEA. This is often caused by incorrect permissions on the directory Flume collects from. Mine are set like this:

drwxrwxr-x. 3 hadoop hadoop 163 2月  12 11:05 flume

Adjust this to match your own machine's setup.