1. 程式人生 > >sparkstreaming消費kafka如何保證輸出結果只會產生一次?(事務性)

sparkstreaming消費kafka如何保證輸出結果只會產生一次?(事務性)

最近開始使用sparkstreaming+kafka0.10,使用過程中碰到問題:

    steaming採用的direct方式 ,(這種方式和receiver方式的對比效能會好很多),spark計算完資料之後有一個結果入庫操作,現在問題來了,採用自動提交的時候程式二次啟動經常會出現重複消費的情況,並且怎麼保證這個結果只生產一次呢?

首先介紹一下sparkstreaming內部是怎麼做到訊息只計算一次的  :

    1.topic的資料拉過來之後計算的時候出現的各種記憶體溢位等異常,這些spark自己會有重計算

    2.由於程式碼導致的異常,不會重複計算,結果就不會產生,這時你可以自己想辦法重新計算

 再來了解一下spark是怎麼消費kafka的,這個可能很多初學者不瞭解,下面是分析DirectKafkaInputStream:

啟動時會進行start方法

override def start(): Unit = {   val c = consumer   paranoidPoll(c)   if (currentOffsets.isEmpty) {//啟動時肯定為空     currentOffsets = c.assignment().asScala.map { tp => //根據 auto.offset.reset的配置 earliest為從最近提交的offset處開始 latest從該topic的最新訊息的位置開始     tp -> c.position(tp)     }.toMap   } currentoffsets儲存的是上次消費的partition以及offset等資訊,

之後拉取資料就是compute方法了

 override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {     val untilOffsets = clamp(latestOffsets())     ....     currentOffsets = untilOffsets     commitAll()     ...     } latestOffsets會將新增的分割槽資訊獲取到 clamp 根據設定 spark.streaming.kafka.maxRatePerPartition * partitionsize 限制每個partition每次拉取的資料量

streaming只會在第一次啟動時使用到kafka中儲存的的offset,然後將消費的position儲存在currentoffsets中,此後kafka中的offset值只供下一次啟動 時用到。其中commitAll()就是提交offset,入夥enable.auto.commit為true則是根據 auto.commit.interval.ms 的值週期性的非同步提交,false的話可以 手動提交: val ds = KafkaUtils.createDirect..... ds.foreachRDD(rdd=>{ val com =  ds.asInstanceOf[CanCOmmitOffsets] val off  = rdd.asInstanceOf[HasOffsetRanges].offsetRanges com.commitAsync(off ,new OffsetCommitCallback{ override def onComplete(..... }) } ) 官方只提供了非同步提交的方法,同步提交可自己使用KafkaConsumer.commitSync方法實現...

現在回到那個問題:採用自動提交的時候程式二次啟動經常會出現重複消費的情況,並且怎麼保證這個結果只生產一次呢?

因為預設是非同步批量提交offset,commitAsync方法還沒執行完程式就停止了,因為非同步提交一般要等很久! 所有二次啟動肯定會出現訊息重複消費。

解決方案:

1.自己實現commitsync方法 ,這個方法執行時毫秒級

2.每次消費完之後將offset儲存到hdfs或本地,啟動的時候讀這個檔案並且加入新增的partition offset即可

但是!上面的方案還是會出現問題。。。

因為入庫操作和commitc方法不是一個原子操作。

所以程式停止時可能出現commitsync方法未完成或hdfs檔案未寫完..雖然概率很小。。

為了真正的解決上述問題,唯一的最合適的方案:

將offset資訊同結果資料一起入庫,保證是一個原子操作,這樣就萬無一失了

---------------------  作者:朱繼業1993  來源:CSDN  原文:https://blog.csdn.net/u013314600/article/details/80929310  版權宣告:本文為博主原創文章,轉載請附上博文連結!