Real-Time Word Count with Kafka + Spark Streaming
1. Prerequisites
For Kafka cluster setup, refer to "Kafka Cluster Setup and Configuration".
For Spark cluster setup, refer to "Hadoop+HBase+Spark+Hive Environment Setup".
2. Writing the Code (Scala)
Add the dependencies to pom.xml:
<properties>
    <kafka.version>2.0.0</kafka.version>
    <spark.version>2.3.1</spark.version>
    <scala.version>2.11</scala.version>
</properties>

<dependencies>
    <!-- spark -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
    <dependency>
        <groupId>org.scalanlp</groupId>
        <artifactId>breeze-viz_2.11</artifactId>
        <version>0.13.2</version>
    </dependency>
</dependencies>
Producer: sends the message "yang yun ni hao sha a" once per second.
package com.whut.demo

import java.util

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object KafkaProducer {
  def main(args: Array[String]): Unit = {
    // Kafka broker list and input topic
    val brokers = "192.168.1.41:9092,192.168.1.42:9092,192.168.1.47:9092" // Kafka brokers
    val inputTopic = "input-streaming-topic"                              // topic

    // producer configuration
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    // create the producer
    val producer = new KafkaProducer[String, String](props)

    // send one message per second
    while (true) {
      val key = null
      val value = "yang yun ni hao sha a"
      val message = new ProducerRecord[String, String](inputTopic, key, value)
      producer.send(message)
      println(message)
      Thread.sleep(1000)
    }
  }
}
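The send() call above is fire-and-forget. If you want confirmation that each record actually reached the broker, one possible variation is to pass a Callback to send(). The fragment below is only a sketch reusing producer and message from the code above; the logging inside the callback is illustrative, not part of the original program.

// Optional sketch: attach a callback so delivery failures are logged instead of silently dropped.
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

producer.send(message, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      exception.printStackTrace() // delivery failed
    } else {
      println(s"sent to ${metadata.topic}-${metadata.partition} @ offset ${metadata.offset}")
    }
  }
})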
Spark Streaming acts as the consumer: every 2 seconds it counts how many times each word appeared during the last 10 seconds.
package com.whut.demo

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object SparkConsumer {
  def main(args: Array[String]): Unit = {
    /**
     * Spark master
     * local mode:   "local[*]"
     * cluster mode: "spark://192.168.1.32:7077"
     */
    val master = "local[*]"

    /**
     * checkpoint path
     * local mode:   "checkpoint"
     * cluster mode: "hdfs://master:9000/user/checkpoint"
     */
    val checkpoint = "checkpoint"

    // set the log level
    LogConfig.setStreamingLogLevels()

    // batch interval in seconds
    val batchDuration = 1

    // input topic
    val inputTopic = "input-streaming-topic"
    // output topic
    val outputTopic = "output-streaming-topic"

    // create the StreamingContext
    val streamingContext = new StreamingContext(
      new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster(master),
      Seconds(batchDuration)
    )
    streamingContext.checkpoint(checkpoint)

    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.41:9092,192.168.1.42:9092,192.168.1.47:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // create the direct DStream
    val dStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](Array(inputTopic), kafkaParams)
    )

    // split each received record into words and pair each word with a count of 1
    val lines = dStream.map(record => (record.key, record.value))
    val words = lines.map(_._2)
    val word = words.flatMap(_.split(" "))
    val pair = word.map(x => (x, 1))

    // window length 10 seconds, sliding interval 2 seconds
    val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Seconds(10), Seconds(2))
    wordCounts.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
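Note that outputTopic is declared above but never used. If you also want to push the windowed counts back to Kafka, a minimal sketch placed just before streamingContext.start() could look like the following; the producer settings and the per-partition pattern here are my assumptions, not part of the original code, and they reuse the same broker list.

// Sketch: publish each window's (word, count) pairs to output-streaming-topic.
// A producer is created per partition because KafkaProducer is not serializable.
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

wordCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val props = new java.util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "192.168.1.41:9092,192.168.1.42:9092,192.168.1.47:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { case (word, count) =>
      producer.send(new ProducerRecord[String, String](outputTopic, word, count.toString))
    }
    producer.close()
  }
}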
To keep the output concise, we also set the log level:
package com.whut.demo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging

object LogConfig extends Logging {
  def setStreamingLogLevels(): Unit = {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN) // only WARN and above are printed
    }
  }
}
3. Submitting the Streaming Job in Cluster Mode
Local mode
Simply run both programs directly in IDEA.
Cluster mode
KafkaProducer can still be run directly in IDEA, but SparkConsumer must be packaged into a jar and submitted with spark-submit.
In IDEA, package the program into a jar (see "Building a Jar in IDEA and Submitting It to a Spark Cluster").
Upload the jar to HDFS (the HDFS commands below may come in handy):
hdfs dfs -ls /
hdfs dfs -rm /SparkConsumer.jar
hdfs dfs -put SparkConsumer.jar /
Submit the job with spark-submit:
spark-submit --class com.whut.demo.SparkConsumer --master spark://master:7077 hdfs://master:9000/SparkConsumer.jar
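Since SparkConsumer hardcodes both the master URL and the checkpoint path, switching between local and cluster mode currently means editing the source and rebuilding the jar. A small, hypothetical tweak to main reads them from the program arguments instead, with local defaults; the argument order below is my own choice.

// Hypothetical tweak inside SparkConsumer.main: take master and checkpoint from the
// command line, falling back to local defaults, so the same jar runs in both modes.
val master     = if (args.length > 0) args(0) else "local[*]"
val checkpoint = if (args.length > 1) args(1) else "checkpoint"

Any values appended after the jar path in the spark-submit command above are passed straight to main, e.g. the cluster master URL and an HDFS checkpoint path. Alternatively, you could drop setMaster from the code entirely and let the --master flag of spark-submit decide.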