Apache Kafka Programming Primer: Setting the Partition Count and Replication Factor

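Earlier we learned how to write a simple Kafka Producer program. In that example, if the topic we sent to did not exist, it was created automatically. As you probably know, every topic has a partition count and a replication factor, but we cannot set either of them through the Producer API: a topic created on behalf of a Producer takes its values from num.partitions and default.replication.factor in the server.properties file. Does that mean we cannot define a topic's partition count and replication factor from within a program?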

Not at all. We can create the topic with the AdminUtils.createTopic function that Kafka provides. Its signature is as follows:

def createTopic(zkClient: ZkClient, 
      topic: String,
      partitions: Int,   
      replicationFactor: Int,  
      topicConfig: Properties = new Properties)
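This function has no return value. As the parameter list shows, partitions and replicationFactor are the partition count and replication factor discussed above, so we can use them to create a topic with the settings we want. Before calling createTopic we need a ZkClient object, which wraps the API for working with ZooKeeper. This class comes from the separate zkclient library rather than from Kafka itself, so we first add the dependency: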

<dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.3</version>
</dependency>
Then we can create the ZkClient object:
val zk = "node1:2181"
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val zkClient = new ZkClient(zk, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)
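Pay special attention here: we must pass the ZKStringSerializer object. If we leave it out, the code still runs, the topic shows up in ZooKeeper, and it even appears when you list topics, but as soon as you send a message to it you get the following exception: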
[2016-02-05 16:45:52,335] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test-topic(kafka.producer.async.DefaultEventHandler)
[2016-02-05 16:45:52,441] WARN Error while fetching metadata [{TopicMetadata for topic iteblog ->
No partition metadata for topic flight due to kafka.common.LeaderNotAvailableException}] for topic [flight]: class kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2016-02-05 16:45:52,441] ERROR Failed to send requests for topics flight with correlation ids in [41,48] (kafka.producer.async.DefaultEventHandler)
[2016-02-05 16:45:52,441] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

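The exception above means that the producer cannot find a leader for the topic, so it cannot send messages. Without ZKStringSerializer, the topic information is written to ZooKeeper, but Kafka never actually creates the topic. The likely reason is that ZkClient's default serializer stores Java-serialized objects rather than the plain strings the brokers expect to read. With the client set up correctly, we can now use AdminUtils.createTopic to create the topic: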

val topic = "test-topic"
val replicationFactor = 1
val numPartitions = 2
AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor)
If the topic already exists, the program fails with:
Exception in thread "main" kafka.common.TopicExistsException: Topic "test-topic" already exists.
    at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:187)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
    at com.iteblog.kafka.IteblogProducerV3$.main(IteblogProducerV3.scala:46)
    at com.iteblog.kafka.IteblogProducerV3.main(IteblogProducerV3.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

Otherwise, no output at all means the topic was created successfully. The complete code is as follows:
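package com.practice.kafka

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

object CreateTopic {

  def main(args: Array[String]): Unit = {
    // ZooKeeper connection string and timeouts (in milliseconds)
    val zk = "node1:2181"
    val sessionTimeoutMs = 10000
    val connectionTimeoutMs = 10000
    // ZKStringSerializer is required; see the note above
    val zkClient = new ZkClient(zk, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)
    try {
      val topic = "test-topic"
      val replicationFactor = 1
      val numPartitions = 2
      // Throws TopicExistsException if the topic already exists
      AdminUtils.createTopic(zkClient, topic, numPartitions, replicationFactor)
    } finally {
      // Release the ZooKeeper connection
      zkClient.close()
    }
  }
}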
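To avoid the TopicExistsException shown earlier, one option is to check for the topic before creating it. The following is a minimal sketch rather than a definitive implementation. It assumes the same Kafka 0.8.x AdminUtils and ZooKeeper address (node1:2181) used above. The object name CreateTopicIfAbsent is made up for illustration.

```scala
import kafka.admin.AdminUtils
import kafka.common.TopicExistsException
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Hypothetical object name, used only for this illustration
object CreateTopicIfAbsent {

  def main(args: Array[String]): Unit = {
    // Same connection settings as in the example above
    val zkClient = new ZkClient("node1:2181", 10000, 10000, ZKStringSerializer)
    val topic = "test-topic"
    try {
      if (AdminUtils.topicExists(zkClient, topic)) {
        println(s"Topic $topic already exists, skipping creation")
      } else {
        // 2 partitions, replication factor 1
        AdminUtils.createTopic(zkClient, topic, 2, 1)
      }
    } catch {
      // Another client may create the topic between the check and the create call
      case _: TopicExistsException =>
        println(s"Topic $topic was created concurrently")
    } finally {
      zkClient.close()
    }
  }
}
```

The check alone is not atomic, which is why the sketch still catches TopicExistsException instead of relying only on topicExists.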
Besides using AdminUtils.createTopic as above to set the replication factor and partition count when creating a topic, we can also use kafka.admin.TopicCommand to achieve the same thing:
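import kafka.admin.TopicCommand
// Same flags as the kafka-topics command-line tool; a replication factor of 2 needs at least two brokers
val arguments = Array("--create", "--zookeeper", zk, "--replication-factor", "2", "--partitions", "2", "--topic", "test-topic")
TopicCommand.main(arguments)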