Spark+Kafka的Direct方式將偏移量傳送到Zookeeper實現
Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Receiver的方法要高效。但是因為是Spark Streaming系統自己維護Kafka的讀偏移量,而Spark Streaming系統並沒有將這個消費的偏移量傳送到Zookeeper中,這將導致那些基於偏移量的Kafka叢集監控軟體(比如:Apache Kafka監控之Kafka Web Console、Apache Kafka監控之KafkaOffsetMonitor
我們從Spark的官方文件可以知道,維護Spark內部維護Kafka便宜了資訊是儲存在HasOffsetRanges
類的offsetRanges
中,我們可以在Spark
Streaming程式裡面獲取這些資訊:
這樣我們就可以獲取所以分割槽消費資訊,只需要遍歷offsetsList,然後將這些資訊傳送到Zookeeper即可更新Kafka消費的偏移量。完整的程式碼片段如下:
KafkaCluster
類用於建立和Kafka叢集的連結相關的操作工具類,我們可以對Kafka中Topic的每個分割槽設定其相應的偏移量Map((topicAndPartition,
offsets.untilOffset))
,然後呼叫KafkaCluster
類的setConsumerOffsets
方法去更新Zookeeper裡面的資訊,這樣我們就可以更新Kafka的偏移量,最後我們就可以通過KafkaOffsetMonitor之類軟體去監控Kafka中相應Topic的消費資訊,下圖是KafkaOffsetMonitor的監控情況:
如果想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共帳號:iteblog_hadoop
從圖中我們可以看到KafkaOffsetMonitor監控軟體已經可以監控到Kafka相關分割槽的消費情況,這對監控我們整個Spark Streaming程式來非常重要,因為我們可以任意時刻了解Spark讀取速度。另外,KafkaCluster工具類的完整程式碼如下:
相關推薦
Spark+Kafka的Direct方式將偏移量傳送到Zookeeper的實現
Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Receiver的方法要高效。但是因為是Spar
Spark+Kafka的Direct方式將偏移量傳送到Zookeeper實現
Apache Spark 1.3.0引入了Direct API,利用Kafka的低層次API從Kafka叢集中讀取資料,並且在Spark Streaming系統裡面維護偏移量相關的資訊,並且通過這種方式去實現零資料丟失(zero data loss)相比使用基於Rece
請問如何將偏移量轉換為地址?
請問如何將偏移量轉換為地址? 我在使用軟體定位木馬特徵碼的時候,只得到偏移量,為什麼有的人用OD轉到偏移量,但我用OD的時候,顯示沒有這個偏移量的,請問如何將偏移量轉換為地址? 在DFCG回答過,再轉過來吧。+---------+---------+----
Spark Streaming管理Kafka偏移量
前言 為了讓Spark Streaming消費kafka的資料不丟資料,可以建立Kafka Direct DStream,由Spark Streaming自己管理offset,並不是存到zookeeper。啟用Spark Streaming的 checkpoints是儲存偏移量的最簡單方法,因為它可以
Spark Streaming 之 Kafka 偏移量管理
本文主要介紹 Spark Streaming 應用開發中消費 Kafka 訊息的相關內容,文章著重突出了開發環境的配置以及手動管理 Kafka 偏移量的實現。 一、開發環境 1、元件版本 CDH 叢集版本:6.0.1 Spark 版本:2.2.0 Kafka 版本:1.0.1 2、M
SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移量的兩種方式
Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka
spark維護偏移量二
may exe 初始 進程 map message created reac 廣播 import kafka.common.TopicAndPartition import kafka.utils.ZkUtils import org.I0Itec.zkclient.Zk
計算C結構體成員偏移量兩種方式本質上是一樣的
BE main print tdd of函數 pan color c結構體 計算 #include <stdio.h> #include <stddef.h> typedef struct test_st { char a[3];
采用位異或方式將兩個變量數值調換
clas 兩個 rgs package 異或 dem int span [] 1 package debug; 2 3 class Demo2 { 4 public static void main(String[] args){ 5
PB 通過http協議以POST方式將XML傳送到協議伺服器
------解決方案-------------------------------------------------------- Blob lblb_args String ls_header String ls_url String ls_args long ll_length integer li_r
如何管理Spark Streaming消費Kafka的偏移量(二)
上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接著聊聊上次說升級失敗的案例。 事情發生一個月前,由於當時我們想提高spark streaming程式的並行處理效能,於是需要增加kafka分割槽個數,,這裡需要說下,在新版本sp
如何管理Spark Streaming消費Kafka的偏移量(三)
前面的文章已經介紹了在spark streaming整合kafka時,如何處理其偏移量的問題,由於spark streaming自帶的checkpoint弊端非常明顯,所以一些對資料一致性要求比較高的專案裡面,不建議採用其自帶的checkpoint來做故障恢復。 在sp
kafka直連方式,使用redis儲存偏移量
使用Redis來記錄偏移量,以前用receive方式時,使用zookeeper儲存偏移量,不用自己儲存偏移量,使用直連方式可以自己儲存偏移量,更加靈活。在直連方式中,儲存偏移量可以使用zookeeper,也可以使用mysql、redis等來儲存偏移量,下面使用一
zookeeper上修改kafka消費組的偏移量
[[email protected] bin]$ zookeeper-shell.sh 192.168.0.1:2181 Connecting to 192.168.0.1:2181 Wel
PB9.0 通過http協議以POST方式將XML傳送到協議伺服器
最近接觸了手機支付系統,現在需要將生成的xml傳送至伺服器。 我查看了pb的幫助檔案,posturl可以實現該功能 servicereference.PostURL ( urlname, urldata, headers, {serverport, } data )
sparkStreaming 與fafka直接方式 進行消費者偏移量的保存如redis 裏面 避免代碼改變與節點重啟後的數據丟失與序列化問題
create term tex ria streaming 保存 else config cal import java.util import kafka.common.TopicAndPartition import kafka.message.Messag
spark streaming中維護kafka偏移量到外部介質
.exe topic _each keys off exec lose eat comm spark streaming中維護kafka偏移量到外部介質 以kafka偏移量維護到redis為例。 redis存儲格式 使用的數據結構為string,其中key為topic:
結構體偏移量(sizeof長度)的簡單研究
long long size 一個 eof sig stdio.h 輸出結果 答案 cnblogs 總能夠網上搜到這樣的,關於結構體sizeof的答案,然而,經過這個簡單的實驗以後,發現gcc5.3編譯的結果並非如此。 字節對齊的細節和具體編譯器實現相關,但一般而言,滿足
什麽是二維數組偏移量?
[1] 地址 結束 它的 方式 所在地 連續存儲 若有 二維 比如:A[][]={{1,2,3},{4,5,6},{7,8,9}};4的偏移量就是3,8的偏移量就是7。對一個數組 A[M][N]中任一元素A[i][j]的偏移量的計算方法就是:i*N+j;比如:上面的4位置
ES6學習筆記二 新的聲明方式和變量的解構賦值!
是什麽 一句話 數組 name ont 簡單 cee 問題 二次 新的聲明方式 在ES5的時候,我們只有一個聲明方式,var!但是在es6中,聲明進行了擴展,我們加上ES5的var的申明方式,我們有了三種聲明方式: var:它是variable的簡寫,可以理解成變量的意思