1. 程式人生 > >利用kafka內建的特殊topic 做offsets 提交和檢索

利用kafka內建的特殊topic 做offsets 提交和檢索

In Kafka releases through 0.8.1.1, consumers commit their offsets to ZooKeeper. ZooKeeper does not scale extremely well (especially for writes) when there are a large number of offsets (i.e., consumer-count * partition-count). Fortunately, Kafka now provides an ideal mechanism for storing consumer offsets. Consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic (although we provide an in-memory offsets cache for faster access). i.e., offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory look ups.

The official Kafka documentation describes how the feature works and how to migrate offsets from ZooKeeper to Kafka. This wiki provides sample code that shows how to use the new Kafka-based offset storage mechanism.

Step 1: Discover and connect to the offset manager for a consumer group by issuing a consumer metadata request to any broker

import kafka.api.*; import kafka.cluster.Broker; import kafka.common.OffsetAndMetadata; import kafka.common.OffsetMetadataAndError; import kafka.common.TopicAndPartition; import kafka.javaapi.ConsumerMetadataResponse; import kafka.javaapi.OffsetCommitRequest; import
 kafka.javaapi.OffsetCommitResponse; import kafka.javaapi.OffsetFetchRequest; import kafka.javaapi.OffsetFetchResponse; import kafka.network.BlockingChannel; import java.util.*; ... try { BlockingChannel channel = new BlockingChannel("localhost"9092, BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), 5000 /* read timeout in millis */); channel.connect(); final String MY_GROUP = "demoGroup"; final String MY_CLIENTID = "demoClientId"; int correlationId = 0; final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic"0); final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic"1); channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID)); ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer()); if (metadataResponse.errorCode() == ErrorMapping.NoError()) { Broker offsetManager = metadataResponse.coordinator(); // if the coordinator is different, from the above channel's host then reconnect channel.disconnect(); channel = new BlockingChannel(offsetManager.host(), offsetManager.port(), BlockingChannel.UseDefaultBufferSize(),

相關推薦

利用kafka特殊topic offsets 提交檢索

In Kafka releases through 0.8.1.1, consumers commit their offsets to ZooKeeper. ZooKeeper does not scale extremely well (especially fo

利用python下載器-快速分享檔案

一、任務場景: 工作中需要將伺服器上的某些檔案傳給對應的同事,如果臨時去搭建檔案伺服器或者配置,操作起來不太方便 二、比如臨時需要分享/tools這個目錄的檔案,操作方法如下: 1、採用http的方式進行分享 a) python2的版本中 # cd /tools #&nb

unity特殊資料夾

  Resources資料夾:可以在根目錄下,也可以在子目錄裡,只要名子叫Resources就可以。Resources資料夾下的資源不管你用還是不用都會被打包進.apk(Android包的檔案)或者.ipa(Ios包的檔案),該資料夾下的資源會被壓縮。 Editor資料夾:Edit

(超實用)利用Python 【下載伺服器】傳輸檔案

1. 首先進入我想要傳輸的檔案所在的目錄 假設我要傳輸的檔案在/home/pi 目錄下: cd /home/pi/ 2. 然後啟動一個下載伺服器 python3 版: python3 -m http.server python2 版: python -m

利用MyEclipse架包搭建ssh框架詳細步驟

ssh整合 我們一般在搭建框架的時候經常使用的是匯入架包然後搭建專案,但是如果我們在沒有架包的情況下怎麼來搭建ssh框架呢 除開我們用別的框比如meavn搭建,但是不排除在沒有網路的情況下我們也無法完整的搭建出來,那麼現在我就教大家一種可以在沒有網路的情況下搭建框架的步驟

利用python函式,快速統計單詞在文字中出現的次數

python中包含許多標準程式設計資料結構,如list(列表),tuple(元組)、dict(字典)和set(),如果現有的資料型別不能滿足需求,可以派生某個內建型別進行定製,或者使用collections中定義的某個抽象基類作為起點構建一個新的容器型別。 c

利用pythonK-Means聚類演算法實現鳶尾花資料的聚類

在進去聚類情況分析前,我們需要為我們的IDLE安裝sklearn庫,scikit-learn是Python的一個開源機器學習模組,它建立在NumPy,SciPy和matplotlib模組之上能夠為使用者提供各種機器學習演算法介面,可以讓使用者簡單、高效地進行資料探勘和資料分析

利用navicat創存儲過程、觸發器使用遊標的簡單實例

.net fill student default hand 結果 alt 神奇 行記錄 原文鏈接 創建存儲過程和觸發器 1、建表 首先先建兩張表(users表和number表),具體設計如下圖: 2、存儲過程 寫一個存儲過程,往users表中插入數據,創建

函數(builtins)functools

ash fun bcd 不能 linux ems import 註意 tlist 內建函數 Build-in Function,啟動python解釋器,輸入dir(__builtins__), 可以看到很多python解釋器啟動後默認加載的屬性和函數,這些函數稱之為內建函數

解決 WPF 自繪窗體 AllowsTransparency = true Webbrowser 等窗體顯示衝突的辦法思路

       上述示例程式的幾個關鍵點: WPF 窗體為主窗體,窗體風格為 AllowsTransparency = true 分別對主窗體內建 WPF 的 Webbrowser 控制元件和 Winform 的 Webbrowser 控制元件進行了演示 完美的窗體縮放、

SQL server 函式之隨機函式newid()rand()

從A表隨機取2條記錄,用SELECT TOP 10 * FROM ywle order by newid() order by 一般是根據某一欄位排序,newid()的返回值 是uniqueidentifier ,order by newid()隨機選取記錄是如何進行的

『Numpy』存分析_利用共享存創數組

highlight block 標識 查看 空間 types length PE 獲取 引、內存探究常用函數 id(),查詢對象標識,通常返回的是對象的地址 sys.getsizeof(),返回的是 這個對象所占用的空間大小,對於數組來說,除了數組中每個值占用空間外,數

Kafka&查看topic,生產&消費指定topic消息

png 所有 trap 回車 server world 圖片 kafka 技術 啟動zookeeper和Kafka之後,進入kafka目錄(安裝/啟動kafka參考前面一章:https://www.cnblogs.com/cici20166/p/9425613.html)

awk 幾個特殊變數

1、特殊變數: NR:讀取到所有記錄(包括多個檔案)的行數索引號(大概是Number Of Record的意思);FNR:只的是awk所讀取到的每個檔案中的行數索引號,當檔案發生切換時候,FNR重新從1開始,大概是File Number Of Record的意思;NF:當前行中的欄位個數(列

AC6005portal匿名登入

<AC6005>system-view [AC6005]interface loopback 0[AC6005-LoopBack0]ip address 10.1.1.1 32[AC6005-LoopBack0]quit[AC6005]http secure-server ssl-policy

python--面向物件的特殊方法(反射,方法)

1.反射:hasattr,getattr,setattr,delattr 使用字串資料型別的變數名來操作一個變數的值使用反射獲取某個名稱空間中的值, 需要 有一個變數指向這個名稱空間的 字串資料型別的名字, 再使用getattr獲取值, 如果是變數能直接

在不使用JavaScript的parseInt()函式的前提下,利用mapreduce操作實現一個string2int()函式

在不使用JavaScript內建的parseInt()函式的前提下,利用map和reduce操作實現一個string2int()函式 題目出自廖雪峰老師的JavaScript教程: 把一個字串13579先變成Array——[1, 3, 5, 7, 9],再利用reduce()就可以寫出一

Python用字串二進位制乘法Python呼叫進位制轉換我踩的坑

注意本文加法的思想來源於 https://blog.csdn.net/qiubingcsdn/article/details/82263114 其餘為我自己做的 字串str的末位是實際上計算的首位,所以第一步是翻轉字串 首先弄清楚python的字串列表

使用js物件document對html的標籤一些動態的操作

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <ti