1. 程式人生 > Spark Streaming消費Kafka的資料進行統計

Spark Streaming消費Kafka的資料進行統計

流處理平臺:
在這裡插入圖片描述
這裡是第四步的實現:
Spark Streaming整合Kafka這裡採用的是Receiver-based方式;另一種方式是Direct Approach,對下面的程式碼稍作修改即可。

package spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming consuming from Kafka (Receiver-based integration)
  * and printing a per-batch record count.
  *
  * Usage: KafkaStreamingApp &lt;zkQuorum&gt; &lt;group&gt; &lt;topics&gt; &lt;numThreads&gt;
  *   zkQuorum   - ZooKeeper connection string for the Kafka cluster
  *   group      - Kafka consumer group id
  *   topics     - comma-separated list of topics to subscribe to
  *   numThreads - receiver threads per topic
  */
object KafkaStreamingApp {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaStreamingApp <zkQuorum> <group> <topics> <numThreads>")
      // Bug fix: the original printed the usage message but kept running,
      // so the Array extractor below threw a MatchError on bad input.
      // Exit explicitly so the user sees a clean usage error instead.
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf()
      .setAppName("KafkaReceiverWordCount")
      .setMaster("local[2]") // Receiver-based input occupies one core; need >= 2 locally

    // 5-second micro-batch interval.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Map of topic -> number of receiver threads for that topic.
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // Receiver-based Kafka stream: each record is a (key, message) pair,
    // so the payload is the second element (_._2).
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}