1. 程式人生 > >大資料學習之路96-SparkStreaming整合Kafka

大資料學習之路96-SparkStreaming整合Kafka

我們前面SparkStreaming獲取資料的來源是TCP,但是平常是不會這麼用的,我們通常用的是Kafka。

SparkStreamingContext是不直接提供對Kafka的訪問的。

這個時候就有KafkaUtils

這裡有兩個方法

1.createDirectStream,是一種直連方式,他很重要,因為他用的是Kafka的底層API,他在消費的時候,會直接連到Kafka的分割槽上。

2.createStream,是有接收者的方式,而剛才我們講的createDirectStream是沒有接收者的方式。這是一種簡單的方式,這種簡單的方式在很早之前就出現了,但是他有一個問題,就是容易丟失資料,並且它的效率也比較低。

我們這裡先使用這種簡單的方式,這種簡單方式其實他會自動幫我們維護偏移量,如果我們想要使用又高效,又不丟失資料的方式,我們就要手動維護偏移量。

使用這種傻瓜式的方式,我們首先要傳進去StreamingContext,

第二個引數是zkQuorm,這個引數是zookeeper的連線地址

第三個引數是組id,我們知道消費者有一個消費者組,而我們SparkStreaming程式提交到叢集中是分散式執行的。他相當於有很多的消費者。很多的消費者要消費同一份資料。他們不能重複消費,有了消費者組之後,就不會產生交叉消費的情況。

第四個引數是topic,他是一個Map,Map的key是主題名,value是分割槽數量

第五個引數是儲存級別

配置完之後我們就可以從Kafka中獲取資料,我們可以看到他的返回值為DStream型別的鍵值對,key為主題名稱,value為值:

完整程式碼如下:

package com.test.sparkStreaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaStreamingWordCount {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val conf: SparkConf = new SparkConf().setAppName("KafkaStreamingWordCount").setMaster("local[2]")
    //建立一個SparkStreamingContext
    val ssc: StreamingContext = new StreamingContext(conf,Seconds(5))
    //如果想要更新歷史狀態(累加),要設定checkpoint
    ssc.checkpoint("./ck")
    //從Kafka中拉取資料
    val zkQuorum = "marshal:2181,marshal01:2181,marshal02:2181,marshal03:2181,marshal04:2181,marshal05:2181"
    val groupId = "g1"
    val topic = Map("wordcount" -> 2)
    val lines: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQuorum,groupId,topic ,StorageLevel.MEMORY_ONLY)
    //獲取Kafka中的每一行內容
    val line: DStream[String] = lines.map(_._2)
    val result: DStream[(String, Int)] = line.flatMap(_.split(" ").map((_,1))).reduceByKey(_+_)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

在執行之前首先我們檢查一下我們有沒有建立過wordcount的topic

 

接下來我們建立一個topic為wordcount

bin/kafka-topics.sh --create --zookeeper marshal:2181,marshal01:2181,marshal02:2181,marshal03:2181,
marshal04:2181,marshal05:2181 --replication-factor 3 --partitions 3--topic wordcount

我們可以看到這裡有個分割槽,那麼這裡的分割槽和之前RDD的分割槽和mapreduce的分割槽是不一樣的。

Kafka的分割槽就意味著在一臺機器上分3個地方進行儲存。一個分割槽會在其他機器上儲存幾份副本。

如果是hdfs中的資料會被切分成多個block塊,而kafka中的資料會被存成多個分割槽。

建立完主題之後我們啟動一個生產者,再執行程式觀察效果: