Kafka筆記三之java操作
maven依賴,我使用的是版本是0.8.22,scala是2.11
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.2</version>
</dependency>
1.本地測試關閉防火器
2.在windows中配置kafka,zookeeper的host
C:\Windows\System32\drivers\etc\hosts檔案
生產者
需要將producer.properties檔案中的serializer.class引數由DefaultEncoder改為StringEncoder,不然會報錯誤
serializer.class=kafka.serializer.DefaultEncoder
serializer.class=kafka.serializer.StringEncoder
metadata.broker.list=shb01:9092,129.168.79.139:9092
package kafka; import java.util.ArrayList; import java.util.List; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 生產者例項 * @author think * */ public class ProducerTest { public static void main(String[] args) throws Exception { /** * 屬性引數,設定 * 方式1:config.setProperty("producer.type", "sync"); 可以手動設定生產者引數 * 方式2:直接載入kafka叢集中的producer.properties * 在producer.properties中指定broker List * metadata.broker.list=shb01:9092,129.168.79.139:9092 */ Properties config = new Properties(); config.load(ProducerTest.class.getClassLoader().getResourceAsStream("producer.properties")); ProducerConfig pConfig = new ProducerConfig(config); /** * 建立生產者物件 * key:為 message的key * value:為message的value */ Producer<String, String> produce = new Producer<String, String>(pConfig); /** * 將使用者引數組裝成message, * KeyedMessage<String, String>:key用來分割槽,value是訊息本身;key可以不寫 */ String topic = "hello"; List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>(); KeyedMessage<String, String> message1 = new KeyedMessage<String, String>(topic, "key1", "value1"); KeyedMessage<String, String> message2 = new KeyedMessage<String, String>(topic, "key2", "value2"); messageList.add(message1); messageList.add(message2); KeyedMessage<String, String> message3 = new KeyedMessage<String, String>(topic, "value3"); /** * 生產者生產訊息 */ produce.send(messageList); produce.send(message3); //關閉 produce.close(); } }
消費者
消費者是以連結的形式存在,監聽生產者生產的訊息
zookeeper.connect=shb01:2181,129.168.79.139:2181
group.id=consumer-group-0
package kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; public class ConsumerTest { public static void main(String[] args) throws Exception{ String topic = "hello"; /** * 屬性引數 */ Properties prop = new Properties(); prop.load(Consumer.class.getClassLoader().getResourceAsStream("consumer.properties")); ConsumerConfig conf = new ConsumerConfig(prop); //建立消費者 ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(conf); //執行消費 //topicCountMap:key是topic名稱,value是該topic所使用的消費者的個數 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, 2);//使用2個消費者消費topic-hello /** * messageStreams: * key是消費的topic的名稱。 * value是消費者讀取 kafka流 */ Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = createJavaConsumerConnector.createMessageStreams(topicCountMap); //list就是消費者讀取的kafka流,有幾個消費者就有幾個流,此處kafka流的數量為2 List<KafkaStream<byte[], byte[]>> list = messageStreams.get(topic); //啟動多執行緒來分別讀取 //KafkaStream<byte[], byte[]>,key是所消費的訊息的key,value是所消費的訊息value for(KafkaStream<byte[], byte[]> kafkaStream : list) { new Thread(new Worker(kafkaStream)).start();; } } /** * 內部類,用執行緒接收kafka流 * @author think * */ static class Worker implements Runnable { private KafkaStream<byte[], byte[]> kafkaStream; public Worker(KafkaStream<byte[], byte[]> kafkaStream) { this.kafkaStream = kafkaStream; } @Override public void run() { //使用kafkaStream的迭代器,迭代消費資料 ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); while(iterator.hasNext()) { //MessageAndMetadata就是訊息本身 MessageAndMetadata<byte[], byte[]> next = iterator.next(); System.out.println(String.format("key:%s value:%s partition:%s offset:%s", next.key() == null? "": new String(next.key()), new String(next.message()), next.partition(), next.offset())); } } } }
分割槽器
producer.properties檔案
partitioner.class=kafka.ProducerTest
package kafka;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
/**
* 分割槽器
* @author think
* 必須在producer.properties中指定partitioner.class=kafka.ProducerTest
*/
public class PartitionTest implements Partitioner{
/**
* 系統會將生產者的生產屬性傳遞過來供開發者使用,就是producer.properties中的引數值
* prop.getProperty("metadata.broker.list");
*/
private kafka.utils.VerifiableProperties prop;
/**
* 必須提供一個構造方法,引數為VerifiableProperties
* @param prop
*/
public PartitionTest(VerifiableProperties prop) {
super();
this.prop = prop;
}
/**
* numberPartition:表示partition的數量
*/
@Override
public int partition(Object key, int numberPartition) {
prop.getProperty("metadata.broker.list");
//生產者的key是key1,key2
String keyStr = (String) key;
if(keyStr.contains("1"))
{
return 1;//返回1分割槽,
}
else
{
if(keyStr.contains("2"))
{
return 2;//返回2分割槽
}
}
return 0;
}
}
partition在消費者中的分配
分割槽會進行排序,消費者會按照其id的字典順序排序,然後消費者執行緒數了會除以partition數量,以此來確定每個消費者可以消費的分割槽數量,如果不能整除則會給第一個消費者多分配一個分割槽
分割槽:0 1 2 3 4 5 6 7 8 9
消費者:c1 c2 c3
分配結果:c1 – 0 1 2 3; c2 – 4 5 6;c3 – 7 8 9
消費者會按照分割槽中訊息的生產順序來消費訊息,並且是消費完成一個分割槽後才會去消費另一個分割槽,所以一個分割槽中訊息的消費是有順序的而多個分割槽之間則是無序的。
Kafka會將一些重要的資訊儲存在zookeeper中比如broker,consumer,offset等,消費者消費時需要配合zk才可以讀取這些資訊,但是kafka並不是實時向zookeeper同步資訊(預設一分鐘),可以在consumer.properties中進行配置auto.commit.enable=true和auto.commit.interval.ms=1000毫秒,通過這兩個引數來控制kafka向zk同步的頻率。
Zookeeper支援多個kafka叢集,在server.properties中修改引數指定多個kafka叢集的目錄zookeeper.connect=shb01:2181/demo1,192.168.79.139:2181/demo1,後面在操作topic是注意對應的zk目錄。