1. 程式人生 > >如何管理Spark Streaming消費Kafka的偏移量(三)

如何管理Spark Streaming消費Kafka的偏移量(三)

前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。

在spark streaming1.3之後的版本支援direct kafka stream,這種策略更加完善,放棄了原來使用Kafka的高階API自動儲存資料的偏移量,之後的版本採用Simple API也就是更加偏底層的api,我們既可以用checkpoint來容災,也可以通過低階api來獲取偏移量自己管理偏移量,這樣以來無論是程序升級,還是故障重啟,在框架端都可以做到Exact One準確一次的語義。

本篇文章,會再介紹下,如何手動管理kafka的offset,並給出具體的程式碼加以分析:

版本:

apache spark streaming2.1

apache kafka 0.9.0.0

手動管理offset的注意點:

(1)第一次專案啟動的時候,因為zk裡面沒有偏移量,所以使用KafkaUtils直接建立InputStream,預設是從最新的偏移量開始消費,這一點可以控制。

(2)如果非第一次啟動,zk裡面已經存在偏移量,所以我們讀取zk的偏移量,並把它傳入到KafkaUtils中,從上次結束時的偏移量開始消費處理。

(3)在foreachRDD裡面,對每一個批次的資料處理之後,再次更新存在zk裡面的偏移量

注意上面的3個步驟,1和2只會載入一次,第3個步驟是每個批次裡面都會執行一次。

下面看第一和第二個步驟的核心程式碼:

/****
    *
    * @param ssc  StreamingContext
    * @param kafkaParams  配置kafka的引數
    * @param zkClient  zk連線的client
    * @param zkOffsetPath zk裡面偏移量的路徑
    * @param topics     需要處理的topic
    * @return   InputDStream[(String, String)] 返回輸入流
*/
def createKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String], zkClient: ZkClient, zkOffsetPath: String, topics: Set[String]): InputDStream[(String, String)]={ //目前僅支援一個topic的偏移量處理,讀取zk裡面偏移量字串 val zkOffsetData=KafkaOffsetManager.readOffsets(zkClient,zkOffsetPath,topics.last) val kafkaStream = zkOffsetData match { case None => //如果從zk裡面沒有讀到偏移量,就說明是系統第一次啟動 log.info("系統第一次啟動,沒有讀取到偏移量,預設就最新的offset開始消費") //使用最新的偏移量建立DirectStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) case Some(lastStopOffset) => log.info("從zk中讀取到偏移量,從上次的偏移量開始消費資料......") val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message) //使用上次停止時候的偏移量建立DirectStream KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, lastStopOffset, messageHandler) } kafkaStream//返回建立的kafkaStream }

主要是針對第一次啟動,和非首次啟動做了不同的處理。

然後看下第三個步驟的程式碼:

/****
    * 儲存每個批次的rdd的offset到zk中
    * @param zkClient zk連線的client
    * @param zkOffsetPath   偏移量路徑
    * @param rdd     每個批次的rdd
    */
  def saveOffsets(zkClient: ZkClient, zkOffsetPath: String, rdd: RDD[_]): Unit = {
    //轉換rdd為Array[OffsetRange]
    val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //轉換每個OffsetRange為儲存到zk時的字串格式 :  分割槽序號1:偏移量1,分割槽序號2:偏移量2,......
    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")
    log.debug(" 儲存的偏移量:  "+offsetsRangesStr)
    //將最終的字串結果儲存到zk裡面
    ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, offsetsRangesStr)
  }

主要是更新每個批次的偏移量到zk中。

例子已經上傳到github中,有興趣的同學可以參考這個連結:

後續文章會聊一下為了升級應用如何優雅的關閉的流程式,以及在kafka擴充套件分割槽時,上面的程式如何自動相容。

相關推薦

如何管理Spark Streaming消費Kafka偏移

上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接著聊聊上次說升級失敗的案例。 事情發生一個月前,由於當時我們想提高spark streaming程式的並行處理效能,於是需要增加kafka分割槽個數,,這裡需要說下,在新版本sp

如何管理Spark Streaming消費Kafka偏移

前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。 在sp

Spark StreamingKafka 偏移管理

本文主要介紹 Spark Streaming 應用開發中消費 Kafka 訊息的相關內容,文章著重突出了開發環境的配置以及手動管理 Kafka 偏移量的實現。 一、開發環境 1、元件版本 CDH 叢集版本:6.0.1 Spark 版本:2.2.0 Kafka 版本:1.0.1 2、M

Kafka:ZK+Kafka+Spark Streaming集群環境搭建安裝spark2.2.1

node word clas 執行 選擇 dir clust 用戶名 uil 如何配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

kafka同步非同步消費和訊息的偏移

1. 消費者位置(consumer position) 因為kafka服務端不儲存訊息的狀態,所以消費端需要自己去做很多事情。我們每次呼叫poll()方法他總是返回已經儲存在生產者佇列中還未被消費者消費的訊息。訊息在每一個分割槽中都是順序的,那麼必然可以通過一

Kafka:ZK+Kafka+Spark Streaming集群環境搭建安裝kafka_2.11-1.1.0

itl CA blog tor line cat pre PE atan 如何搭建配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

Kafka:ZK+Kafka+Spark Streaming集群環境搭建VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。

centos 失敗 sco pan html top n 而且 div href Centos7出現異常:Failed to start LSB: Bring up/down networking. 按照《Kafka:ZK+Kafka+Spark Streaming集群環

Kafka:ZK+Kafka+Spark Streaming集群環境搭建十三定義一個avro schema使用comsumer發送avro字符流,producer接受avro字符流並解析

finall ges records ring ack i++ 一個 lan cde 參考《在Kafka中使用Avro編碼消息:Consumer篇》、《在Kafka中使用Avro編碼消息:Producter篇》 pom.xml <depende

Spark Streaming實時流處理筆記6—— Kafka 和 Flume的整合

1 整體架構 2 Flume 配置 https://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html 啟動kafka kafka-server-start.sh $KAFKA_HOME/config/se

Spark Streaming實時流處理筆記5—— Kafka API 程式設計

1 新建 Maven工程 pom檔案 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLo

Spark Streaming實時流處理筆記4—— 分散式訊息佇列Kafka

1 Kafka概述 和訊息系統類似 1.1 訊息中介軟體 生產者和消費者 1.2 Kafka 架構和概念 producer:生產者(生產饅頭) consumer:消費者(吃饅頭) broker:籃子 topic : 主題,給饅頭帶一個標籤,(

Spark Streaming消費Kafka的資料進行統計

流處理平臺: 這裡是第四步的實現: Spark Streaming整合Kafka採用的是Receiver-based,另一種方式Direct Approach,稍作修改就行。 package spark import org.apache.spark.SparkConf impo

Spark Streaming消費Kafka Direct方式資料零丟失實現

一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、

Spark Streamingkafka 整合指導kafka 0.8.2.1 或以上版本

本節介紹一下如何配置Spark Streaming 來接收kafka的資料。有兩個方法: 1、老的方法 -使用Receivers 和kafka的高階API 2、新的方法( Spark 1.3 開始引入)-不適用Receivers。這兩個方式擁有不同的程式設計模型,效能特徵

Kafka筆記整理消費形式驗證與性能測試

大數據 Kafka 性能測試 [TOC] Kafka筆記整理(三):消費形式驗證與性能測試 Kafka消費形式驗證 前面的《Kafka筆記整理(一)》中有提到消費者的消費形式,說明如下: 1、每個consumer屬於一個consumer group,可以指定組id。group.id 2、消費形

Apache 流框架 Flink,Spark Streaming,Storm對比分析2

此文已由作者嶽猛授權網易雲社群釋出。 歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。 2.Spark Streaming架構及特性分析 2.1 基本架構 基於是spark core的spark streaming架構。 Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處

Spark Streaming實時流處理筆記3——日誌採集Flume

1 Flume介紹 1.1 設計目標 可靠性 擴充套件性 管理性 1.2 同類產品 Flume: Cloudera/Apache,Java Scribe: Facebook ,C/C++(不維護了) Chukwa: Yahoo

Spark Streaming實時流處理筆記2—— 實時處理介紹

1 實時和離線計算對比 1.1 資料來源 離線:HDFS 歷史資料,資料量較大 實時:訊息佇列(Kafka) 1.2 處理過程 離線:Mapreduce 實時:Spark(DStream/SS) 1.3 處理速度 離

Spark Streaming實時流處理筆記1——Spark-2.2.0原始碼編譯

1 下載原始碼 https://spark.apache.org/downloads.html 解壓 2 編譯原始碼 參考 https://www.imooc.com/article/18419 https://spark.apache.org/docs/2.2.2/bu

Apache 流框架 Flink,Spark Streaming,Storm對比分析

本文由 網易雲 釋出2.Spark Streaming架構及特性分析2.1 基本架構基於是spark core的spark streaming架構。Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark,也就是把Spark Str