1. 程式人生 > >KafkaSpout分析:配置

KafkaSpout分析:配置

文章來源:http://www.mamicode.com/info-detail-515641.html
public KafkaSpout(SpoutConfig spoutConf) {
        _spoutConfig = spoutConf;
}

SpoutConfig繼承自KafkaConfig。由於SpoutConfig和KafkaConfig所有的instance field全是public, 因此在使用構造方法後,可以直接設定各個域的值。

public class SpoutConfig extends KafkaConfig implements Serializable {
    
public List<String> zkServers = null; //記錄Spout讀取進度所用的zookeeper的host public Integer zkPort = null;//記錄進度用的zookeeper的埠 public String zkRoot = null;//進度資訊記錄於zookeeper的哪個路徑下 public String id = null;//進度記錄的id,想要一個新的Spout讀取之前的記錄,應把它的id設為跟之前的一樣。 public long stateUpdateIntervalMs = 2000;//用於metrics,多久更新一次狀態。
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); this.zkRoot = zkRoot; this.id = id; } }
public class KafkaConfig implements Serializable {

    public final BrokerHosts hosts; //用以獲取Kafka broker和partition的資訊
    public
final String topic;//從哪個topic讀取訊息 public final String clientId; // SimpleConsumer所用的client id public int fetchSizeBytes = 1024 * 1024; //發給Kafka的每個FetchRequest中,用此指定想要的response中總的訊息的大小 public int socketTimeoutMs = 10000;//與Kafka broker的連線的socket超時時間 public int fetchMaxWait = 10000; //當伺服器沒有新訊息時,消費者會等待這些時間 public int bufferSizeBytes = 1024 * 1024;//SimpleConsumer所使用的SocketChannel的讀緩衝區大小 public MultiScheme scheme = new RawMultiScheme();//從Kafka中取出的byte[],該如何反序列化 public boolean forceFromStart = false;//是否強制從Kafka中offset最小的開始讀起 public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();//從何時的offset時間開始讀,預設為最舊的offset public long maxOffsetBehind = 100000;//KafkaSpout讀取的進度與目標進度相差多少,相差太多,Spout會丟棄中間的訊息 public boolean useStartOffsetTimeIfOffsetOutOfRange = true;//如果所請求的offset對應的訊息在Kafka中不存在,是否使用startOffsetTime public int metricsTimeBucketSizeInSecs = 60;//多長時間統計一次metrics public KafkaConfig(BrokerHosts hosts, String topic) { this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId()); } public KafkaConfig(BrokerHosts hosts, String topic, String clientId) { this.hosts = hosts; this.topic = topic; this.clientId = clientId; } }

 對Zookeeper的使用

KafkaSpout的配置中有兩個地方可以用到Zookeeper

  1. 用Zookeeper來記錄KafkaSpout的處理進度,在topology重新提交或者task重啟後繼續之前的處理進度。在SpoutConfig中的zkServers, zkPort和zkRoot與此相關。如果zkServer和zkPort沒有設定,那麼KafkaSpout會使用Storm叢集所用的Zookeeper記錄這些資訊。
  2. 用Zookeeper來獲取Kafka中一個topic的所有partition,和每個partition的leader。這需要實現BrokerHosts的子類ZkHosts.但是,這個Zookeepr是可選的。如果使用BrokerHosts的另一個子類StaticHosts,把partition和leader的對應關係硬編碼,則不需要Zookeeper來提供此功能。KafkaSpout會從Kafka叢集使用的Zookeeper中提取partition和leader的對應關係。而且:
    • 如果使用StatisHosts,那麼KafkaSpout會使用StaticCoordinator,這個coordinator不能響應partition leader的變化。
    • 如果使用ZkHosts,那麼KafkaSpout會使用ZkCoordinator, 當其refresh()方法被呼叫後,這個cooridnator會檢查發生leader變更的partition,併為之生成新的PartitionManager.從而能夠在leader變更後,繼續讀取訊息。

影響初始讀取進度的配置項

在一個topology上線後,它從哪個offset開始讀取訊息呢?有一些配置項對此有影響:

  1. SpoutConfig中的id欄位。如果想要一個topology從另一個topology之前的處理進度繼續處理,它們需要有相同的id。
  2. KafkaConfig的forceFromStart欄位。如果此欄位設為true, 那麼它一個topology上線後,它會忽略之前相同id的topology的進度,並且從Kafka中最早的訊息開始處理。
  3. KafkaConfig的startOffsetTime欄位。預設為kafka.api.OffsetRequest.EarliestTime()開始讀,也就是從Kafka中最早的訊息開始處理。也可以設成kafka.api.OffsetRequest.LatestOffset,也就是最早的訊息開始讀。也可以自己指定具體的值。
  4. KafkaConfig的maxOffsetBehind欄位。這個欄位對於KafkaSpout的多個處理流程都有影響。當提交一個新topology時,如果沒有forceFromStart, 當KafkaSpout對某個partition的處理進度落後startOffsetTime對應的offset多於此值時,KafkaSpout會丟棄中間的訊息,從而強制趕上目標進度.比如,如果startOffsetTime設成了lastestTime,那麼如果進度落後超過maxOffsetBehind,KafkaSpout會直接從latestTime對應的offset開始處理。如果設成了froceFromStart,則在提交新任務時,始終會從EarliestTime開始讀。
  5. KafkaSpout的userStartOffsetTimeIfOffsetOutOfRange欄位。如果設成true,那麼當fetch訊息時出錯,且FetchResponse顯示的出錯原因是OFFSET_OUT_OF_RANGE,那麼就會嘗試從KafkaSpout指定的startOffsetTime對應的訊息開始讀。例如,如果有一批訊息因為超過了儲存期限被Kafka刪除,並且zk裡記錄的訊息在這批被刪除的訊息裡。如果KafkaSpout試圖從zk的記錄繼續讀,那麼就會出現OFFSET_OUT_OF_RANGE的錯誤,從而觸發這個配置。

實際上maxOffsetBehind有時候有點名不符實。當startOffsetTime為A, zk裡的進度為B, A - B > maxOffsetBehind時,應該從A - maxOffsetBehind除開始讀或許更好一些,而不是直接跳到startOffsetTime。此處的邏輯參見PartitionManager的實現。

附:其中KafkaConfig的maxWait的意義請參見這篇文章 《卡夫卡的煉獄》

相關推薦

KafkaSpout分析配置

文章來源:http://www.mamicode.com/info-detail-515641.html public KafkaSpout(SpoutConfig spoutConf) { _spoutConfig = spoutConf;} Spo

病毒分析虛擬網絡環境配置

dns thread rep dns服務器 註釋 兩個 虛擬網絡 服務器 查看 創建主機網絡模式: 主機模式(Host-only)網絡,可以在宿主操作系統和客戶操作系統之間創建一個隔離的私有局域網,在進行惡意代碼分析時,這是通常采用的聯網方式。主機模式的局域網並不會連接到互

Spring Developer Tools 原始碼分析三、重啟自動配置'

接上文 Spring Developer Tools 原始碼分析:二、類路徑監控,接下來看看前面提到的這些類是如何配置,如何啟動的。 spring-boot-devtools 使用了 Spring Boot 的自動配置方式,我們先關注本地開發環境中自動重啟的部分。 在 LocalDevToolsAut

lighttpd1.4.20原始碼分析安裝與配置

1、有兩種渠道下載原始碼,分別是: http://www.lighttpd.net/download/ -- 官網https://github.com/lighttpd -- GitHub 官網下載的原始碼和GitHub的略有不同,我們以前者,也就是官網的為準。   2

alsa分析alsa的那些配置檔案 ( 1 )

在根檔案系統下,alsa相關的配置檔案有: 在/system/usr/share/alsa目錄下: ├── alsa.conf ├── cards │ └── aliases.conf └── pcm ├── center_lfe.conf ├── d

Pro Android學習筆記(一三七)Home Screen Widgets(3)配置Activity

map onclick widgets info xtra ces extends height appwidget 文章轉載僅僅能用於非商業性質,且不能帶有虛擬貨幣、積分、註冊等附加條件。轉載須註明出處http://blog.csdn.net/flowingfly

勒索病毒入侵XP古董電腦尷尬一幕配置太低被強制停止

微軟 電腦 臺灣 程序 用戶 上周爆發的勒索病毒至今已經讓數十萬PC受害,為了防止更多用戶感染,微軟和各家安全公司都紛紛給出更新補丁或者臨時防範策略。  不過,抵禦勒索病毒的終極措施或許只是一臺“古董PC”這麽簡單。  臺灣網友萊恩15日曬圖稱“電腦爛到病毒跑不動。”(WinXP+古董配置

Eureka源碼分析Eureka不會進行二次Replication的原因

實例 .get 新版 replica ide 倉庫 efault springmvc XML Eureka不會進行二次同步註冊信息 Eureka會將本實例中的註冊信息同步到它的peer節點上,這是我們都知道的特性。然而,當peer節點收到同步數據後,並不會將這些信息再同步

遊戲UI框架設計(五) 配置管理與應用

unity界面框架 unityui框架 ui框架配置 unity配置管理 遊戲UI框架設計(五)--配置管理與應用 在開發企業級遊戲/VR/AR產品時候,我們總是希望可以總結出一些通用的技術體系,框架結構等,為簡化我們的開發起到“四兩撥千金”的作用。所謂“配置管理”是指一個遊戲項目(軟件項

產品分析普通人如何通過火山小視頻賺錢

使用 活躍 視頻 原因 一個 -i 做了 註冊賬號 條件 最近火山小視頻突然爆紅,大有突襲快手之意。對於我等吃瓜群眾而言,最關註的是這裏面是否有潛藏的賺錢機會。 如果你對這個問題感興趣,且聽我一一道來。 一、火山小視頻火到什麽程度? 要想賺錢,首先要有流量和關註度,無圖無真

Nginx實用教程(二)配置文件入門

affinity type 服務 源碼編譯 設置時間 shutdown ber 可用 控制指令 Nginx配置文件結構 nginx配置文件由指令(directive)組成,指令分為兩種形式,簡單指令和區塊指令。 一條簡單指令由指令名、參數和結尾的分號(;)組成,例如:

spark深入配置文件與日誌

oca cut 就會 name ima ast auto agg 日誌 spark2.1與hadoop2.7.3集成,spark on yarn模式下,需要對hadoop的配置文件yarn-site.xml增加內容,如下: <property>

前端性能分析分析百度和sogou

後臺 性能 con 圖片 work char sogo wait 部分 先用httpwatch錄制這兩個網站:www.baidu.com www.sogou.com 由上圖可以看到: 百度用時0.278s 發送7831B 接收36620B 13個請求 搜狗

【程序5】 題目利用條件運算符的嵌套來完成此題學習成績>=90分的同學用A表示,60-89分之間的用B表示,60分以下的用C表示。 1.程序分析(a>b)?a:b這是條件運算符的基本例子。

window code 例子 prompt 利用 學習 amp text span if…else語句相對比較多,但是容易理解 1 var scroe = window.prompt("請輸入1-100之間的數") 2 scroe = parseInt

Spring Boot入門第三天配置日誌系統和Druid數據庫連接池。

禁用 css ret 輸入 ogg servlet log http gif 一、日誌管理 1.在application.properties文件中加入如下內容: logging.level.root=WARN logging.level.org.springfram

Spring Cloud 入門教程(三) 配置自動刷新

入門 stc pro 解決方案 con log clas ring color 之前講的配置管理, 只有在應用啟動時會讀取到GIT的內容, 之後只要應用不重啟,GIT中文件的修改,應用無法感知, 即使重啟Config Server也不行。 比如上一單元(Spring Clo

Vue2+VueRouter2+webpack 構建項目實戰(三)配置路由,運行頁面

margin not found sans product mage -a nod targe fig 制作.vue模板文件 通過前面的兩篇博文的學習,我們已經建立好了一個項目。問題是,我們還沒有開始制作頁面。下面,我們要來做頁面了。 我們還是利用 http://cno

Vue2+VueRouter2+webpack 構建項目實戰(五)配置子路由

dex log fault 地址 數據 from 插入 接口 content 前言 通過前面幾章的實戰,我們已經順利的構建項目,並且從API接口獲取到數據並且渲染出來了。制作更多的頁面,更復雜的應用,就是各位自己根據自己的項目去調整的事情了。 本章講一下如何配置子路由,

cocos2d-x 源代碼分析 EventDispatcher、EventListener、Event 源代碼分析 (新觸摸機制,新的NotificationCenter機制)

get cti state 2.7 return 3.1 成了 available been 源代碼版本號來自3.x,轉載請註明 cocos2d-x 源代碼分析總文件夾 http://blog.csdn.net/u011225840/article/detail

phpmyadmin-錯誤配置文件權限錯誤,不應任何用戶都能修改!這裏有答案

highlight 文字 所有權限 配置文件 inux style true 所有 整理 今天在linux下使用phpMyadmin的時候突然出現這個紅色警告。差點把我嚇暈在電腦前。不過冷靜想一下這個報錯,肯定就是linux權限那幾個 ‘7’ 惹的禍。 於是 通過命令