1. 程式人生 > >Kafka中的訊息是否會丟失和重複消費

Kafka中的訊息是否會丟失和重複消費

        在之前的基礎上,基本搞清楚了Kafka的機制及如何運用。這裡思考一下:Kafka中的訊息會不會丟失或重複消費呢?為什麼呢?

        要確定Kafka的訊息是否丟失或重複,從兩個方面分析入手:訊息傳送和訊息消費

1、訊息傳送

         Kafka訊息傳送有兩種方式:同步(sync)和非同步(async),預設是同步方式,可通過producer.type屬性進行配置。Kafka通過配置request.required.acks屬性來確認訊息的生產:

0---表示不進行訊息接收是否成功的確認;

1---表示當Leader接收成功時確認;

-1---表示Leader和Follower都接收成功時確認;

綜上所述,有6種訊息生產的情況,下面分情況來分析訊息丟失的場景:

(1)acks=0,不和Kafka叢集進行訊息接收確認,則當網路異常、緩衝區滿了等情況時,訊息可能丟失;

(2)acks=1、同步模式下,只有Leader確認接收成功後但掛掉了,副本沒有同步,資料可能丟失;

2、訊息消費

        Kafka訊息消費有兩個consumer介面,Low-level API和High-level API:

Low-level API:消費者自己維護offset等值,可以實現對Kafka的完全控制;

High-level API:封裝了對parition和offset的管理,使用簡單;

如果使用高階介面High-level API,可能存在一個問題就是當訊息消費者從叢集中把訊息取出來、並提交了新的訊息offset值後,還沒來得及消費就掛掉了,那麼下次再消費時之前沒消費成功的訊息就“詭異

”的消失了;    

解決辦法:

        針對訊息丟失:同步模式下,確認機制設定為-1,即讓訊息寫入Leader和Follower之後再確認訊息傳送成功;非同步模式下,為防止緩衝區滿,可以在配置檔案設定不限制阻塞超時時間,當緩衝區滿時讓生產者一直處於阻塞狀態;

        針對訊息重複:將訊息的唯一標識儲存到外部介質中,每次消費時判斷是否處理過即可。

Kafka的Leader選舉機制

Kafka將每個Topic進行分割槽Patition,以提高訊息的並行處理,同時為保證高可用性,每個分割槽都有一定數量的副本 Replica,這樣當部分伺服器不可用時副本所在伺服器就可以接替上來,保證系統可用性。在Leader上負責讀寫,Follower負責資料的同步。當一個Leader發生故障如何從Follower中選擇新Leader呢?

        Kafka在Zookeeper上針對每個Topic都維護了一個ISR(in-sync replica---已同步的副本)的集合,集合的增減Kafka都會更新該記錄。如果某分割槽的Leader不可用,Kafka就從ISR集合中選擇一個副本作為新的Leader。這樣就可以容忍的失敗數比較高,假如某Topic有N+1個副本,則可以容忍N個伺服器不可用。

        如果ISR中副本都不可用,有兩種處理方法:

(1)等待ISR集合中副本復活後選擇一個可用的副本;

(2)選擇叢集中其他可用副本;

具體可參考:http://www.jasongj.com/2015/04/24/KafkaColumn2/

相關推薦

Kafka訊息是否丟失重複消費

        在之前的基礎上,基本搞清楚了Kafka的機制及如何運用。這裡思考一下:Kafka中的訊息會不會丟失或重複消費呢?為什麼呢?        要確定Kafka的訊息是否丟失或重複,從兩個方面

Kafka訊息丟失重複嗎?——如何實現Kafka精確傳遞一次語義

我們都知道Kafka的吞吐量很大,但是Kafka究竟會不會丟失訊息呢?又會不會重複消費訊息呢? 圖 無人機實時監控 ​ 有很多公司因為業務要求必須保證訊息不丟失、不重複的到達,比如無人機實時監控系統,當無人機闖入機場區域,我們必須立刻報警,不允許訊息丟失。而無人機離開禁飛區域後我們需要將及時報警解除。

SparkStreaming消費Kafka的資料 使用zookeeperMySQL儲存偏移量的兩種方式

Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka

kafkatopic的partition數量customerGroup的customer數量關係以及storm消費kafka時並行度設定問題總結:

前段時間通過網上查詢和自己測試仔細研究了partition和customer關係以及工作中遇到的storm並行度調整的問題,認真梳理了一下現總結如下: 一、先說kafka部分: produce方面: 如果有多個分割槽,傳送的時候按照key值hashCode%partit

[問題記錄]解決RabbitMQ訊息丟失重複消費問題

本文僅記錄排查和問題定位、解決的過程。 1. 背景 最近使用者反饋提交的SQL查詢一直處於長時間等待狀態,經過排查觀察,發現部分查詢請求丟失,導致使用者提交的查詢未被正常接收,繼而長時間無響應。 現象:集市SQL控制檯提交10個簡單SQL查詢 -&

解決RabbitMQ訊息丟失重複消費問題

1. 背景 最近使用者反饋提交的SQL查詢一直處於長時間等待狀態,經過排查觀察,發現部分查詢請求丟失,導致使用者提交的查詢未被正常接收,繼而長時間無響應。 現象:即使SQL控制檯提交10個簡單SQL查詢 -> 訊息傳送方:傳送10條訊息至訊息佇列 -&

SpringBoot消費RabbitMQ 通過死信保證無法消費訊息丟失

由於最近剛剛接觸RabbitMQ  自己在測試伺服器搭建了一個RabbitMQ的服務具體安裝過程參見連線[didi大神的部落格]大家再用RabbitMQ 的時候經常會遇到消費Mq的訊息失敗的情況,一般情況下會根據不同的業務場景通過不同的辦法去記錄下無法消費的訊息的資料,本文簡

Kafka 非同步訊息阻塞?記一次 Dubbo 頻繁超時排查過程

線上某服務 A 呼叫服務 B 介面完成一次交易,一次晚上的生產變更之後,系統監控發現服務 B 介面頻繁超時,後續甚至返回執行緒池耗盡錯誤 Thread pool is EXHAUSTED。因為服務 B 依賴外部介面,剛開始誤以為外部介面延時導致,所以臨時增加服務 B dubbo 執行緒池執行緒數量。配置變

RabbitMQ延遲消費重複消費

轉載自 https://blog.csdn.net/quliuwuyiz/article/details/79301054 使用RabbitMQ實現延遲任務 場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設定成超時。 場景二:訂單下單之後30分鐘後,如

kafka重置consumer的offset 資料重複消費

[[email protected] ~]/opt/cloudera/parcels/KAFKA-2.0.1-1.2.0.1.p0.5/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --br

解決rabbitmq訊息佇列的順序及重複消費問題

五一期間去韓國遊玩,順便去了朋友公司扯淡去了。 所謂的扯淡,就是過去聽技術分享,有python, golang, devops,docker一些話題。總的來說,技術方面跟國內還是有一些差距的。  正題開始,因為業務的各方面的強需求,我們使用了rabbitmq作為訊息佇

一道面試題 訊息中介軟體,怎麼解決訊息的冪等性(訊息怎麼防止不被重複消費)

訊息中介軟體中,怎麼解決訊息的冪等性(訊息消費怎麼防止不被重複消費。) 如果SpringBoot和ActiveMQ整合,程式碼不拋異常,標識為消費成。 ActiveMQ 消費程式碼丟擲異常,就會一直重試(10次)。 消費者端丟擲異常,怎麼解決 日誌mongdb(json)、redis、資

Kafka如何保證訊息丟失重複

轉載:https://blog.csdn.net/matrix_google/article/details/79888144 首先要考慮這麼幾個問題: 訊息丟失是什麼造成的,從生產端和消費端兩個角度來考慮 訊息重複是什麼造成的,從生產端和消費端兩個角度來考慮  

Kafka重複消費丟失資料研究

Kafka重複消費原因 底層根本原因:已經消費了資料,但是offset沒提交。 原因1:強行kill執行緒,導致消費後的資料,offset沒有提交。 原因2:設定offset為自動提交,關閉kafka時,如果在close之前,呼叫 consumer.unsubscr

kafka訊息丟失?為什麼?看了這個你就清楚了

訊息傳送方式 想清楚Kafka傳送的訊息是否丟失,需要先了解Kafka訊息的傳送方式。 Kafka訊息傳送分同步(sync)、非同步(async)兩種方式 預設是使用同步方式,可通過producer.type屬性進行配置; Kafka保證訊息被安全生產,有三

kafka:如何保證訊息丟失重複

首先要考慮這麼幾個問題: 訊息丟失是什麼造成的?(從生產端和消費端兩個角度來考慮) 訊息重複是什麼造成的?(從生產端和消費端兩個角度來考慮) 如何保證訊息有序? 如果保證訊息不重不漏,損失的是什麼? 下面是文章詳情,這裡先

kafka生產者消費者API

actor 成功 edm icc per class 持久化 spout payment 使用idea實現相關API操作,先要再pom.xml重添加Kafka依賴: <dependency> <groupId>

EF Core DbContext不跟蹤聚合方法Join方法返回的結果

contex cor core 匿名類型 也不會 類型 eating count 此外 EF Core中: 如果調用Queryable.Count等聚合方法,不會導致DbContext跟蹤(track)任何實體。 此外調用Queryable.Join方法返回的匿名類

c#使用鉤子函式出現字母重複少最後一個字元的問題

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System; using System.Collect

kafka 的 內網公網IP配置

當使用阿里雲或者有公網IP和內網IP的伺服器時,搭建kafka叢集,使用公網接受資料,通過內網傳輸到hdfs等供消費。 1、/etc/hosts裡配置內網IP 10.161.241.171  yourhostname   2、kafka的server端配置如下