1. 程式人生 > >如何管理Spark Streaming消費Kafka的偏移量(二)

如何管理Spark Streaming消費Kafka的偏移量(二)

上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接著聊聊上次說升級失敗的案例。

事情發生一個月前,由於當時我們想提高spark streaming程式的並行處理效能,於是需要增加kafka分割槽個數,,這裡需要說下,在新版本spark streaming和kafka的整合中,按照官網的建議 spark streaming的executors的數量要和kafka的partition的個數保持相等,這樣每一個executor處理一個kafka partition的資料,效率是最高的。如果executors的數量大於kafka的分割槽個數,其實多餘的executors相當於是不會處理任何資料,這部分的程序其實是白白浪費效能。

如果executor的個數小於kafka partition的個數,那麼其實有一些executors程序是需要處理多個partition分割槽的資料的,所以官網建議spark executors的程序數和kafka partition的個數要保持一致。

那麼問題來了,如果想要提高spark streaming的並行處理效能,只能增加kafka的分割槽了,給kafka增加分割槽比較容易,直接執行一個命令即可,不過這裡需要注意,kafka的分割槽只能增加不能減少,所以新增分割槽要考慮到底多少個才合適。

接下來我們便增加了kafka分割槽的數量,同時修改了spark streaming的executors的個數和kafka的分割槽個數一一對應,然後就啟動了流程式,結果出現了比較詭異的問題,表現如下:

造幾條測試資料打入kafka中,發現程式總是隻能處理其中的一部分資料,而每次總有一些資料丟失。按理說程式碼沒有任何改動,只是增加kafka的分割槽和spark streaming的executors的個數,應該不會出現問題才對,於是又重新測了原來的舊分割槽和程式,發現沒有問題,經過對比發現問題只會出現在kafka新增分割槽後,然後出現這種丟資料的情況。然後和運維同學一起看了新增的kafka的分割槽的磁碟目錄是否有資料落入,經查詢發現新的分割槽確實已經有資料進入了,這就很奇怪了丟的資料到底是怎麼丟的?

最後我又檢查了我們自己儲存的kafka的offset,發現裡面的偏移量竟然沒有新增kafka的分割槽的偏移量,至此,終於找到問題所在,也就是說,如果沒有新增分割槽的偏移量,那麼程式執行時是不會處理新增分割槽的資料,而我們新增的分割槽確確實實有資料落入了,這就是為啥前面說的詭異的丟失資料的原因,其實是因為新增kafka的分割槽的資料程式並沒有處理過而這個原因正是我們的自己儲存offset中沒有記錄新增分割槽的偏移量。

問題找到了,那麼如何修復線上丟失的資料呢?

當時想了一個比較笨的方法,因為我們的kafka線上預設是保留7天的資料,舊分割槽的資料已經處理過,就是新增的分割槽資料沒有處理,所以我們刪除了已經處理過的舊的分割槽的資料,然後在業務流量底峰時期,重新啟了流程式,讓其從最早的資料開始消費處理,這樣以來因為舊的分割槽被刪除,只有新分割槽有資料,所以相當於是把丟失的那部分資料給修復了。修復完成後,又把程式停止,然後配置從最新的偏移量開始處理,這樣偏移量裡面就能識別到新增的分割槽,然後就繼續正常處理即可。

注意這裡面的刪除kafka舊分割槽的資料,是一個比較危險的操作,它要求kafka的節點需要全部重啟才能生效,所以除非特殊情況,不要使用這麼危險的方式。

後來,仔細分析了我們使用的一個開源程式管理offset的原始碼,發現這個程式有一點bug,沒有考慮到kafka新增分割槽的情況,也就是說如果你的kafka分割槽增加了,你的程式在重啟後是識別不到新增的分割槽的,所以如果新增的分割槽還有資料進入,那麼你的程式一定會丟資料,因為擴充套件kafka分割槽這個操作,並不常見,所以這個bug比較難易觸發。

知道原因後,解決起來比較容易了,就是每次啟動流程式前,對比一下當前我們自己儲存的kafka的分割槽的個數和從zookeeper裡面的存的topic的分割槽個數是否一致,如果不一致,就把新增的分割槽給新增到我們自己儲存的資訊中,併發偏移量初始化成0,這樣以來在程式啟動後,就會自動識別新增分割槽的資料。

所以,回過頭來看上面的那個問題,最簡單優雅的解決方法就是,直接手動修改我們自己的儲存的kafka的分割槽偏移量資訊,把新增的分割槽給加入進去,然後重啟流程式即可。

這個案例也就是我上篇文章所說的第三個場景的case,如果是自己手動管理kafka的offset一定要注意相容新增分割槽後的這種情況,否則程式可能會出現丟失資料的問題。

相關推薦

如何管理Spark Streaming消費Kafka偏移

上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接著聊聊上次說升級失敗的案例。 事情發生一個月前,由於當時我們想提高spark streaming程式的並行處理效能,於是需要增加kafka分割槽個數,,這裡需要說下,在新版本sp

如何管理Spark Streaming消費Kafka偏移

前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。 在sp

Spark StreamingKafka 偏移管理

本文主要介紹 Spark Streaming 應用開發中消費 Kafka 訊息的相關內容,文章著重突出了開發環境的配置以及手動管理 Kafka 偏移量的實現。 一、開發環境 1、元件版本 CDH 叢集版本:6.0.1 Spark 版本:2.2.0 Kafka 版本:1.0.1 2、M

Kafka:ZK+Kafka+Spark Streaming集群環境搭建VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。

centos 失敗 sco pan html top n 而且 div href Centos7出現異常:Failed to start LSB: Bring up/down networking. 按照《Kafka:ZK+Kafka+Spark Streaming集群環

Apache 流框架 Flink,Spark Streaming,Storm對比分析

本文由 網易雲 釋出2.Spark Streaming架構及特性分析2.1 基本架構基於是spark core的spark streaming架構。Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處理引擎是Spark,也就是把Spark Str

kafka同步非同步消費和訊息的偏移

1. 消費者位置(consumer position) 因為kafka服務端不儲存訊息的狀態,所以消費端需要自己去做很多事情。我們每次呼叫poll()方法他總是返回已經儲存在生產者佇列中還未被消費者消費的訊息。訊息在每一個分割槽中都是順序的,那麼必然可以通過一

Kafka:ZK+Kafka+Spark Streaming集群環境搭建安裝spark2.2.1

node word clas 執行 選擇 dir clust 用戶名 uil 如何配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

Kafka:ZK+Kafka+Spark Streaming集群環境搭建安裝kafka_2.11-1.1.0

itl CA blog tor line cat pre PE atan 如何搭建配置centos虛擬機請參考《Kafka:ZK+Kafka+Spark Streaming集群環境搭建(一)VMW安裝四臺CentOS,並實現本機與它們能交互,虛擬機內部實現可以上網。》 如

Kafka:ZK+Kafka+Spark Streaming集群環境搭建十三定義一個avro schema使用comsumer發送avro字符流,producer接受avro字符流並解析

finall ges records ring ack i++ 一個 lan cde 參考《在Kafka中使用Avro編碼消息:Consumer篇》、《在Kafka中使用Avro編碼消息:Producter篇》 pom.xml <depende

Kafka:ZK+Kafka+Spark Streaming集群環境搭建十三Structured Streaming遇到問題:Set(TopicName-0) are gone. Some data may have been missed

ack loss set div top 過程 pan check use 事情經過:之前該topic(M_A)已經存在,而且正常消費了一段時間,後來刪除了topic(M_A),重新創建了topic(M-B),程序使用新創建的topic(M-B)進行實時統計操作,執行過程中

Kafka:ZK+Kafka+Spark Streaming集群環境搭建十五Structured Streaming:同一個topic中包含一組數據的多個部分,按照key它們拼接為一條記錄以及遇到的問題

eas array 記錄 splay span ack timestamp b- each 需求: 目前kafka的topic上有一批數據,這些數據被分配到9個不同的partition中(就是發布時key:{m1,m2,m3,m4...m9},value:{records

Spark Streaming實時流處理筆記6—— Kafka 和 Flume的整合

1 整體架構 2 Flume 配置 https://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html 啟動kafka kafka-server-start.sh $KAFKA_HOME/config/se

Spark Streaming實時流處理筆記5—— Kafka API 程式設計

1 新建 Maven工程 pom檔案 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLo

Spark Streaming實時流處理筆記4—— 分散式訊息佇列Kafka

1 Kafka概述 和訊息系統類似 1.1 訊息中介軟體 生產者和消費者 1.2 Kafka 架構和概念 producer:生產者(生產饅頭) consumer:消費者(吃饅頭) broker:籃子 topic : 主題,給饅頭帶一個標籤,(

Spark Streaming消費Kafka的資料進行統計

流處理平臺: 這裡是第四步的實現: Spark Streaming整合Kafka採用的是Receiver-based,另一種方式Direct Approach,稍作修改就行。 package spark import org.apache.spark.SparkConf impo

Spark Streaming消費Kafka Direct方式資料零丟失實現

一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、

Spark Streamingkafka 整合指導kafka 0.8.2.1 或以上版本

本節介紹一下如何配置Spark Streaming 來接收kafka的資料。有兩個方法: 1、老的方法 -使用Receivers 和kafka的高階API 2、新的方法( Spark 1.3 開始引入)-不適用Receivers。這兩個方式擁有不同的程式設計模型,效能特徵

Apache 流框架 Flink,Spark Streaming,Storm對比分析2

此文已由作者嶽猛授權網易雲社群釋出。 歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。 2.Spark Streaming架構及特性分析 2.1 基本架構 基於是spark core的spark streaming架構。 Spark Streaming是將流式計算分解成一系列短小的批處理作業。這裡的批處

Spark Streaming實時流處理筆記3——日誌採集Flume

1 Flume介紹 1.1 設計目標 可靠性 擴充套件性 管理性 1.2 同類產品 Flume: Cloudera/Apache,Java Scribe: Facebook ,C/C++(不維護了) Chukwa: Yahoo

Spark Streaming實時流處理筆記2—— 實時處理介紹

1 實時和離線計算對比 1.1 資料來源 離線:HDFS 歷史資料,資料量較大 實時:訊息佇列(Kafka) 1.2 處理過程 離線:Mapreduce 實時:Spark(DStream/SS) 1.3 處理速度 離