1. 程式人生 > >Kafka學習之consumer端部署及API

Kafka學習之consumer端部署及API

1、consumer.properties:檔案位於/resources目錄下

zookeeper.connect=192.168.0.1:2181test-datacenter/test-server
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000

2、JAVA API實現
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
 
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
 
import org.apache.commons.collections.CollectionUtils;
 

public class kafkaConsumer {
 
  public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
 
    Properties properties = new Properties();
    properties.put("zookeeper.connect", "192.168.0.1:2181/test-datacenter/test-server");
    properties.put("auto.commit.enable", "true");
    properties.put("auto.commit.interval.ms", "60000");
    properties.put("group.id", "test");
 
    ConsumerConfig consumerConfig = new ConsumerConfig(properties);
 
    ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
 
    //topic的過濾器
    Whitelist whitelist = new Whitelist("test");
    List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);
 
    if (CollectionUtils.isEmpty(partitions)) {
      System.out.println("empty!");
      TimeUnit.SECONDS.sleep(1);
    }
 
    //消費訊息
    for (KafkaStream<byte[], byte[]> partition : partitions) {
 
      ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
      while (iterator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> next = iterator.next();
        System.out.println("partiton:" + next.partition());
        System.out.println("offset:" + next.offset());
        System.out.println("message:" + new String(next.message(), "utf-8"));
      }
    } 
  }
}


相關推薦

Kafka學習consumer部署API

1、consumer.properties:檔案位於/resources目錄下 zookeeper.connect=192.168.0.1:2181test-datacenter/test-server # timeout in ms for connecting to

zabbix3.0server部署詳解

server 安裝 zabbix yum 配置 下載yum配置rpm -ivh http://mirrors.aliyun.com/zabbix/zabbix/3.0/rhel/7/x86_64/zabbix-release-3.0-1.el7.noarch.rpm 安裝依賴包[[email

Kafka 入門集群部署遇到問題

學會 begin Kafka集群 zookeep 本地 代碼 解決方法 部署 研究   最近,因為上級主管部門需要通過使用Kafka向其傳輸文件,又因為此前沒有接觸過kafka,所以在部署測試kafka程序期間遇到很多問題,在這裏總結4個問題與1個建議,方便入門者參考也便於

Linux學習路-集群LVS(2)【25】---20180217

ipvs 靜態 動態算法 一、ipvs scheduleripvs scheduler:根據其調度時是否考慮各RS當前的負載狀態有兩種方法:靜態方法和動態方法1、靜態方法僅根據算法本身進行調度1、RR:roundrobin,輪詢2、WRR:Weighted RR,加權輪詢3、SH:Source H

菜鳥路——機器學習HierarchicalClustering層次分析個人理解

features clu 機器 層次 節點類 均值 成了 range n) 這個算法。我個人感覺有點雞肋。最終的表達也不是特別清楚。 原理很簡單,從所有的樣本中選取Euclidean distance最近的兩個樣本,歸為一類,取其平均值組成一個新樣本,總樣本數少1;不斷的重

React學習旅----Redux安裝富文字、echarts

瀏覽器中安裝redux devtools擴充套件 yarn add redux  react-redux redux-devtools-extension 安裝依賴包即可 // 引入createStore建立store,引入applyMiddleware 來使用中介軟體 //

我的kafka學習

初識kafka Kafka 從何而來?我們為什麼要開發 Kafka ? Kafka 到底是什麼? Kafka 最初是 Linkedln 的一個內部基礎設施系統。我們發現,雖然有很多資料庫和系統可以用來儲存資料,但在我們的架構裡,剛好缺一個可以幫助處理持續資料流的元件。在開發 Kafka 之前

Kafka學習路 (一)Kafka的簡介

要求 異步通信 images 等等 ron 服務器角色 消費 消息 崩潰 一、簡介 1.1 概述 Kafka是最初由Linkedin公司開發,是一個分布式、分區的、多副本的、多訂閱者,基於zookeeper協調的分布式日誌系統(也可以當做MQ系統),常見可以用於web/ng

Kafka學習路 (二)Kafka的架構

most 工具 路由 冪等 用戶 toc 目標 支持 mem 一、Kafka的集群架構 如上圖所示,一個典型的Kafka集群中包含若幹Producer(可以是web前端產生的Page View,或者是服務器日誌,系統CPU、Memory等),若幹broker(Kafka支持

React學習旅----事件方法改變this指向

import React, { Component } from 'react'; import '../assets/css/index.css' // react繫結屬性注意點 // class要換成className // for 要換成htmlFor // style class Home

Kafka學習路 (五)Kafka在zookeeper中的存儲

序號 hadoop state 空閑 pre 離開 substr doc 退出 當kafka啟動的時候,就會向zookeeper裏面註冊一些信息,這些數據也稱為Kafka的元數據信息。 一、Kafka在zookeeper中存儲結構圖 二、分析 根目錄下的結構 服務端開啟的

Kafka學習路 (三)Kafka的高可用

中一 不同 ive topic 停止工作 查找 同時 llb fail 一、高可用的由來 1.1 為何需要Replication 在Kafka在0.8以前的版本中,是沒有Replication的,一旦某一個Broker宕機,則其上所有的Partition數據都不可被消費,這

Kafka學習路 (四)Kafka的安裝

server 表達 mage 配置 list 執行 ase cti releases zookeeper1:192.168.1.11 zookeeper2:192.168.1.12 zookeeper3:192.168.1.13 kafka1:192.168.1.14 k

Kafka學習路 (五)Kafka在zookeeper中的儲存

目錄 正文 回到頂部 一、Kafka在zookeeper中儲存結構圖 回到頂部 二、分析 2.1 topic註冊資訊 /brokers/topics/[topic] : 儲存某個topic的partitions所有分配資訊 [zk: localh

機器學習線性迴歸原理sklearn實現

  1、線性迴歸問題        以房價預測為例,佔地面積為變數x1,房屋年齡為變數x2,房屋價格為預測變數y。        為什麼叫線性迴歸問題,因為目標函式是一個線性迴歸函式。什麼是目標函式?        (1)、目標函式:目標函式是我們需要的最終結果,及

ARKit學習2.0基礎案例解析(後續再更新)

 為了方便AR開發交流,博主建立了一個群 :891555732,歡迎一起討論 一.多人共享資料  官方案例原始碼 : https://github.com/Unity-Technologies/SharedSpheres ①.獲取資料及儲

Web學習跨域問題解決方案

在做前端開發時,我們時常使用ajax與伺服器通訊獲取資源,享受ajax便利的同時,也知道它有限制:跨域安全限制,即同源策略。 同源策略(SOP),核心是確保不同源提供的檔案之間是相互獨立的 預設情況下,XHR物件只能訪問與包含它的頁面處於同一域中的資源,這種限制可以預防某些惡意攻擊,但同

Redis學習筆記-RedisCluster安裝部署API使用

1、Ruby安裝 $tar -xzvf ruby-2.2.4.tar.gz $./configure $make $make install 2、Redis安裝 $tar -zxvf redis-3.2.3.tar.gz $make && m

Vue學習旅----vue-cli建立專案,初步執行

npm i vue-cli -g vue init webpack vue01 cd vue01 npm install npm run dev vue基礎 <li v-for="(index,item) in list " :key=index>{{item

typescript學習旅----資料型別函式、傳參、過載等

typescript中為了使編寫的程式碼更規範,更有利於維護,增加了型別校驗,在typescript中主要給我們提供了以下資料型別         布林型別(boolean)         數字型別(number)         字串型別(string)