1. 程式人生 > >Spark Streaming整合Kafka(一)

Spark Streaming整合Kafka(一)

基於Receiver 方式整合

一、Kafka版本選擇

Spark Streaming支援Kafka0.8.2.1及以上的版本。

Kafka專案介紹了兩個新的Comsumer(消費者)API,在0.8版本和0.10版本之間,根據自身需求選擇版本號,另外要注意,0.8版本是相容0.9 0.10版本的broker,但0.10版本不相容之前的版本,接下來我貼上下官網的一張對比圖:


筆者選的是0.8版本。

二、基於Receiver 方式整合

這種方式是使用Receiver接收資料,這個receiver是使用kafka高階的消費者API操作,所謂高階,就是它的偏移量之類的是由zookeeper來完成的,低層次的就需要手動來管理。所有的資料都是從kafka裡過來通過receiver進行接收,然後儲存到Spark executor裡面去,接著Spark Streaming啟動job處理儲存中的資料。

預設的方式在處理一些故障的時候,會丟失一些資料,為了確保0資料的丟失,需要在Spark Streaming裡面開啟WAL機制(Write Ahead Logs),這樣就能在HDFS上面同步儲存所有kafka裡面的資料,即先寫到日誌裡面去,再進行處理,如果出現故障,可以從日誌裡找回,就能避免資料的丟失。接下來,分析如何使用這種機制在stream應用裡:

1.匯入依賴

 groupId = org.apache.spark
 artifactId = spark-streaming-kafka-0-8_2.11
 version = 2.2.0

2.程式設計

我們需要匯入KafkaUrils,生成一個輸入DStream

 import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
此處有幾點要注意下:

1)kafka的partition和RDD裡的partition不是一個概念;

2)有多個kafka DStream,我們可以採用不同的組到topic上面,採用並行的方式進行接收,這樣可以提升資料的吞吐量;

3)如果需要開啟WAL機制的話,底層需要有個支援副本的,類似HDFS的檔案系統,資料接收後,首先會以副本的方式儲存到日誌裡面。對於 input stream 的storage level 需要設定成 StorageLevel.MEMORY_AND_DISK_SER ,大概意思也就是在記憶體和磁碟都儲存一下。

Demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming對接Kafka的方式一
  */
object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
    }

    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf() //.setAppName("KafkaReceiverWordCount")
      //.setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // TODO... Spark Streaming如何對接Kafka
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group,topicMap)

    // TODO... 自己去測試為什麼要取第二個
    messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3.提交或部署

首先進行打包,命令為:mvn clean package -DskipTests ,使用Spark-submit,提交命令如下:

spark-submit \
--class com.imooc.spark.KafkaReceiverWordCount \
--master local[2] \
--name KafkaReceiverWordCount \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
/home/hadoop/lib/sparktrain-1.0.jar  hadoop000:2181 test kafka_streaming_topic 1

注意的是,Spark Streaming會一直啟動receiver來接收資料,如果結束掉這個job,就無法正常接收kafka的資料。本次測試的話,可以先跑起kafka後,再在Spark上執行一個任務,即啟動個receiver來接收kafka的資料。

三、Receiver整合總結

1) 啟動zk
2) 啟動kafka
3) 建立topic
4) 通過控制檯測試本topic是否能夠正常的生產和消費資訊