Reading data into Spark Streaming from Flume (single machine, push and poll modes)
Flume is a highly available, highly reliable, distributed system from Cloudera for collecting, aggregating, and transporting large volumes of log data. Flume supports custom data senders for collecting data from logging systems, can apply simple processing to the data in flight, and can write to a variety of (customizable) data receivers.
1. The first approach: reading data in push mode.
First, install flume 1.8.0 on a virtual machine, edit /etc/profile (vi /etc/profile), and add the following configuration:
# flume-1.8.0 config
export FLUME_HOME=/home/hadoop/app/flume
export PATH=$FLUME_HOME/bin:$PATH
Go into Flume's conf directory, run cp flume-env.sh.template flume-env.sh, and add:
export JAVA_HOME=/home/hadoop/app/jdk
Create a new file flume-push.conf with the following configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/flume
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = avro
# the receiver: the host where the Spark program runs
a1.sinks.k1.hostname = 192.168.119.1
a1.sinks.k1.port = 8888
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Write the Spark program:
package cn.allengao.stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * describe: reads data pushed from Flume
 * create_user: Allen Gao
 * create_date: 2018/2/12 11:38
 */
object FlumePushWordCount {
  def main(args: Array[String]) {
    // val host = args(0)
    // val port = args(1).toInt
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("FlumeWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // push mode: Flume sends data to Spark
    // val flumeStream = FlumeUtils.createStream(ssc, host, port)
    val flumeStream = FlumeUtils.createStream(ssc, "192.168.119.1", 8888)
    // the actual payload of a Flume event is obtained via event.getBody()
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))
    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
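The transformation chain above can be sketched without a running cluster. The following is a minimal sketch in plain Scala (not Spark): it mimics decoding a Flume event body from its ByteBuffer and applies the same flatMap / map / reduceByKey word count. `BodyWordCount` and its `count` helper are hypothetical names introduced here for illustration, not part of the original program:

```scala
import java.nio.ByteBuffer

object BodyWordCount {
  // Decodes each event body (a ByteBuffer, as returned by event.getBody())
  // into a string, splits it into words, and sums the counts — the same
  // steps the flatMap / map / reduceByKey chain performs per batch.
  def count(bodies: Seq[ByteBuffer]): Map[String, Int] =
    bodies
      .flatMap(b => new String(b.array()).split(" ")) // flatMap step
      .map((_, 1))                                    // map step
      .groupBy(_._1)
      .map { case (w, ps) => (w, ps.map(_._2).sum) }  // reduceByKey step

  def main(args: Array[String]): Unit = {
    val events = Seq(ByteBuffer.wrap("hello tom hello jerry".getBytes))
    println(count(events))
  }
}
```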
In the Flume directory, run bin/flume-ng agent -n a1 -c conf -f conf/flume-push.conf to start the Flume agent, then run the Spark program. (In push mode the Spark application is the receiving end; its receiver must be listening on 192.168.119.1:8888 before data can be delivered, though the avro sink keeps retrying until the connection succeeds.)
Go into the directory Flume collects from and create a file aaa.log with the content:
hello tom hello jerry
If the connection succeeds, you will see the following output in IDEA:
-------------------------------------------
Time: 1518400235000 ms
-------------------------------------------
(tom,1)
(hello,2)
(jerry,1)
2. The second approach: reading data in poll mode.
Go into Flume's conf directory and create a new file flume-poll.conf with the following configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/data/flume
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.119.51
a1.sinks.k1.port = 8888
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Write the Spark program:
package cn.allengao.stream
import java.net.InetSocketAddress
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * describe: reads data from Flume in poll mode, the more commonly used approach
 * create_user: Allen Gao
 * create_date: 2018/2/12 11:41
 */
object FlumePollWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // pull data from Flume; several Flume agent addresses can be listed here
    val address = Seq(new InetSocketAddress("192.168.119.51", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))
    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
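As the comment in the program notes, createPollingStream takes a sequence of agent addresses, so one stream can drain several Flume agents. A minimal sketch of building that list (`PollingAddresses` is a hypothetical helper, and the second host below is invented purely for illustration):

```scala
import java.net.InetSocketAddress

object PollingAddresses {
  // Builds the address list handed to FlumeUtils.createPollingStream;
  // each entry points at one Flume agent running a SparkSink.
  def build(hosts: Seq[(String, Int)]): Seq[InetSocketAddress] =
    hosts.map { case (host, port) => new InetSocketAddress(host, port) }

  def main(args: Array[String]): Unit = {
    // "192.168.119.52" is a hypothetical second agent, for illustration only
    val addrs = build(Seq(("192.168.119.51", 8888), ("192.168.119.52", 8888)))
    addrs.foreach(println)
  }
}
```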
In the Flume directory, run bin/flume-ng agent -n a1 -c conf -f conf/flume-poll.conf to start the Flume agent, then run the Spark program. Note that the SparkSink class ships with Spark, not Flume: the spark-streaming-flume-sink jar (together with its scala-library and commons-lang3 dependencies) must be placed on Flume's classpath, e.g. in Flume's lib directory, or the agent will fail to load the sink.
Copy a file into the directory Flume collects from, for example: cp /home/hadoop/files/wc/word1.log /home/hadoop/data/flume/a.txt.
If the connection succeeds, you will see the following output in IDEA:
-------------------------------------------
Time: 1518405530000 ms
-------------------------------------------
(scala,1)
(tom,2)
(hello,5)
(java,1)
(jerry,1)
One thing to watch out for: it can happen that Flume runs normally, the data files are collected normally, and the Spark program runs without errors, yet no data appears in the IDEA output. A likely cause is incorrect permissions on the data-collection directory. Mine are set like this:
drwxrwxr-x. 3 hadoop hadoop 163 Feb 12 11:05 flume
Adjust this to suit your own machine's configuration.
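One quick way to rule the permission problem in or out is to check that the user running Flume can read, write, and traverse the spooling directory (the spooldir source renames files after processing, which requires write access on the directory itself). A minimal sketch, with `SpoolDirCheck` as a hypothetical helper name and the path taken from the configuration above:

```scala
import java.io.File

object SpoolDirCheck {
  // Returns true when the path is a directory the current user can read,
  // write, and enter — the access Flume's spooldir source needs in order
  // to pick up new files and rename them once they are processed.
  def usable(path: String): Boolean = {
    val d = new File(path)
    d.isDirectory && d.canRead && d.canWrite && d.canExecute
  }

  def main(args: Array[String]): Unit =
    println(usable("/home/hadoop/data/flume"))
}
```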