
kafka_2.9.2-0.8.1.1 distributed cluster installation and demo (Java API) test

Guide questions
1. What is Kafka?
2. Where is Kafka's official website?
3. Where do I download it, and which components does it depend on?
4. How do I install it?







  I. What is Kafka?
  Kafka is a distributed MQ (message queue) system developed and open-sourced by LinkedIn, and now a top-level Apache project. Its home page describes Kafka as a high-throughput distributed MQ (one that can spread messages across different nodes). Kafka is written in only about 7,000 lines of Scala, and is reported to produce about 250,000 messages per second (50 MB) and consume about 550,000 messages per second (110 MB).
  Kafka currently has clients in several languages: Java, Python, C++, PHP, and others.
  A rough sketch of a Kafka cluster, with producers writing messages and consumers reading them:

(figure: Kafka cluster overview — producers write messages, consumers read them)




  Kafka design goals
  • High throughput is one of its core design goals.
  • Data is persisted to disk: messages are written straight to disk rather than cached in memory, taking full advantage of sequential disk I/O performance.
  • Zero-copy: fewer I/O steps.
  • Batched sending and fetching of data.
  • Data compression. (Batching and compression are enabled through a handful of producer settings; see the sketch after this list.)
  • Topics are divided into multiple partitions, improving parallelism.
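
As a hedged illustration of the batching and compression bullets, here is how they map onto 0.8 producer properties (property names are from the 0.8 producer config; the broker list and values are examples only):

import java.util.Properties;

import kafka.producer.ProducerConfig;

public class BatchingProducerConfig {
    public static ProducerConfig config() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "m1:9092,m2:9092");       // example brokers
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");        // buffer and send in batches
        props.put("batch.num.messages", "200");     // flush after 200 queued messages
        props.put("queue.buffering.max.ms", "500"); // ...or after 500 ms, whichever comes first
        props.put("compression.codec", "snappy");   // compress each message set
        return new ProducerConfig(props);
    }
}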


  Kafka terminology and how it works:
  • Producer: message producer, the client that sends messages to a Kafka broker.
  • Consumer: message consumer, the client that fetches messages from a Kafka broker.
  • Topic: can be thought of as a queue.
  • Consumer Group (CG): the mechanism Kafka uses to support both broadcast (deliver a topic's messages to every consumer) and unicast (deliver each message to exactly one consumer). A topic can have multiple CGs. The topic's messages are copied (conceptually, not physically) to every CG, but within a CG each message goes to only one consumer. To broadcast, give every consumer its own CG; to unicast, put all consumers in the same CG. CGs also let you group consumers freely without sending messages to multiple topics. (The first sketch after this list shows the single setting involved.)
  • Broker: one Kafka server is one broker. A cluster consists of multiple brokers, and one broker can host multiple topics.
  • Partition: for scalability, a very large topic can be spread across multiple brokers (servers): a topic is divided into partitions, each of which is an ordered queue. Every message in a partition is assigned an ordered id, its offset. Kafka only guarantees that messages are delivered to consumers in order within a partition, not across the partitions of a topic as a whole.
  • Offset: Kafka names its segment files after the first offset they contain (in 0.8, the base offset zero-padded to 20 digits with a .log extension), which makes lookups cheap. To find the message at offset 2049, for example, open the segment whose base offset is the largest one not exceeding 2049; the first segment's base offset is, of course, 0. (See the second sketch after this list.)
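
A hedged sketch of the Consumer Group bullet, using the 0.8 high-level consumer (the helper class and group ids are our own illustration):

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

public class GroupConfigs {
    // Same group id on several consumers   -> partitions are divided among them,
    //                                         each message delivered once (unicast).
    // Distinct group id on each consumer   -> every consumer sees every message
    //                                         (broadcast). Nothing else changes.
    public static ConsumerConfig forGroup(String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        props.put("group.id", groupId);
        return new ConsumerConfig(props);
    }
}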
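
And a hedged sketch of the Offset bullet: the lookup amounts to a floor search over segment base offsets (the base offsets below are hypothetical; a real broker also consults the per-segment .index files):

import java.util.Locale;
import java.util.TreeMap;

public class SegmentLookup {
    public static void main(String[] args) {
        // Hypothetical base offsets of the segments on disk.
        TreeMap<Long, String> segments = new TreeMap<Long, String>();
        for (long base : new long[] { 0L, 2048L, 4096L }) {
            // 0.8 names a segment after its base offset, zero-padded to 20 digits.
            segments.put(base, String.format(Locale.ROOT, "%020d.log", base));
        }
        // The message at offset 2049 lives in the segment with the greatest
        // base offset <= 2049.
        System.out.println(segments.floorEntry(2049L).getValue()); // 00000000000000002048.log
    }
}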


  Kafka scalability:
  • Kafka uses ZooKeeper for dynamic cluster growth, with no client (producer or consumer) configuration changes required. Brokers register themselves in ZooKeeper and keep the related metadata (topic and partition information, etc.) up to date there.
  • Clients register watchers on the corresponding ZooKeeper nodes, so as soon as something changes in ZooKeeper they notice and adjust. Load therefore keeps balancing itself across brokers automatically as brokers are added or removed.


  How Kafka relates to ZooKeeper:
  • Producers use ZooKeeper to discover the broker list, and open socket connections to the leader of each partition of a topic to send messages.
  • Brokers use ZooKeeper to register broker information and to monitor partition-leader liveness.
  • Consumers use ZooKeeper to register consumer information (including the list of partitions each one consumes), and also to discover the broker list, connect to partition leaders, and fetch messages. (A minimal broker-discovery sketch follows this list.)
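
A hedged sketch of the discovery step, using the plain ZooKeeper client that already sits on Kafka's classpath (each live broker keeps an ephemeral JSON znode under /brokers/ids; the host names are this cluster's):

import java.util.List;

import org.apache.zookeeper.ZooKeeper;

public class BrokerDiscovery {
    public static void main(String[] args) throws Exception {
        // No watcher is needed for a one-shot read, so we pass null.
        ZooKeeper zk = new ZooKeeper("m1:2181,m2:2181,s1:2181,s2:2181", 10000, null);
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            // Each znode holds JSON with the broker's host and port.
            byte[] data = zk.getData("/brokers/ids/" + id, false, null);
            System.out.println("broker " + id + " -> " + new String(data, "UTF-8"));
        }
        zk.close();
    }
}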




 IV. How do I install it?
  1. Extract kafka_2.9.2-0.8.1.1.tgz; in this article it is extracted under /home/hadoop:

root@m1:/home/hadoop/kafka_2.9.2-0.8.1.1# pwd
/home/hadoop/kafka_2.9.2-0.8.1.1
2. Edit the server.properties configuration file. The ZooKeeper-related settings are in the "Zookeeper" section near the end of the listing below:


root@m1:/home/hadoop/kafka_2.9.2-0.8.1.1# cat config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
#An integer; give each broker its own value (e.g. derived from its IP). Here it matches the node's ZooKeeper myid
broker.id=1

############################# Socket Server Settings #############################

# The port the socket server listens on
#Port on which the broker accepts messages from producers
port=9092
#port=44444

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#The broker's hostname
host.name=m1

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#The address that producers and consumers use when they connect
advertised.host.name=m1

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=2
  
# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
#Directory where Kafka stores its message (log) files
log.dirs=/home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
#Default number of partitions per topic
num.partitions=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
#Keep messages for 7 days (log.retention.hours=168)
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=m1:2181,m2:2181,s1:2181,s2:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

3. Start ZooKeeper and Kafka
    1) Starting ZooKeeper
    Start ZooKeeper on each machine (zkServer.sh start); you can then check its status on every machine with:
root@m1:/home/hadoop# /home/hadoop/zookeeper-3.4.5/bin/zkServer.sh status
JMX enabled by default
Using config: /home/hadoop/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: leader
    2) Start Kafka on m1, m2, s1 and s2. Before doing so, copy the Kafka directory from m1 to the other three machines; after copying, remember to change host.name (and advertised.host.name) in server.properties to the local machine and to give each broker a unique broker.id. The following is what it looks like on m1:
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &
[1] 31823
root@m1:/home/hadoop# [2014-08-05 10:03:11,210] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,261] INFO Property advertised.host.name is overridden to m1 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,261] INFO Property broker.id is overridden to 1 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,264] INFO Property host.name is overridden to m1 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,264] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,264] INFO Property log.dirs is overridden to /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,265] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,265] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,265] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,265] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,266] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,266] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,267] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,267] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,268] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,268] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,268] INFO Property zookeeper.connect is overridden to m1:2181,m2:2181,s1:2181,s2:2181 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,269] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
[2014-08-05 10:03:11,302] INFO [Kafka Server 1], starting (kafka.server.KafkaServer)
[2014-08-05 10:03:11,303] INFO [Kafka Server 1], Connecting to zookeeper on m1:2181,m2:2181,s1:2181,s2:2181 (kafka.server.KafkaServer)
[2014-08-05 10:03:11,335] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2014-08-05 10:03:11,348] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,348] INFO Client environment:host.name=m1 (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,349] INFO Client environment:java.version=1.7.0_65 (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,349] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,349] INFO Client environment:java.home=/usr/lib/jvm/java-7-oracle/jre (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,349] INFO Client environment:java.class.path=.:/usr/lib/jvm/java-7-oracle/lib/tools.jar:/usr/lib/jvm/java-7-oracle/lib/dt.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/scala-library-2.9.2.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/kafka_2.9.2-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:java.library.path=:/usr/local/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,350] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,351] INFO Client environment:os.version=3.11.0-15-generic (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,351] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,351] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,351] INFO Client environment:user.dir=/home/hadoop (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,352] INFO Initiating client connection, connectString=m1:2181,m2:2181,s1:2181,s2:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@... (org.apache.zookeeper.ZooKeeper)
[2014-08-05 10:03:11,380] INFO Opening socket connection to server m2/192.168.1.51:2181 (org.apache.zookeeper.ClientCnxn)
[2014-08-05 10:03:11,386] INFO Socket connection established to m2/192.168.1.51:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2014-08-05 10:03:11,398] INFO Session establishment complete on server m2/192.168.1.51:2181, sessionid = 0x247a3e09b460000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2014-08-05 10:03:11,400] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2014-08-05 10:03:11,652] INFO Loading log 'test-1' (kafka.log.LogManager)
[2014-08-05 10:03:11,681] INFO Recovering unflushed segment 0 in log test-1. (kafka.log.Log)
[2014-08-05 10:03:11,711] INFO Completed load of log test-1 with log end offset 137 (kafka.log.Log)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-08-05 10:03:11,747] INFO Loading log 'idoall.org-0' (kafka.log.LogManager)
[2014-08-05 10:03:11,748] INFO Recovering unflushed segment 0 in log idoall.org-0. (kafka.log.Log)
[2014-08-05 10:03:11,754] INFO Completed load of log idoall.org-0 with log end offset 5 (kafka.log.Log)
[2014-08-05 10:03:11,760] INFO Loading log 'test-0' (kafka.log.LogManager)
[2014-08-05 10:03:11,765] INFO Recovering unflushed segment 0 in log test-0. (kafka.log.Log)
[2014-08-05 10:03:11,777] INFO Completed load of log test-0 with log end offset 151 (kafka.log.Log)
[2014-08-05 10:03:11,779] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
[2014-08-05 10:03:11,782] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2014-08-05 10:03:11,800] INFO Awaiting socket connections on m1:9092. (kafka.network.Acceptor)
[2014-08-05 10:03:11,802] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
[2014-08-05 10:03:11,890] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-08-05 10:03:11,919] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-08-05 10:03:12,359] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-08-05 10:03:12,387] INFO Registered broker 1 at path /brokers/ids/1 with address m1:9092. (kafka.utils.ZkUtils$)
[2014-08-05 10:03:12,392] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
[2014-08-05 10:03:12,671] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall.org,0],[test,0],[test,1] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:03:12,741] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall.org,0],[test,0],[test,1] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:03:25,327] INFO Partition [test,0] on broker 1: Expanding ISR for partition [test,0] from 1 to 1,2 (kafka.cluster.Partition)
[2014-08-05 10:03:25,334] INFO Partition [test,1] on broker 1: Expanding ISR for partition [test,1] from 1 to 1,2 (kafka.cluster.Partition)
[2014-08-05 10:03:26,905] INFO Partition [test,1] on broker 1: Expanding ISR for partition [test,1] from 1,2 to 1,2,3 (kafka.cluster.Partition)
 4. Test the Kafka cluster
    1) Create a topic named idoall_testTopic on m1

#Set --replication-factor to the number of Kafka brokers (4 here)
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --create --topic idoall_testTopic --replication-factor 4 --partitions 2 --zookeeper m1:2181
Created topic "idoall_testTopic".
[2014-08-05 10:08:29,315] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall_testTopic,0] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:08:29,334] INFO Completed load of log idoall_testTopic-0 with log end offset 0 (kafka.log.Log)
[2014-08-05 10:08:29,373] INFO Created log for partition [idoall_testTopic,0] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-05 10:08:29,384] WARN Partition [idoall_testTopic,0] on broker 1: No checkpointed highwatermark is found for partition [idoall_testTopic,0] (kafka.cluster.Partition)
[2014-08-05 10:08:29,415] INFO Completed load of log idoall_testTopic-1 with log end offset 0 (kafka.log.Log)
[2014-08-05 10:08:29,416] INFO Created log for partition [idoall_testTopic,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-05 10:08:29,422] WARN Partition [idoall_testTopic,1] on broker 1: No checkpointed highwatermark is found for partition [idoall_testTopic,1] (kafka.cluster.Partition)
[2014-08-05 10:08:29,430] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall_testTopic,1] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:08:29,438] INFO Truncating log idoall_testTopic-1 to offset 0. (kafka.log.Log)
[2014-08-05 10:08:29,473] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer([[idoall_testTopic,1], initOffset 0 to broker id:2,host:m2,port:9092] ) (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:08:29,475] INFO [ReplicaFetcherThread-0-2], Starting  (kafka.server.ReplicaFetcherThread)
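
The same topic can also be created from Java. A hedged sketch against the 0.8.x admin API (the class and method exist in the kafka_2.9.2-0.8.1.1 jars; ZKStringSerializer$.MODULE$ is how Java reaches the Scala singleton, and Kafka needs it so topic metadata is written as plain strings):

import java.util.Properties;

import org.I0Itec.zkclient.ZkClient;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;

public class CreateTopic {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("m1:2181", 10000, 10000, ZKStringSerializer$.MODULE$);
        try {
            // topic, partitions, replication factor, per-topic config overrides
            AdminUtils.createTopic(zkClient, "idoall_testTopic", 2, 4, new Properties());
        } finally {
            zkClient.close();
        }
    }
}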
    2) On m1, list the idoall_testTopic topic we just created
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --list --zookeeper m1:2181
idoall_testTopic
    3) From m2, send the message "hello idoall.org" to Kafka (m2 plays the producer)
root@m2:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-producer.sh --broker-list m1:9092 --sync --topic idoall_testTopic
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello idoall.org
    4) Open a consumer on s1 (s1 plays the consumer); the message just sent appears
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic idoall_testTopic --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
hello idoall.org
    5) Delete a topic. Here we create a topic named idoall as a test, then delete it
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --create --topic idoall --replication-factor 4 --partitions 2 --zookeeper m1:2181
Created topic "idoall".
[2014-08-05 10:38:30,862] INFO Completed load of log idoall-1 with log end offset 0 (kafka.log.Log)
[2014-08-05 10:38:30,864] INFO Created log for partition [idoall,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-05 10:38:30,870] WARN Partition [idoall,1] on broker 1: No checkpointed highwatermark is found for partition [idoall,1] (kafka.cluster.Partition)
[2014-08-05 10:38:30,878] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [idoall,1] (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:38:30,880] INFO Truncating log idoall-1 to offset 0. (kafka.log.Log)
[2014-08-05 10:38:30,885] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer([[idoall,1], initOffset 0 to broker id:3,host:s1,port:9092] ) (kafka.server.ReplicaFetcherManager)
[2014-08-05 10:38:30,887] INFO [ReplicaFetcherThread-0-3], Starting  (kafka.server.ReplicaFetcherThread)
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --list --zookeeper m1:2181
idoall
idoall_testTopic
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic idoall --zookeeper m2:2181
deletion succeeded!
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --list --zookeeper m1:2181
idoall_testTopic
root@m1:/home/hadoop#
    You can also go into ZooKeeper to check whether the topic is gone:
root@m1:/home/hadoop# /home/hadoop/zookeeper-3.4.5/bin/zkCli.sh
Connecting to localhost:2181
2014-08-05 10:15:21,863 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
2014-08-05 10:15:21,871 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=m1
2014-08-05 10:15:21,871 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.7.0_65
2014-08-05 10:15:21,872 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2014-08-05 10:15:21,872 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-7-oracle/jre
2014-08-05 10:15:21,873 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/home/hadoop/zookeeper-3.4.5/bin/../build/classes:/home/hadoop/zookeeper-3.4.5/bin/../build/lib/*.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/slf4j-log4j12-1.6.1.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/slf4j-api-1.6.1.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/netty-3.2.2.Final.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/log4j-1.2.15.jar:/home/hadoop/zookeeper-3.4.5/bin/../lib/jline-0.9.94.jar:/home/hadoop/zookeeper-3.4.5/bin/../zookeeper-3.4.5.jar:/home/hadoop/zookeeper-3.4.5/bin/../src/java/lib/*.jar:/home/hadoop/zookeeper-3.4.5/bin/../conf:.:/usr/lib/jvm/java-7-oracle/lib/tools.jar:/usr/lib/jvm/java-7-oracle/lib/dt.jar
2014-08-05 10:15:21,874 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=:/usr/local/lib:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2014-08-05 10:15:21,874 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2014-08-05 10:15:21,874 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
2014-08-05 10:15:21,875 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2014-08-05 10:15:21,875 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2014-08-05 10:15:21,876 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.11.0-15-generic
2014-08-05 10:15:21,876 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
2014-08-05 10:15:21,877 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
2014-08-05 10:15:21,878 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/home/hadoop
2014-08-05 10:15:21,879 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@...
Welcome to ZooKeeper!
2014-08-05 10:15:21,920 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@966] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2014-08-05 10:15:21,934 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@849] - Socket connection established to localhost/127.0.0.1:2181, initiating session
JLine support is enabled
2014-08-05 10:15:21,966 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1207] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x147a3e1246b0007, negotiated timeout = 30000
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, hadoop-ha, admin, zookeeper, consumers, config, controller, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[topics, ids]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[idoall_testTopic]
 5. Use Eclipse and the Kafka Java API to test the cluster
    1) Message producer: Producertest.java

package idoall.testkafka;

import java.util.Date;
import java.util.Properties;
import java.text.SimpleDateFormat;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Message producer
 * @author 迦壹
 * @Time 2014-08-05
 */
public class Producertest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        // serializer.class is the serializer used for the messages
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // metadata.broker.list: for high availability, list at least two brokers
        props.put("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        // Optional partitioner class for spreading messages over partitions
        //props.put("partitioner.class", "idoall.testkafka.Partitionertest");
        // Ack mode: each send must be acknowledged by the Kafka server
        props.put("request.required.acks", "1");
        props.put("num.partitions", "4");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (int i = 0; i < 10; i++) {
            // KeyedMessage<K, V>: K is the type of the partition key,
            // V is the type of the message itself
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
            Date curDate = new Date(System.currentTimeMillis()); // current time
            String str = formatter.format(curDate);
            String msg = "idoall.org" + i + "=" + str;
            String key = i + "";
            producer.send(new KeyedMessage<String, String>("idoall_testTopic", key, msg));
        }
        producer.close(); // release network resources when done
    }
}
    2) Message consumer: Consumertest.java
package idoall.testkafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Message consumer
 * @author 迦壹
 * @Time 2014-08-05
 */
public class Consumertest extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public static void main(String[] args) {
        Consumertest consumerThread = new Consumertest("idoall_testTopic");
        consumerThread.start();
    }

    public Consumertest(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181");
        // Consumer group id
        props.put("group.id", "1");
        // Consumed offsets for a group are stored in ZooKeeper, but they are not
        // updated in real time; this is the commit interval
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        // Map each topic to a number of consumer threads, then build the streams
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("*********Results********");
        while (it.hasNext()) {
            System.err.println("get data:" + new String(it.next().message()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


    3) Check the result in Eclipse. Before running, open a consumer on one of the machines (I used s1), then watch whether both Eclipse and the consumer on s1 receive the messages. The final result is shown below:

(screenshots 2.png and 3.png: the messages arriving both in the Eclipse console and in the console consumer on s1)

    As you can see, exactly 10 messages arrived, none lost. Because of partition balancing, though, they are not in global order: Kafka only guarantees ordering inside a partition, not across the partitions of a topic. Per-partition ordering, combined with the ability to partition by key, is enough for most applications. (Partitionertest.java in the Eclipse code linked at the end of this article demonstrates partitioning by key; a sketch of such a class follows.)
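
A hedged sketch of a key-based partitioner against the 0.8.1 producer API (in 0.8.0-beta1, which the pom below pulls in, the interface is the generic Partitioner<T> instead; this body is our illustration, not necessarily the Partitionertest.java shipped with the article). It is enabled with props.put("partitioner.class", "idoall.testkafka.Partitionertest") in the producer:

package idoall.testkafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class Partitionertest implements Partitioner {

    // The producer instantiates the partitioner reflectively and expects
    // this one-argument constructor.
    public Partitionertest(VerifiableProperties props) {
    }

    public int partition(Object key, int numPartitions) {
        // Identical keys always land in the same partition, so per-key
        // ordering is preserved; the mask keeps the hash non-negative.
        return (key.toString().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}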

  6. Build the Java project on the command line and test Kafka
    1) Edit the pom.xml file in the project directory

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>idoall.testkafka</groupId>
  <artifactId>idoall.testkafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>idoall.testkafka</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.14</version>
    </dependency>
    <dependency>
      <groupId>com.sksamuel.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.0-beta1</version>
    </dependency>
  </dependencies>
  <build>
    <finalName>idoall.testkafka</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.0.2</version>
        <configuration>
          <source>1.5</source>
          <target>1.5</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptors>
            <descriptor>src/main/src.xml</descriptor>
          </descriptors>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- this is used for inheritance merges -->
            <phase>package</phase> <!-- bind to the packaging phase -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
    2) Edit the src/main/src.xml file in the project directory

<?xml version="1.0" encoding="UTF-8"?>
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd ">
    <id>jar-with-dependencies</id>
    <formats>
        <format>jar</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <dependencySets>
        <dependencySet>
            <unpack>false</unpack>
            <scope>runtime</scope>
        </dependencySet>
    </dependencySets>
    <fileSets>
        <fileSet>
            <directory>/lib</directory>
        </fileSet>
    </fileSets>
</assembly>
    3) Build the dependency bundle: run mvn package in the project directory to produce idoall.testkafka-jar-with-dependencies.jar. Part of the output:
Running idoall.testkafka.idoall.testkafka.AppTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.004 sec
Results :
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ idoall.testkafka ---
[INFO] Building jar: /Users/lion/Documents/_my_project/java/idoall.testkafka/target/idoall.testkafka.jar
[INFO]
[INFO] --- maven-assembly-plugin:2.4:single (make-assembly) @ idoall.testkafka ---
[INFO] Reading assembly descriptor: src/main/src.xml
[WARNING] The assembly id jar-with-dependencies is used more than once.
[INFO] Building jar: /Users/lion/Documents/_my_project/java/idoall.testkafka/target/idoall.testkafka-jar-with-dependencies.jar
[INFO] Building jar: /Users/lion/Documents/_my_project/java/idoall.testkafka/target/idoall.testkafka-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.074 s
[INFO] Finished at: 2014-08-05T12:22:47+08:00
[INFO] Final Memory: 63M/836M
[INFO] ------------------------------------------------------------------------
    4) Compile the classes. From the project directory, run:
liondeMacBook-Pro:idoall.testkafka lion$ pwd
/Users/lion/Documents/_my_project/java/idoall.testkafka
liondeMacBook-Pro:idoall.testkafka lion$ javac -classpath target/idoall.testkafka-jar-with-dependencies.jar -d . src/main/java/idoall/testkafka/*.java


    5) Run the compiled classes. Open two windows, one to produce and one to consume; the consumer window displays the messages as expected.
java -classpath .:target/idoall.testkafka-jar-with-dependencies.jar idoall.testkafka.Producertest
java -classpath .:target/idoall.testkafka-jar-with-dependencies.jar idoall.testkafka.Consumertest

(screenshots 4.png and 5.png: the command-line producer and consumer windows)
  V. FAQ
  1. If topic creation fails with the following error, the number of running brokers is lower than the --replication-factor you specified:


Error while executing topic command replication factor: 3 larger than available brokers: 1
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1
        at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:155)
        at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:86)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:50)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
 2. If you see the following error, start Kafka first and only then start Hadoop's zkfc (DFSZKFailoverController):
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0130000, 986513408, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 986513408 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/hadoop/hs_err_pid13558.log

Code download:
Link: http://pan.baidu.com/s/1bnriYK3  Password: x20l