Kafka Source Code Analysis, Part 17: The Consumer Flow (How a Client Fetches Topic Data)
Kafka clients consume data in one of two roles, the simple consumer or the high-level consumer, introduced below:
17.1 The Simple Consumer
Characteristics:
1) A message can be read multiple times.
2) In one processing pass, the client consumes only part of the messages of one partition on one broker.
3) The application must track the offset value itself.
4) The application must find the lead broker of the target TopicPartition.
5) The application must handle broker changes.
A client program must follow these steps:
1) From all the live brokers, find which one is the leader for the target TopicPartition.
2) Build the fetch request.
3) Send the request and read the data.
4) Handle leader broker changes.
The client code is as follows:
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class KafkaSimpleConsumer {

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public KafkaSimpleConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public static void main(String args[]) {
        KafkaSimpleConsumer example = new KafkaSimpleConsumer();
        // maximum number of messages to read
        long maxReads = Long.parseLong("3");
        // topic to subscribe to
        String topic = "mytopic";
        // partition to query
        int partition = Integer.parseInt("0");
        // seed broker IPs
        List<String> seeds = new ArrayList<String>();
        seeds.add("192.168.4.30");
        seeds.add("192.168.4.31");
        seeds.add("192.168.4.32");
        // broker port
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // fetch the metadata of the given topic partition
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        // the leader broker found above
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
        // connect to the leader broker
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        // get the earliest offset still available in the partition (EarliestTime, not the latest)
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            // at its core, consuming is just sending a FetchRequest
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For the simple case, ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                // handle the case where the partition's leader has changed
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) { // skip stale messages
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                // print the message
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * Find a new leader broker; under the hood this just sends a TopicMetadataRequest.
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through, if the leader hasn't changed, give
                // ZooKeeper a second to recover;
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
17.2 The High-Level Consumer
Characteristics:
1) Data that has been consumed cannot be consumed again within the same group; to re-consume it, switch to another consumer group.
2) To record the position of each consumption, the offset of each TopicAndPartition must be committed. Offset commits support two storage backends (see the configuration sketch after this list):
①commit to ZooKeeper (frequent ZK writes are relatively inefficient)
②commit to Kafka itself (an internal topic)
3) The client reads data through streams. A stream is the message feed from one or more partitions on one or more brokers, and each stream is processed by exactly one thread, so the client can request however many streams suit its needs. In short, one stream may aggregate messages from partitions on several brokers, but any given partition feeds only one stream.
4) The relationship between consumers and partitions:
①If there are more consumers than partitions, the surplus consumers are wasted, because Kafka by design allows no concurrency within a single partition; the consumer count should therefore not exceed the partition count.
②If there are fewer consumers than partitions, one consumer maps to several partitions. Allocate the consumer and partition counts sensibly, or the data will be pulled from the partitions unevenly.
③If a consumer reads from several partitions, no ordering across them is guaranteed: Kafka guarantees order only within a single partition, while across partitions the result depends on the order in which you read.
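A minimal configuration sketch of the two offset-storage options from point 2, assuming Kafka 0.8.2+, where the old consumer exposes the offsets.storage and dual.commit.enabled properties:

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // placeholder address
props.put("group.id", "group-1");
// ① store offsets in ZooKeeper (the pre-0.8.2 default):
// props.put("offsets.storage", "zookeeper");
// ② store offsets in Kafka's internal __consumer_offsets topic:
props.put("offsets.storage", "kafka");
// while migrating from ZK to Kafka, commit to both backends:
props.put("dual.commit.enabled", "true");
ConsumerConfig config = new ConsumerConfig(props);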
A client program must follow these steps:
1) Define the topic-to-stream mapping, i.e. a map whose key K is the topic and whose value V is the number of streams N.
2) Start N consumer threads to consume these N streams.
The client code is as follows:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.consumer.ConsumerIterator;
/**
* For details, see: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
*
* @author Fung
*/
public class KafkaHighConsumer {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public KafkaHighConsumer(String a_zookeeper, String a_groupId, String a_topic) {
consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null)
consumer.shutdown();
if (executor != null)
executor.shutdown();
}
public void run(int numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//define the topic-to-stream mapping: key K is the topic, value V is the number of streams N
topicCountMap.put(topic, new Integer(numThreads));
//obtain numThreads streams
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(numThreads);
int threadNumber = 0;
//start N consumer threads to consume the N streams
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerMsgTask(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper,
String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] arg) {
String[] args = {"172.168.63.221:2188", "group-1", "page_visits", "12"};
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
KafkaHighConsumer demo = new KafkaHighConsumer(zooKeeper, groupId, topic);
demo.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
demo.shutdown();
}
public class ConsumerMsgTask implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerMsgTask(KafkaStream stream, int threadNumber) {
m_threadNumber = threadNumber;
m_stream = stream;
}
public void run() { // a KafkaStream is essentially an iterator over the network
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
System.out.println("Thread " + m_threadNumber + ": "
+ new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
}
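A note on shutdown, based on the old high-level consumer's documented behavior: consumer.shutdown() signals every KafkaStream, so a blocked hasNext() wakes up and returns false, each ConsumerMsgTask falls out of its loop and prints its shutdown message, and executor.shutdown() then lets the threads drain. If consumer.timeout.ms were set, hasNext() would instead throw a ConsumerTimeoutException when no message arrives within the timeout, and the task would have to catch it.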
The concrete consumption logic is as follows: