spark streaming小實戰之kafka讀取與儲存
阿新 • • 發佈:2019-02-04
本次小實戰主要介紹一下spark streaming如何讀取kafka資料
涉及理論部分在這就不多說了,自己也剛入門
先說下需求
待處理日誌格式為
ouMrq2r_aU1mtKRTmQclGo1UzY,3251210381,2018/11/29 13:46,上海,上海,210.2.2.6,7038004
ouMrq2r_aU1mtKRTmQclGo1UzY,3251210381,2018/09/18 08:37,上海,上海,210.2.2.6,7038004
ouMrq2r_aU1mtKRTmQclGo1UzY,3251210381,2018/02/19 01:16,上海,上海,210.2.2.6,7038004
需要做的是統計每隔5分鐘內被訪問的數量
資料從kafka中讀出,通過spark streaming處理,然後再寫會kafka
接下來將從兩部分入手說明專案完成過程
一、模擬kafka流
二、spark streaming處理最後寫回kafka
第一部分
目的:模擬真實kafka流情況
思路:使用kafka的connect監聽source檔案,如果發生修改,寫入topic
先開啟zookeeper,kafka
接著進入kafka目錄下,使用如下命令開啟connect
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
監聽的檔案位置在 config/connect-file-source.propertie 中配置
開啟該檔案,內容如下
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/Users/huangxiao/test_streaming/xbk_log.txt#監聽的檔案位置
topic=kafka_for_example#topic名稱(需要與spark中的一致)
connect執行成功如下
第二部分
直接看spark streaming的程式吧,用scala寫的,寫了很多註釋,就不多說
import java.sql.Date import java.text.SimpleDateFormat import java.util.Properties import kafka.serializer.StringDecoder import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Duration, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils import com.alibaba.fastjson.JSON import kafka.producer.ProducerConfig import kafka.producer.Producer import kafka.producer.KeyedMessage import org.apache.spark.{SparkConf, SparkContext} //1.開啟zk,kafka。2.啟動kafka-connect(source部分)3.執行此檔案 object Kafka_Spark { def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka-spark-demo") val scc = new StreamingContext(sparkConf, Duration(5000))//new一個spark-streaming的上下文 // scc.checkpoint(".") // 暫時用不到 val topics = Set("kafka_spark2") //我們需要消費的kafka資料的topic val kafkaParam = Map( "metadata.broker.list" -> "localhost:9092", // kafka的broker list地址 "auto.offset.reset" -> "smallest"//這個引數可以讓streaming消費topic的時候從頭開始消費 ) val stream: InputDStream[(String, String)] = createStream(scc, kafkaParam, topics)//建立流,讀資料,傳入上下文,kafka配置,主題名字 val wordCount = stream.map(l => (json_an(l._2), 1)).reduceByKey(_ + _) //對資料進行處理 wordCount.print()//輸出到控制檯看看結果 //傳送資料(對外部伺服器連線必須要用這種方式,不然會報錯:任務無法序列化) wordCount.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => //配置說明 val producerProperties = new Properties() producerProperties.put("serializer.class", "kafka.serializer.StringEncoder") producerProperties.put("metadata.broker.list", "localhost:9092") producerProperties.put("request.required.acks", "1") val config: ProducerConfig = new ProducerConfig(producerProperties) //與kafka進行連線。此處用的是kafka自家的Producer,用spark的kafkaproducer也可以,但傳送的方式不同 val producer = new Producer[String,String](config) partitionOfRecords.foreach(record => //傳送資料,在這裡key簡單的用了相同的。實際情況應該用別的 producer.send(new KeyedMessage("cunchu","key",record.toString())) ) } } scc.start() // 真正啟動程式 scc.awaitTermination() //阻塞等待 } /** * 建立一個從kafka獲取資料的流. * * @param scc spark streaming上下文 * @param kafkaParam kafka相關配置 * @param topics 需要消費的topic集合 * @return */ def createStream(scc: StreamingContext, kafkaParam: Map[String, String], topics: Set[String]) = { KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](scc, kafkaParam, topics) } //處理時間 def formatData(line: String) = { val date = new SimpleDateFormat("yyyy/MM/dd H:mm") val d = new SimpleDateFormat("yyyy/MM/dd") val dateFormated = date.parse(line) val dateFormated3 = date.parse(line.split(" ")(0) + " 0:0") val dateFormated2 = date.format(dateFormated) val dateFormated4 = date.format(dateFormated3) val dateFf = date.parse(dateFormated2).getTime val dateFf2 = date.parse(dateFormated4).getTime val r = dateFf - dateFf2 val hash = r / 300000 val final_date = new Date(hash.toInt * 300000 + dateFf2) date.format(final_date) } //字串處理。在這裡是提取時間 def json_an(str: String) = { if (str.length < 10) { 1 } else { val json = JSON.parseObject(str) val main_v = json.get("payload") if (main_v.toString.split(",").length == 7) { formatData(main_v.toString.split(",")(2)) } else { "NAN" } } } }
執行即可
出現Time:xxx的時間戳時說明已經成功運行了
下面我們往檔案中加資料看看是否能讀到並處理
可以看到處理已經成功
那麼我們再檢查一下是否成功儲存了呢。程式碼中我寫回kafka的topic名字是cunchu
因此我們開啟一個消費者看看cunchu中是否有我們處理好的資料
在kafka目錄下執行如下命令
bin/kafka-console-consumer.sh -zookeeper localhost:2181 --from-beginning --topic cunchu
結果
大功告成!