Kafka學習之consumer端部署及API
1、consumer.properties:檔案位於/resources目錄下
zookeeper.connect=192.168.0.1:2181test-datacenter/test-server
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
2、JAVA API實現
import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import kafka.consumer.*; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.apache.commons.collections.CollectionUtils; public class kafkaConsumer { public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException { Properties properties = new Properties(); properties.put("zookeeper.connect", "192.168.0.1:2181/test-datacenter/test-server"); properties.put("auto.commit.enable", "true"); properties.put("auto.commit.interval.ms", "60000"); properties.put("group.id", "test"); ConsumerConfig consumerConfig = new ConsumerConfig(properties); ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); //topic的過濾器 Whitelist whitelist = new Whitelist("test"); List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist); if (CollectionUtils.isEmpty(partitions)) { System.out.println("empty!"); TimeUnit.SECONDS.sleep(1); } //消費訊息 for (KafkaStream<byte[], byte[]> partition : partitions) { ConsumerIterator<byte[], byte[]> iterator = partition.iterator(); while (iterator.hasNext()) { MessageAndMetadata<byte[], byte[]> next = iterator.next(); System.out.println("partiton:" + next.partition()); System.out.println("offset:" + next.offset()); System.out.println("message:" + new String(next.message(), "utf-8")); } } } }
相關推薦
Kafka學習之consumer端部署及API
1、consumer.properties:檔案位於/resources目錄下 zookeeper.connect=192.168.0.1:2181test-datacenter/test-server # timeout in ms for connecting to
zabbix3.0之server端部署詳解
server 安裝 zabbix yum 配置 下載yum配置rpm -ivh http://mirrors.aliyun.com/zabbix/zabbix/3.0/rhel/7/x86_64/zabbix-release-3.0-1.el7.noarch.rpm 安裝依賴包[[email
Kafka 入門之集群部署遇到問題
學會 begin Kafka集群 zookeep 本地 代碼 解決方法 部署 研究 最近,因為上級主管部門需要通過使用Kafka向其傳輸文件,又因為此前沒有接觸過kafka,所以在部署測試kafka程序期間遇到很多問題,在這裏總結4個問題與1個建議,方便入門者參考也便於
Linux學習之路-集群及LVS(2)【25】---20180217
ipvs 靜態 動態算法 一、ipvs scheduleripvs scheduler:根據其調度時是否考慮各RS當前的負載狀態有兩種方法:靜態方法和動態方法1、靜態方法僅根據算法本身進行調度1、RR:roundrobin,輪詢2、WRR:Weighted RR,加權輪詢3、SH:Source H
菜鳥之路——機器學習之HierarchicalClustering層次分析及個人理解
features clu 機器 層次 節點類 均值 成了 range n) 這個算法。我個人感覺有點雞肋。最終的表達也不是特別清楚。 原理很簡單,從所有的樣本中選取Euclidean distance最近的兩個樣本,歸為一類,取其平均值組成一個新樣本,總樣本數少1;不斷的重
React學習之旅----Redux安裝及富文字、echarts
瀏覽器中安裝redux devtools擴充套件 yarn add redux react-redux redux-devtools-extension 安裝依賴包即可 // 引入createStore建立store,引入applyMiddleware 來使用中介軟體 //
我的kafka學習之路
初識kafka Kafka 從何而來?我們為什麼要開發 Kafka ? Kafka 到底是什麼? Kafka 最初是 Linkedln 的一個內部基礎設施系統。我們發現,雖然有很多資料庫和系統可以用來儲存資料,但在我們的架構裡,剛好缺一個可以幫助處理持續資料流的元件。在開發 Kafka 之前
Kafka學習之路 (一)Kafka的簡介
要求 異步通信 images 等等 ron 服務器角色 消費 消息 崩潰 一、簡介 1.1 概述 Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多訂閱者,基於zookeeper協調的分布式日誌系統(也可以當做MQ系統),常見可以用於web/ng
Kafka學習之路 (二)Kafka的架構
most 工具 路由 冪等 用戶 toc 目標 支持 mem 一、Kafka的集群架構 如上圖所示,一個典型的Kafka集群中包含若幹Producer(可以是web前端產生的Page View,或者是服務器日誌,系統CPU、Memory等),若幹broker(Kafka支持
React學習之旅----事件方法及改變this指向
import React, { Component } from 'react'; import '../assets/css/index.css' // react繫結屬性注意點 // class要換成className // for 要換成htmlFor // style class Home
Kafka學習之路 (五)Kafka在zookeeper中的存儲
序號 hadoop state 空閑 pre 離開 substr doc 退出 當kafka啟動的時候,就會向zookeeper裏面註冊一些信息,這些數據也稱為Kafka的元數據信息。 一、Kafka在zookeeper中存儲結構圖 二、分析 根目錄下的結構 服務端開啟的
Kafka學習之路 (三)Kafka的高可用
中一 不同 ive topic 停止工作 查找 同時 llb fail 一、高可用的由來 1.1 為何需要Replication 在Kafka在0.8以前的版本中,是沒有Replication的,一旦某一個Broker宕機,則其上所有的Partition數據都不可被消費,這
Kafka學習之路 (四)Kafka的安裝
server 表達 mage 配置 list 執行 ase cti releases zookeeper1:192.168.1.11 zookeeper2:192.168.1.12 zookeeper3:192.168.1.13 kafka1:192.168.1.14 k
Kafka學習之路 (五)Kafka在zookeeper中的儲存
目錄 正文 回到頂部 一、Kafka在zookeeper中儲存結構圖 回到頂部 二、分析 2.1 topic註冊資訊 /brokers/topics/[topic] : 儲存某個topic的partitions所有分配資訊 [zk: localh
機器學習之線性迴歸原理及sklearn實現
1、線性迴歸問題 以房價預測為例,佔地面積為變數x1,房屋年齡為變數x2,房屋價格為預測變數y。 為什麼叫線性迴歸問題,因為目標函式是一個線性迴歸函式。什麼是目標函式? (1)、目標函式:目標函式是我們需要的最終結果,及
ARKit學習之2.0基礎及案例解析(後續再更新)
為了方便AR開發交流,博主建立了一個群 :891555732,歡迎一起討論 一.多人共享資料 官方案例原始碼 : https://github.com/Unity-Technologies/SharedSpheres ①.獲取資料及儲
Web學習之跨域問題及解決方案
在做前端開發時,我們時常使用ajax與伺服器通訊獲取資源,享受ajax便利的同時,也知道它有限制:跨域安全限制,即同源策略。 同源策略(SOP),核心是確保不同源提供的檔案之間是相互獨立的 預設情況下,XHR物件只能訪問與包含它的頁面處於同一域中的資源,這種限制可以預防某些惡意攻擊,但同
Redis學習筆記-RedisCluster安裝部署和API使用
1、Ruby安裝 $tar -xzvf ruby-2.2.4.tar.gz $./configure $make $make install 2、Redis安裝 $tar -zxvf redis-3.2.3.tar.gz $make && m
Vue學習之旅----vue-cli及建立專案,初步執行
npm i vue-cli -g vue init webpack vue01 cd vue01 npm install npm run dev vue基礎 <li v-for="(index,item) in list " :key=index>{{item
typescript學習之旅----資料型別及函式、傳參、過載等
typescript中為了使編寫的程式碼更規範,更有利於維護,增加了型別校驗,在typescript中主要給我們提供了以下資料型別 布林型別(boolean) 數字型別(number) 字串型別(string)