1. 程式人生 > >Kafka java api-消費者程式碼與消費分析、生產者消費者配置檔案詳解

Kafka java api-消費者程式碼與消費分析、生產者消費者配置檔案詳解

1、消費者程式碼
用到消費者,所以也必須先把前面寫過的生產者程式碼也貼一下吧
生產者程式碼與自定義partition
使用maven導包

<dependencies>
      <dependency>
           <groupId>com.alibaba.jstorm</groupId>
           <artifactId>jstorm-core</artifactId>
           <version>2.1.1</version>
           <exclusions
>
<exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-nop</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId
>
slf4j-jdk14</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.1</version
>
<exclusions> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> <exclusion> <artifactId>jms</artifactId> <groupId>javax.jms</groupId> </exclusion> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
/**
 * 這是一個簡單的Kafka producer程式碼
 * 包含兩個功能:
 * 1、資料傳送
 * 2、資料按照自定義的partition策略進行傳送
 *
 *
 * KafkaSpout的類
 */
public class KafkaProducerSimple {
    public static void main(String[] args) {
        /**
         * 1、指定當前kafka producer生產的資料的目的地
         *  建立topic可以輸入以下命令,在kafka叢集的任一節點進行建立。
         *  bin/kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic test
         */

        String TOPIC = "orderMq";
        /**
         * 2、讀取配置檔案
         */
        Properties props = new Properties();
        /*
         * key.serializer.class預設為serializer.class  key的序列化使用哪個類
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /*
         * kafka broker對應的主機,格式為host1:port1,host2:port2
         */
        props.put("metadata.broker.list", "mini1:9092,mini2:9092,mini3:9092");
        /*
         * request.required.acks,設定傳送資料是否需要服務端的反饋,有三個值0,1,-1
         * 0,意味著producer永遠不會等待一個來自broker的ack,這就是0.7版本的行為。
         * 這個選項提供了最低的延遲,但是持久化的保證是最弱的,當server掛掉的時候會丟失一些資料。
         * 1,意味著在leader replica已經接收到資料後,producer會得到一個ack。
         * 這個選項提供了更好的永續性,因為在server確認請求成功處理後,client才會返回。
         * 如果剛寫到leader上,還沒來得及複製leader就掛了,那麼訊息才可能會丟失。
         * -1,意味著在所有的ISR都接收到資料後,producer才得到一個ack。
         * 這個選項提供了最好的永續性,只要還有一個replica存活,那麼資料就不會丟失
         */
        props.put("request.required.acks", "1");
        /*
         * 可選配置,如果不配置,則使用預設的partitioner partitioner.class
         * 預設值:kafka.producer.DefaultPartitioner
         * 用來把訊息分到各個partition中,預設行為是對key進行hash。
         */
        props.put("partitioner.class", "com.scu.kafka.MyLogPartitioner");
//        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        /**
         * 3、通過配置檔案,建立生產者
         */
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        /**
         * 4、通過for迴圈生產資料
         */
        for (int messageNo = 1; messageNo < 100000; messageNo++) {
            /**
             * 5、呼叫producer的send方法傳送資料
             * 注意:這裡需要指定 partitionKey,用來配合自定義的MyLogPartitioner進行資料分發
             */
            producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
        }
    }
}
public class MyLogPartitioner implements Partitioner {
    private static Logger logger = Logger.getLogger(MyLogPartitioner.class);

    public MyLogPartitioner(VerifiableProperties props) {
    }

    /**
     *
     * @param obj 傳來的key 用它來進行hash分到partition
     * @param numPartitions 幾個partition 如果叢集中已存在該topic,那麼partition數為原本存在數,否則預設是2
     * @return 生產到哪個partition
     */
    public int partition(Object obj, int numPartitions) {
        return Integer.parseInt(obj.toString())%numPartitions;
    }

}

注:orderMq這個topic很早就通過命令列建立好了,指定了partition是3個。
下面是消費者程式碼

public class KafkaConsumerSimple implements Runnable {
    public String title;
    public KafkaStream<byte[], byte[]> stream;
    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }
    public void run() {
        System.out.println("開始執行 " + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        /**
         * 不停地從stream讀取新到來的訊息,在等待新的訊息時,hasNext()會阻塞
         * 如果呼叫 `ConsumerConnector#shutdown`,那麼`hasNext`會返回false
         * */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.out.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        props.put("group.id", "dashujujiagoushi");//消費組組組名,任意取
        props.put("zookeeper.connect", "mini1:2181,mini2:2181,mini3:2181");//zookeeper連線
        props.put("auto.offset.reset", "largest");//最新位置開始消費
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");//分割槽分配策略
        ConsumerConfig config = new ConsumerConfig(props);
        String topic1 = "orderMq";
        String topic2 = "paymentMq";
        //只要ConsumerConnector還在的話,consumer會一直等待新訊息,不會自己退出
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        //定義一個map
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic1, 3);
        //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是對應的流
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
        //取出 `kafkaTest` 對應的 streams
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
        //建立一個容量為4的執行緒池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        //建立20個consumer threads
        for (int i = 0; i < streams.size(); i++)
            executor.execute(new KafkaConsumerSimple("消費者" + (i + 1), streams.get(i)));
    }
}

測試:
先執行消費者程式,儘管partition目錄裡面的segment檔案是有以前生成的資料,但是不會打印出來而是一直提示(已經標記為消費狀態的就不再消費了,預設情況就是這樣,可以自己設定從0開始消費)

15:10:38.228 [main-SendThread(mini1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x15fdedc70380022 after 1ms
15:10:40.230 [main-SendThread(mini1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x15fdedc70380022 after 4ms

需要進行生產,所以再執行生產者程式,控制檯列印如下:

...
Consumer: [消費者1],  Topic: [orderMq],  PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
Consumer: [消費者2],  Topic: [orderMq],  PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
Consumer: [消費者1],  Topic: [orderMq],  PartitionId: [1], Offset: [17858], msg: [appidb145da08-bb61-42e7-b140-9fed576c2faeitcast]
Consumer: [消費者1],  Topic: [orderMq],  PartitionId: [1], Offset: [17859], msg: [appid909a90ae-c0fb-42ac-97de-6d7438895e07itcast]
Consumer: [消費者3],  Topic: [orderMq],  PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]
Consumer: [消費者3],  Topic: [orderMq],  PartitionId: [2], Offset: [17714], msg: [appidb93b9355-4713-4e22-823a-756b4fe75bdfitcast]
Consumer: [消費者3],  Topic: [orderMq],  PartitionId: [2], Offset: [17715], msg: [appidf82ca658-528a-4f40-a023-8a155c15eaa1itcast]
...

精簡下如下

Consumer: [消費者1],  Topic: [orderMq],  PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
Consumer: [消費者2],  Topic: [orderMq],  PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
Consumer: [消費者3],  Topic: [orderMq],  PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]

能看到三個消費者對應消費的partition。
那麼考慮以下問題
在建立orderMq的時候指定partition是3個,那麼如果此時我指定建立5個KafkaStream,那麼會怎麼消費呢?
消費者程式碼修改兩次如下

topicCountMap.put(topic1, 5);
ExecutorService executor = Executors.newFixedThreadPool(5);

再次同上一樣執行,輸出結果能看到只有3個消費者,所以指定KafkaStream比partition多是沒用的,只會有對應數量的消費者去消費對應的partition上的資料。

Consumer: [消費者2],  Topic: [orderMq],  PartitionId: [2], Offset: [26420], msg: [appid4b778b51-33c7-42de-83c2-5b85f8f2428aitcast]
Consumer: [消費者3],  Topic: [orderMq],  PartitionId: [0], Offset: [26423], msg: [appid86045c25-7b3f-4c82-ad2a-3e8e11958b28itcast]
Consumer: [消費者4],  Topic: [orderMq],  PartitionId: [1], Offset: [26562], msg: [appid213b5a91-a7bf-4a39-b585-456d95748566itcast]

如果指定的KafkaStream只有2呢?不做測試了,結果是其中一個消費者會消費2個partition,另外一個消費1個partition中的資料。

生產者,消費者配置檔案解釋
用java api不管是寫生產者程式碼還是消費者程式碼都使用配置檔案,那麼下面列出了生產者和消費者配置檔案介紹

生產者配置檔案解釋

#指定kafka節點列表,用於獲取metadata,不必全部指定
metadata.broker.list=kafka01:9092,kafka02:9092

# 指定分割槽處理類。預設kafka.producer.DefaultPartitioner,表通過key雜湊到對應分割槽
#partitioner.class=kafka.producer.DefaultPartitioner

# 是否壓縮,預設0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後訊息中會有頭來指明訊息壓縮型別,故在消費者端訊息解壓是透明的無需指定。
compression.codec=none

# 指定序列化處理類
serializer.class=kafka.serializer.DefaultEncoder

# 如果要壓縮訊息,這裡指定哪些topic要壓縮訊息,預設empty,表示不壓縮。
#compressed.topics=

# 設定傳送資料是否需要服務端的反饋,有三個值0,1,-1
# 0: producer不會等待broker傳送ack
# 1: 當leader接收到訊息之後傳送ack
# -1: 當所有的follower都同步訊息成功後傳送ack.
request.required.acks=0

# 在向producer傳送ack之前,broker允許等待的最大時間 ,如果超時,broker將會向producer傳送一個error ACK.意味著上一次訊息因為某種原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000

# 同步還是非同步傳送訊息,預設“sync”表同步,"async"表非同步。非同步可以提高發送吞吐量,
也意味著訊息將會在本地buffer中,並適時批量傳送,但是也可能導致丟失未傳送過去的訊息
producer.type=sync

# 在async模式下,當message被快取的時間超過此值後,將會批量傳送給broker,預設為5000ms
# 此值和batch.num.messages協同工作.
queue.buffering.max.ms = 5000

# 在async模式下,producer端允許buffer的最大訊息量
# 無論如何,producer都無法儘快的將訊息傳送給broker,從而導致訊息在producer端大量沉積
# 此時,如果訊息的條數達到閥值,將會導致producer端阻塞或者訊息被拋棄,預設為10000
queue.buffering.max.messages=20000

# 如果是非同步,指定每次批量傳送資料量,預設為200
batch.num.messages=500

# 當訊息在producer端沉積的條數達到"queue.buffering.max.meesages"後
# 阻塞一定時間後,佇列仍然沒有enqueue(producer仍然沒有傳送出任何訊息)
# 此時producer可以繼續阻塞或者將訊息拋棄,此timeout值用於控制"阻塞"的時間
# -1: 無阻塞超時限制,訊息不會被拋棄
# 0:立即清空佇列,訊息被拋棄
queue.enqueue.timeout.ms=-1


# 當producer接收到error ACK,或者沒有接收到ACK時,允許訊息重發的次數
# 因為broker並沒有完整的機制來避免訊息重複,所以當網路異常時(比如ACK丟失)
# 有可能導致broker接收到重複的訊息,預設值為3.
message.send.max.retries=3

# producer重新整理topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況
# 因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即重新整理
# (比如topic失效,partition丟失,leader失效等),此外也可以通過此引數來配置額外的重新整理機制,預設值600000
topic.metadata.refresh.interval.ms=60000

消費者配置檔案解釋

# zookeeper連線伺服器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

# zookeeper的session過期時間,預設5000ms,用於檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000

#當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡
zookeeper.connection.timeout.ms=10000

# 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次獲得的訊息。一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的訊息
zookeeper.sync.time.ms=2000

#指定消費組
group.id=xxx

# 當consumer消費一定量的訊息之後,將會自動向zookeeper提交offset資訊
# 注意offset資訊並不是每消費一次訊息就向zk提交一次,而是現在本地儲存(記憶體),並定期提交,預設為true
auto.commit.enable=true

# 自動更新時間。預設60 * 1000
auto.commit.interval.ms=1000

# 當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤訊息消費情況,便於觀察
conusmer.id=xxx

# 消費者客戶端編號,用於區分不同客戶端,預設客戶端程式自動產生
client.id=xxxx

# 最大取多少塊快取到消費者(預設10)
queued.max.message.chunks=50

# 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新 的consumer上,如果一個consumer獲得了某個partition的消費許可權,那麼它將會向zk註冊 "Partition Owner registry"節點資訊,但是有可能此時舊的consumer尚沒有釋放此節點, 此值用於控制,註冊節點的重試次數.
rebalance.max.retries=5

# 獲取訊息的最大尺寸,broker不會像consumer輸出大於此值的訊息chunk 每次feth將得到多條訊息,此值為總大小,提升此值,將會消耗更多的consumer端記憶體
fetch.min.bytes=6553600

# 當訊息的尺寸不足時,server阻塞的時間,如果超時,訊息將立即傳送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360

# 如果zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。預設largest
auto.offset.reset=smallest

# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder