程式人生 > kafka原始碼解析之十七消費者流程(客戶端如何獲取topic的資料)

kafka原始碼解析之十七消費者流程(客戶端如何獲取topic的資料)

Kafka消費資料的角色分為普通消費者(低階API,即SimpleConsumer)和高階消費者(High-level Consumer API),其介紹如下:

17.1 普通消費者

特點:1)同一條訊息可以被讀取多次

   2)在一個處理程序中可以只消費某個topic的部分partition

   3)必須在程式中跟蹤offset值

   4)必須找出指定TopicPartition的leader broker

   5)必須處理broker的變動

客戶端程式設計必須按照以下步驟:

   1)從所有活躍的broker中找出哪個是指定TopicPartition中的leader broker

   2)構造請求

   3)傳送請求查詢資料

   4)處理leader broker變更

客戶端程式碼如下:

/**
 * Example client for Kafka's low-level {@code SimpleConsumer} API.
 *
 * <p>It looks up the leader broker of one topic partition via the seed brokers. It then reads
 * messages from that leader, starting at the earliest offset the broker reports, and prints them
 * until {@code a_maxReads} messages have been consumed. Unlike the high-level consumer, this
 * client tracks the read offset itself and handles leader changes on its own: after a failed
 * fetch it asks the replica brokers recorded by {@link #findLeader} for the new leader.
 */
public class KafkaSimpleConsumer {
    /** Hosts of the replicas of the partition most recently found by {@link #findLeader}. */
    private final List<String> m_replicaBrokers = new ArrayList<String>();

    public KafkaSimpleConsumer() {
    }

    /** Demo entry point with hard-coded topic, partition, seed brokers and port. */
    public static void main(String[] args) {
        KafkaSimpleConsumer example = new KafkaSimpleConsumer();
        // Maximum number of messages to read.
        long maxReads = 3L;
        // Topic to read from.
        String topic = "mytopic";
        // Partition to read from.
        int partition = 0;
        // Seed brokers used to discover the partition's leader.
        List<String> seeds = new ArrayList<String>();
        seeds.add("192.168.4.30");
        seeds.add("192.168.4.31");
        seeds.add("192.168.4.32");
        // Broker port; the same port is used for every broker.
        int port = 9092;
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    /**
     * Reads and prints at most {@code a_maxReads} messages from one topic partition.
     *
     * @param a_maxReads maximum number of messages to print
     * @param a_topic topic to read
     * @param a_partition partition id to read
     * @param a_seedBrokers broker hosts used to discover the partition leader
     * @param a_port broker port (used for every broker)
     * @throws Exception if no new leader can be found after a broker failure, or on I/O errors
     */
    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // Look up the metadata (leader and replicas) of the requested partition.
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
        // Connect to the leader broker.
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        try {
            // Start from the EARLIEST offset still available for this partition.
            long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
            int numErrors = 0;
            while (a_maxReads > 0) {
                if (consumer == null) {
                    // Reconnect after a leader change (see the error path below).
                    consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
                }
                // A fetch is ultimately a FetchRequest sent to the leader broker.
                FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
                FetchResponse fetchResponse = consumer.fetch(req);
                if (fetchResponse.hasError()) {
                    numErrors++;
                    short code = fetchResponse.errorCode(a_topic, a_partition);
                    System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                    if (numErrors > 5)
                        break;
                    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                        // We asked for an invalid offset; for this simple example
                        // just reset to the latest offset.
                        readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                        continue;
                    }
                    // Any other error: drop the connection and find the (possibly new) leader.
                    consumer.close();
                    consumer = null;
                    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                    continue;
                }
                numErrors = 0;
                long numRead = 0;
                for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                    if (a_maxReads <= 0) {
                        // Honour the read limit even when the fetched batch holds more messages.
                        break;
                    }
                    long currentOffset = messageAndOffset.offset();
                    if (currentOffset < readOffset) {
                        // The broker may return messages before the requested offset; skip them.
                        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                        continue;
                    }
                    readOffset = messageAndOffset.nextOffset();
                    ByteBuffer payload = messageAndOffset.message().payload();
                    // Size by remaining() so a non-zero buffer position cannot cause an underflow.
                    byte[] bytes = new byte[payload.remaining()];
                    payload.get(bytes);
                    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                    numRead++;
                    a_maxReads--;
                }
                if (numRead == 0) {
                    // Nothing new yet: back off before polling again.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        // Preserve the interrupt and stop; re-looping would spin without sleeping.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        } finally {
            // Also runs when fetch/findNewLeader throws, so the connection is never leaked.
            if (consumer != null)
                consumer.close();
        }
    }

    /**
     * Asks the broker behind {@code consumer} for one offset of the given partition.
     *
     * @param whichTime {@code kafka.api.OffsetRequest.EarliestTime()}, {@code LatestTime()}, or a timestamp
     * @return the first offset returned by the broker, or 0 if the request failed or returned none
     */
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Request at most one offset before/at whichTime.
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        if (offsets == null || offsets.length == 0) {
            // Guard against an empty answer instead of throwing ArrayIndexOutOfBoundsException.
            System.out.println("No offset returned by the Broker for [" + topic + ", " + partition + "]");
            return 0;
        }
        return offsets[0];
    }

    /**
     * Finds the leader broker of a partition after a failure, retrying up to three times.
     * Internally this sends a TopicMetadataRequest to the replica brokers recorded by the
     * last successful {@link #findLeader} call.
     *
     * @param a_oldLeader host of the leader that just failed
     * @param a_topic topic name
     * @param a_partition partition id
     * @param a_port broker port
     * @return host of the leader
     * @throws Exception if no leader is found after three attempts, or if interrupted while waiting
     */
    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // First time through, if the leader hasn't changed, give ZooKeeper
                // a second to recover. On later attempts assume the broker recovered
                // before failover, or that it was a non-broker issue.
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new Exception("Interrupted while waiting for a new leader", ie);
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    /**
     * Queries each seed broker in turn for topic metadata until one returns the requested
     * partition. On success, it replaces {@link #m_replicaBrokers} with that partition's replica hosts.
     *
     * @return the partition's metadata, or {@code null} if no seed broker returned it
     */
    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                // Best effort: report and try the next seed broker.
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            // Remember the replicas so findNewLeader can ask them after a leader failure.
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}

17.2 高階消費者

特點:

1)同一個group消費過的資料無法再次消費;如果想要再次消費這些資料,可以換用另一個group

2)為了記錄每次消費的位置,必須提交TopicAndPartition的offset,offset提交支援兩種方式:

①提交至ZK (頻繁操作zk是效率比較低的)

②提交至kafka內部

3)客戶端通過stream獲取資料,stream指的是來自一個或多個伺服器上的一個或者多個partition的訊息。每一個stream都對應一個單執行緒處理,因此client能夠設定滿足自己需求的stream數目。總之,一個stream可能代表了多個伺服器上partition訊息的聚合,但是每一個partition都只會被分配給一個stream。

4)consumer和partition的關係:

       ①如果consumer比partition多,是浪費,因為在同一個group中一個partition只能被一個consumer消費(kafka的設計不允許在一個partition上併發消費),所以consumer數不要大於partition數

       ②如果consumer比partition少,一個consumer會對應多個partition,這裡需要合理分配consumer數和partition數,否則會導致各partition中的資料被消費得不均勻

       ③如果consumer從多個partition讀到資料,不保證資料間的順序性,kafka只保證在一個partition上資料是有序的,但多個partition,根據你讀的順序會有不同

客戶端程式設計必須按照以下步驟:

1)建立topic和stream數目的對應關係,即Map的key為topic,value為stream的個數N

2)開啟N個消費執行緒,分別消費這N個stream

客戶端程式碼如下:
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
 * Example client for Kafka's high-level (ZooKeeper-based) consumer.
 *
 * <p>The topic is split into {@code numThreads} {@link KafkaStream}s, and each stream is drained
 * by its own thread from a fixed thread pool.
 *
 * <p>For details see: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 *
 * @author Fung
 */
public class KafkaHighConsumer {
    /** Message payloads are decoded as UTF-8 rather than the platform default charset. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    /**
     * @param a_zookeeper ZooKeeper connect string ({@code host:port})
     * @param a_groupId consumer group id
     * @param a_topic topic to consume
     */
    public KafkaHighConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    /**
     * Shuts down the connector and the worker pool. It waits up to 5 seconds for the
     * worker threads to finish.
     */
    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Interrupted during shutdown, exiting uncleanly");
            }
        }
    }

    /**
     * Creates {@code numThreads} streams for the topic and starts one task per stream.
     *
     * @param numThreads number of streams, and therefore worker threads
     */
    public void run(int numThreads) {
        // Map the topic to its stream count: key = topic, value = N streams.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(numThreads));
        // Obtain the numThreads streams for the topic.
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(numThreads);
        int threadNumber = 0;
        // Start N consumer threads, one per stream.
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    /** Builds the high-level consumer configuration. */
    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
                                                       String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        // NOTE(review): a 400 ms ZooKeeper session timeout is very aggressive outside a demo.
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    /**
     * Demo entry point. Optional arguments: {@code <zookeeper> <groupId> <topic> <threads>}.
     * When fewer than four arguments are given, the built-in defaults are used.
     */
    public static void main(String[] arg) {
        String[] args = (arg != null && arg.length >= 4)
                ? arg
                : new String[] {"172.168.63.221:2188", "group-1", "page_visits", "12"};
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
        KafkaHighConsumer demo = new KafkaHighConsumer(zooKeeper, groupId, topic);
        demo.run(threads);
        // Let the workers consume for 10 seconds, then shut everything down.
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        demo.shutdown();
    }

    /** Drains one stream, printing every message, until the stream's iterator ends. */
    public class ConsumerMsgTask implements Runnable {
        private final KafkaStream<byte[], byte[]> m_stream;
        private final int m_threadNumber;

        public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream, int threadNumber) {
            m_threadNumber = threadNumber;
            m_stream = stream;
        }

        @Override
        public void run() {
            // hasNext() blocks until a message arrives or the iterator ends.
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext()) {
                System.out.println("Thread " + m_threadNumber + ": "
                        + new String(it.next().message(), UTF_8));
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

    /**
     * Empty placeholder; nothing in this example references it.
     * Created by Administrator on 2016/4/11.
     */
    public static class KafkaProducer {
    }
}

其具體的消費邏輯如下: