Consuming Data from Kafka with Spark Streaming and Counting It
Stream processing platform (pipeline overview):
Here is the implementation of step 4 of that pipeline:
The Spark Streaming–Kafka integration here uses the Receiver-based approach; the other option, the Direct Approach, needs only minor changes (see the sketch after the main program).
package spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming integration with Kafka (Receiver-based approach).
 */
object KafkaStreamingApp {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaStreamingApp <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf()
      .setAppName("KafkaReceiverWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Each comma-separated topic is consumed with numThreads receiver threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // Receiver-based integration: createStream connects to Kafka through the ZooKeeper quorum
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // messages is a DStream of (key, value) pairs; count the records in each 5-second batch
    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
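
To run the program locally, pass the four arguments in the order <zkQuorum> <group> <topics> <numThreads>, for example: localhost:2181 test-group test-topic 1. These values are illustrative only; substitute your own ZooKeeper quorum, consumer group ID, topic list and thread count.

Below is a minimal sketch of the Direct Approach variant mentioned above, assuming the same spark-streaming-kafka (0.8) artifact is on the classpath. The object name KafkaDirectStreamingApp and the <brokers> <topics> argument layout are choices made for illustration, not taken from the original program.

package spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Direct Approach variant (sketch): connects to the Kafka brokers directly,
 * with no receiver and no ZooKeeper quorum argument.
 */
object KafkaDirectStreamingApp {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectStreamingApp <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val sparkConf = new SparkConf()
      .setAppName("KafkaDirectWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Direct Approach: pass the broker list; Spark Streaming tracks offsets itself
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Same counting logic as the Receiver-based version
    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Compared with the Receiver-based version, this variant takes a Kafka broker list instead of a ZooKeeper quorum, needs no per-topic receiver thread count, and creates one RDD partition per Kafka partition.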