1. 程式人生 > >Kafka使用Java進行Producer和Consumer程式設計

Kafka使用Java進行Producer和Consumer程式設計

比較舊的kafka_2.10-0.8.2.0版本:(參考自http://chengjianxiaoxue.iteye.com/blog/2190488)

生產者程式碼:

import java.util.Properties;  
import java.util.concurrent.TimeUnit;  
  
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;  

public class kafkaProducer extends Thread{  
  
    private String topic;  
      
    public kafkaProducer(String topic){  
        super();  
        this.topic = topic;  
    }  
      
    @Override
    public void run() {  
        Producer producer = createProducer();  
        int i=0;
        while(true){
            producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));  
            try {
                TimeUnit.SECONDS.sleep(1);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }
    }  
 
    private Producer createProducer() {  
        Properties properties = new Properties();  
        properties.put("zookeeper.connect", "192.168.205.153:2181");//宣告zk  
        properties.put("serializer.class", StringEncoder.class.getName());  
        properties.put("metadata.broker.list", "192.168.205.153:9092");// 宣告kafka broker  
        return new Producer<Integer, String>(new ProducerConfig(properties));  
     }  
      
      
    public static void main(String[] args) {  
        new kafkaProducer("test").start();// 使用kafka叢集中建立好的主題 test   
    }  

}
消費者程式碼:
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  

import kafka.consumer.Consumer;  
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  

/** 
 * 接收資料 
接收到: message: 10 
接收到: message: 11 
接收到: message: 12 
接收到: message: 13 
接收到: message: 14 
 */  
public class kafkaConsumer extends Thread{  
  
    private String topic;  
      
    public kafkaConsumer(String topic){  
        super();  
        this.topic = topic;  
    }  
      
    @Override  
    public void run() {  
        ConsumerConnector consumer = createConsumer();  
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
        topicCountMap.put(topic, 1); // 1表示consumer thread執行緒數量
         Map<String, List<KafkaStream<byte[], byte[]>>>  messageStreams = consumer.createMessageStreams(topicCountMap);  
         KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 獲取每次接收到的這個資料  
         ConsumerIterator<byte[], byte[]> iterator =  stream.iterator();  
         while(iterator.hasNext()){  
             String message = new String(iterator.next().message());  
             System.out.println("接收到: " + message);  
         }  
    }  
  
    private ConsumerConnector createConsumer() {  
        Properties properties = new Properties();  
        properties.put("zookeeper.connect", "192.168.205.153:2181");//宣告zk  
        properties.put("group.id", "group1");// 必須要使用別的組名稱, 如果生產者和消費者都在同一組,則不能訪問同一組內的topic資料  
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));  
     }  
      
    public static void main(String[] args) {  
        new kafkaConsumer("test").start();// 使用kafka叢集中建立好的主題 test   
    }  
       
}
注意:以上程式碼在kafka_2.10-0.8.2.0和kafka_2.11-0.9.0.1中都能執行,因為高版本會相容低版本,所以高版本的程式碼不一定能在低版本里執行,而低版本的程式碼一般都能在高版本中執行。

kafka_2.11-0.9.0.1版本:

例子一:(程式碼參考自http://blog.csdn.net/lnho2015/article/details/51353936)

生產者程式碼:

import org.apache.kafka.clients.producer.KafkaProducer;  
import org.apache.kafka.clients.producer.Producer;  
import org.apache.kafka.clients.producer.ProducerRecord;  
  
import java.util.Properties;  
  
public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();  
        props.put("bootstrap.servers", "h153:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 100; i++)
//            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
        	producer.send(new ProducerRecord<String, String>("test", "Hello"));
        	producer.close();
    }
}
消費者程式碼:
import org.apache.kafka.clients.consumer.ConsumerRecord;  
import org.apache.kafka.clients.consumer.ConsumerRecords;  
import org.apache.kafka.clients.consumer.KafkaConsumer;  
  
import java.util.Arrays;  
import java.util.Properties;  
  
public class KafkaConsumerExample {  
    public static void main(String[] args) {  
        Properties props = new Properties();  
        props.put("bootstrap.servers", "h153:9092");
        props.put("group.id", "test");  
        props.put("enable.auto.commit", "true");  
        props.put("auto.commit.interval.ms", "1000");  
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);  
            for (ConsumerRecord<String, String> record : records)  
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());  
        }
    }
}
注意1:一開始用myeclipse匯入kafka_2.10-0.8.2.0jar包正常,在匯入kafka_2.11-0.9.0.1後雖然報錯但能生產資料,在匯入kafka_2.10-0.10.1.0和kafka_2.11-0.11.0jar包就直接報錯了而不產生資料。在匯入kafka_2.11-0.9.0.1jar包報這個錯:
[2017-10-30 19:08:39,779] ERROR Uncaught error in kafka producer I/O thread: (org.apache.kafka.clients.producer.internals.Sender:143)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'throttle_time_ms': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:141)
at java.lang.Thread.run(Thread.java:724)

原因:我在myeclipse中匯入的是kafka_2.11-0.9.0.1的jar包,而在Linux虛擬機器中安裝的卻是kafka_2.10-0.8.2.0

注意2:當我在Linux本地上執行生產者程式碼卻報這個錯

[[email protected] q1]$ javac KafkaProducerExample.java 
KafkaProducerExample.java:1: error: package org.apache.kafka.clients.producer does not exist
import org.apache.kafka.clients.producer.KafkaProducer;    
                                        ^
KafkaProducerExample.java:2: error: package org.apache.kafka.clients.producer does not exist
import org.apache.kafka.clients.producer.Producer;    
                                        ^
KafkaProducerExample.java:3: error: package org.apache.kafka.clients.producer does not exist
import org.apache.kafka.clients.producer.ProducerRecord;    
                                        ^
KafkaProducerExample.java:19: error: cannot find symbol
        Producer<String, String> producer = new KafkaProducer<>(props);  
        ^
  symbol:   class Producer
  location: class KafkaProducerExample
KafkaProducerExample.java:19: error: cannot find symbol
        Producer<String, String> producer = new KafkaProducer<>(props);  
                                                ^
  symbol:   class KafkaProducer
  location: class KafkaProducerExample
KafkaProducerExample.java:21: error: cannot find symbol
            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
                              ^
  symbol:   class ProducerRecord
  location: class KafkaProducerExample
6 errors
解決:我當時裝的Java版本是jdk1.8.0_91,換為jdk1.7.0_25,並且在~/.bash_profile中新增如Kafkajar包到環境變數中,如下:

export CLASSPATH=.:/home/hadoop/hbase-1.0.0/lib/*:/home/hadoop/kafka_2.11-0.9.0.1/libs/*

例子二:(參考自http://blog.csdn.net/zero__007/article/details/51068165)

生產者程式碼:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;  
import org.apache.kafka.clients.producer.Producer;  
import org.apache.kafka.clients.producer.ProducerRecord;  
import org.apache.kafka.clients.producer.RecordMetadata;
  
import java.util.Properties;  

public class Produce {
    public static void main(String[] args) {
    	String topic = "test";
    	Properties props = new Properties();  
    	props.put("bootstrap.servers", "h153:9092");  
    	props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
    	props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
    	props.put("acks", "all");  
    	props.put("retries", 1);  
    	 
    	Producer<String, String> producer = new KafkaProducer<String, String>(props);  
    	producer.send(new ProducerRecord<String, String>(topic, "Hello"));
    	  
    	producer.send(new ProducerRecord<String, String>(topic, "World"), new Callback() {  
    	    @Override  
    	    public void onCompletion(RecordMetadata metadata, Exception e) {  
    	        if (e != null) {
    	            e.printStackTrace();
    	        } else {
    	            System.out.println(metadata.toString());//[email protected]  
    	            System.out.println(metadata.offset());//1  
    	        }
    	    }
    	});  
    	producer.flush();
    	producer.close();
    }
}
消費者程式碼:
import org.apache.kafka.clients.consumer.ConsumerRecord;  
import org.apache.kafka.clients.consumer.ConsumerRecords;  
import org.apache.kafka.clients.consumer.KafkaConsumer;  
import org.apache.kafka.common.TopicPartition;
  
import java.util.Arrays;  
import java.util.Properties;  
  
public class Consumer {  
    public static void main(String[] args) {  
    	String topic = "test";
    	Properties props = new Properties();  
    	props.put("bootstrap.servers", " h153:9092");  
    	props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
    	props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
    	props.setProperty("group.id", "0");  
    	props.setProperty("enable.auto.commit", "true");  
    	props.setProperty("auto.offset.reset", "earliest");  
    	  
    	KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);  
    	consumer.subscribe(Arrays.asList(topic));  
    	  
    	for (int i = 0; i < 2; i++) {  
    	    ConsumerRecords<String, String> records = consumer.poll(1000);  
    	    System.out.println(records.count());  
    	    for (ConsumerRecord<String, String> record : records) {  
    	        System.out.println(record);  
//    	        consumer.seekToBeginning(new TopicPartition(record.topic(), record.partition()));
    	    }  
    	}
    }
}
注意:auto.offset.reset這裡設定為earliest,是為了consumer能夠從頭開始讀取內容即offset=0開始,在org.apache.kafka.clients.consumer.ConsumerConfig中對其意義的描述如下:What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset;latest: automatically reset the offset to the latest offset;none: throw exception to the consumer if no previous offset is found for the consumer's group;anything else: throw exception to the consumer。consumer.seekToBeginning也可以設定offset,但是跟原始碼發現,This function evaluates lazily, seeking to the final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.也就是說seekToBeginning()的設定要生效的話,必須在poll或則position方法呼叫後設置seekToBeginning()才行。

補充(可參考這篇文章:http://blog.csdn.net/lishuangzhe7047/article/details/74530417):

earliest 
當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 
latest 
當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料 
none 
topic各分割槽都存在已提交的offset時,從offset後開始消費;只要有一個分割槽不存在已提交的offset,則丟擲異常


相關推薦

Kafka使用Java進行ProducerConsumer程式設計

比較舊的kafka_2.10-0.8.2.0版本:(參考自http://chengjianxiaoxue.iteye.com/blog/2190488)生產者程式碼:import java.util.Properties; import java.util.concurr

Kafka學習整理七(producerconsumer程式設計實踐)

實踐程式碼採用kafka-clients V0.10.0.0 編寫 一、編寫producer 第一步:使用./kafka-topics.sh 命令建立topic及partitions 分割槽數 ./kafka-topics.sh --create-

Java連接ActiveMQ代碼示例(ProducerConsumer)

生產 exce org 默認 main isp 用戶 close trac import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFacto

kafka執行ProducerConsumer時出現Failed to load class org.slf4j.impl.StaticLoggerBinder錯誤

當執行這一步時: 報瞭如下的錯誤: 解決辦法: 通過ps -aux | grep kafka找到該程序的程序pid,然後通過 lsof -p pid //pid為上面你找到的該程序的

安裝部署(六) Kafka叢集安裝部署以及ProducerConsumer的JAVA程式碼測試

Kafka叢集安裝部署以及Producer和Consumer的JAVA程式碼測試 kafka scala2.11_0.10.0.0 ubuntu 14.04.04 x64 hadoop 2.7.2spark 2.0.0 scala 2.11.8 jdk 1.8.0_101

Kafka入門,producerconsumer與hive

{        "name":"hdfs-hive-sink-03",        "config":{               "connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",

RocketMQ中ProducerConsumer啟動出錯

//錯誤描述com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.237.13

kafka客戶端ProducerConsumer關於自定義訊息序列反序列

一、背景    最近在學習kafka相關的知識,正好遇到一個疑問,在寫demo的過程中發現,投遞的資料都是字串型別,那麼就想想在實際應用中應該會有大量的需求投遞自定義資料型別,那麼如何才能投遞自定義資料型別呢?這裡面就涉及到了kafka提供的介面序列化和反序列化的功能。二、k

使用Scratch2ROS進行機器人圖形化程式設計學習

使用Scratch2和ROS進行機器人程式設計學習(適用於中小學機器人程式設計Scratch和ROS)參考JdeRobot的一篇詳細介紹,就可以實現上述的功能,需要安裝Scratch2、ROS Kinetic、Gazebo 7、JdeRobot、Python2.7等。通過將S

使用 C++ MFC 進行多執行緒程式設計

程序是應用程式的執行例項。例如,雙擊“記事本”圖示時,將啟動執行“記事本”的程序。 執行緒是程序內的執行路徑。啟動“記事本”時,作業系統建立程序並開始執行該程序的主執行緒。此執行緒終止時,程序也終止。啟動程式碼以函式地址的形式將此主執行緒提供給作業系統。通常是所提供的main 函式或 WinMain 函式

Kafka系列3-python版本producer生產者consumer消費者例項

直接上程式碼了: # -*- coding: utf-8 -*- ''' 使用kafka-Python 1.3.3模組 ''' import sys import time import json from kafka import KafkaProduce

rabbit - producer的confirmconsumer的ack模式

本篇和大家分享的是關於rabbit的生產和消費方的一些實用的操作;正如文章標題,主要內容如producer的confirm和consumer的ack,這兩者使用的模式都是用來保證資料完整性,防止資料丟失。 producer的confirm模式 consumer的ack模式 producer的co

BOS項目 第7天(shiro權限框架進行認證授權)

ebs setattr not action 錯誤信息 add 流程圖 元素 錯誤提示 BOS項目筆記 第7天 今天內容安排: 1、權限概述(認證、授權) 2、常見的權限控制的方式(URL攔截權限控制、方法註解權限控制) 3、權限數據模型(權限表、角色表、用戶表、角色權

什麽是私有密鑰密碼技術——密鑰加密算法采用同一把密鑰進行加密解密

解密 網絡安全 位操作 線性復雜 對稱 大量 控制 全局 相位 什麽是私有密鑰密碼技術 私有密鑰(Symmetric Key),又叫對稱密鑰。密鑰加密算法采用同一把密鑰進行加密和解密。它的優點是加密和解密速度非常快,但密鑰的分發和管理比較困難。信息的發送者和接收者必須明確同

如何確定Kafka的分區數、keyconsumer線程數

為什麽 打包 lower 匹配 到來 har mit 技術分享 每一個 轉自:http://www.tuicool.com/articles/Aj6fAj3 如何確定Kafka的分區數、key和consumer線程數 在Kafak中國社區的qq群中,這個問題被提及的

C/C++使用openssl進行摘要加密解密(md5, sha256, des, rsa)

fin hex pau 字節 system org key rate 釋放 openssl裏面有很多用於摘要哈希、加密解密的算法,方便集成於工程項目,被廣泛應用於網絡報文中的安全傳輸和認證。下面以md5,sha256,des,rsa幾個典型的api簡單使用作為例子。

在windows 下使用eclipse進行編譯燒寫

desktop 交叉編譯 頁面 開源 操作 調試 process 內置 sta eclipse IDE是一款開源的前端編程軟件,它提供了編寫,編譯和調試ESP-IDF項目的圖形集成開發環境。 首先在https://www.obeo.fr/en/eclipse-downloa

蜜罐技術——通過布置一些作為誘餌的主機、網絡服務或者信息,誘使攻擊方對它們實施攻擊,從而可以對攻擊行為進行捕獲分析

技術 使用 alt name 防火墻 text 來源 情報 優點 蜜罐技術本質上是一種對攻擊方進行欺騙的技術,通過布置一些作為誘餌的主機、網絡服務或者信息,誘使攻擊方對它們實施攻擊,從而可以對攻擊行為進行捕獲和分析,了解攻擊方所使用的工具與方法,推測攻擊意圖和動機,能夠讓防

網絡安全熱門話題——如何對被(已經/正在)入侵網站進行檢測防範

網絡 安全 熱門 九月安全專題討論:網絡安全熱門話題——如何對被(已經/正在)入侵網站進行檢測和防範擬進行以下技術(可以自定義相關技術)討論和技術研究,歡迎大家參與:(1)網站入侵日誌文件分析(2)抓包分析入侵行為並修補程序漏洞(3)從規則進行安全防護(4)在線監測webshell等惡意行為(5)

Kafka 學習筆記之 Kafka0.11之console-producer/console-consumer

scribe tor 新的 producer 建立 actor sum consumer creat Kafka 學習筆記之 Kafka0.11之console-producer/console-consumer: 啟動Zookeeper 啟動Kafka0.11 創建一