
[Kafka] Setting up and using a Kafka environment

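Kafka is a distributed, partitioned, replicated messaging system. It provides the functionality of a conventional messaging system, but with its own distinctive design: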

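  • Kafka organizes messages into topics.
  • Programs that publish messages to a Kafka topic are called producers.
  • Programs that subscribe to topics and consume their messages are called consumers.
  • Kafka runs as a cluster made up of one or more servers; each server is called a broker.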
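To make these four terms concrete, here is a toy in-memory model in Python. It is only an illustration under simplifying assumptions (one broker, one partition per topic, no networking or persistence), and the class names ToyBroker, ToyProducer and ToyConsumer are hypothetical, not part of Kafka's API.

```python
# Toy model only: ToyBroker, ToyProducer and ToyConsumer are hypothetical names,
# not Kafka classes. One broker, one partition per topic, everything in memory.
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class ToyBroker:
    """One server holding one append-only log per topic."""
    broker_id: int
    logs: dict = field(default_factory=lambda: defaultdict(list))

    def append(self, topic: str, message: str) -> int:
        """Append a message and return its offset in the topic's log."""
        log = self.logs[topic]
        log.append(message)
        return len(log) - 1

    def read(self, topic: str, offset: int) -> list:
        """Return every message at or after `offset`."""
        return self.logs[topic][offset:]


class ToyProducer:
    """Publishes messages to a topic on the broker."""

    def __init__(self, broker: ToyBroker):
        self.broker = broker

    def send(self, topic: str, message: str) -> int:
        return self.broker.append(topic, message)


class ToyConsumer:
    """Subscribes to one topic and remembers how far it has read."""

    def __init__(self, broker: ToyBroker, topic: str, from_beginning: bool = True):
        self.broker = broker
        self.topic = topic
        # Start at offset 0, or skip everything already in the log.
        self.offset = 0 if from_beginning else len(broker.logs[topic])

    def poll(self) -> list:
        """Return messages added since the last poll and advance the offset."""
        messages = self.broker.read(self.topic, self.offset)
        self.offset += len(messages)
        return messages


broker = ToyBroker(broker_id=0)
producer = ToyProducer(broker)
producer.send("CMCC", "Hello Kafka !")
consumer = ToyConsumer(broker, "CMCC")
print(consumer.poll())  # ['Hello Kafka !']
print(consumer.poll())  # []
```

The from_beginning flag loosely mirrors the console consumer's --from-beginning option used later in this post.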

Let's look at how to use it in a simple setup:

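After downloading, extract the archive with tar -zxvf *.tar.gz.

First, take a look at the scripts in the bin directory. Alongside kafka-server-start.sh there are ZooKeeper scripts such as zookeeper-server-start.sh, which shows that Kafka itself is meant to be used together with ZooKeeper.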

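ZooKeeper's coordination role:
1. It tracks brokers and consumers as they dynamically join and leave.
2. It triggers rebalancing: whenever a broker or consumer joins or leaves, the rebalancing algorithm runs so that the subscription load is spread evenly across the consumers in a consumer group.
3. It maintains the consumption relationships and the consumption state (offsets) of each partition.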
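The rebalancing in point 2 decides which consumer in a group owns which partition. The sketch below is modelled on the range-style split used by the 0.8-era high-level consumer, simplified to a single topic and ignoring consumer threads; range_assign is a hypothetical helper name, not a Kafka function.

```python
def range_assign(partitions, consumers):
    """Split one topic's partitions into contiguous ranges, one per consumer.

    Hypothetical helper. Both lists are sorted, every consumer gets
    len(partitions) // len(consumers) partitions, and the remainder goes
    one each to the first consumers in sorted order.
    """
    parts = sorted(partitions)
    members = sorted(consumers)
    if not members:
        return {}
    per_consumer, extra = divmod(len(parts), len(members))
    assignment = {}
    for pos, member in enumerate(members):
        start = per_consumer * pos + min(pos, extra)
        count = per_consumer + (1 if pos < extra else 0)
        assignment[member] = parts[start:start + count]
    return assignment


print(range_assign([0, 1, 2, 3], ["c1", "c2", "c3"]))
# {'c1': [0, 1], 'c2': [2], 'c3': [3]}
print(range_assign([0, 1, 2, 3], ["c1", "c2"]))
# {'c1': [0, 1], 'c2': [2, 3]}
```

If c3 leaves the group, rerunning range_assign over the remaining two consumers hands partition 3 to c2; that recomputation is what a rebalance does.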

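Details of what is stored in ZooKeeper:
1. When a broker starts, it registers an ephemeral broker registry in ZooKeeper containing the broker's IP address and port, plus the topics and partitions it stores.
2. When a consumer starts, it registers an ephemeral consumer registry in ZooKeeper containing the consumer group it belongs to and the topics it subscribes to.
3. Each consumer group is associated with an ephemeral owner registry and a persistent offset registry. For every subscribed partition there is an owner registry whose content is the ID of the consumer that owns that partition, and an offset registry whose content is the last consumed offset.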
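These registries are znodes. Below is a sketch of the path layout as I understand it for the 0.8-era releases: /brokers/ids/<id>, /consumers/<group>/ids/<consumer>, /consumers/<group>/owners/<topic>/<partition> and /consumers/<group>/offsets/<topic>/<partition>. Node values are simplified (real broker registrations store JSON), the helper names and the group/consumer names are hypothetical, and the layout should be checked against your own ZooKeeper before relying on it.

```python
# Sketch of the 0.8-era ZooKeeper layout (assumed, not read from Kafka's code).
# Helper names, group and consumer names are hypothetical; values are simplified.
from dataclasses import dataclass


@dataclass(frozen=True)
class ZNode:
    path: str
    ephemeral: bool  # ephemeral znodes disappear when the owner's session ends
    value: str       # simplified; real registrations may store JSON


def broker_registry(broker_id: int, host: str, port: int) -> ZNode:
    return ZNode(f"/brokers/ids/{broker_id}", True, f"{host}:{port}")


def consumer_registry(group: str, consumer_id: str, topics: list) -> ZNode:
    return ZNode(f"/consumers/{group}/ids/{consumer_id}", True, ",".join(sorted(topics)))


def owner_registry(group: str, topic: str, partition: int, consumer_id: str) -> ZNode:
    return ZNode(f"/consumers/{group}/owners/{topic}/{partition}", True, consumer_id)


def offset_registry(group: str, topic: str, partition: int, offset: int) -> ZNode:
    return ZNode(f"/consumers/{group}/offsets/{topic}/{partition}", False, str(offset))


def after_sessions_expire(nodes: list) -> list:
    """Only persistent znodes survive once every owning session has gone away."""
    return [n for n in nodes if not n.ephemeral]


nodes = [
    broker_registry(0, "localhost", 9092),
    consumer_registry("group1", "consumer-a", ["CMCC"]),
    owner_registry("group1", "CMCC", 0, "consumer-a"),
    offset_registry("group1", "CMCC", 0, 3),
]
for node in after_sessions_expire(nodes):
    print(node.path, node.value)  # /consumers/group1/offsets/CMCC/0 3
```

Keeping offsets persistent is what lets a restarted consumer group resume where it left off, while partition ownership is released automatically when a consumer dies.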

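To get started, we'll use the ZooKeeper bundled with the Kafka distribution; its jars live in the libs directory.

The libs directory contains the zkclient and zookeeper dependency jars, so ZooKeeper must be started before this Kafka build can be used.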

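Next, let's see what's in the config directory.

It holds the concrete configuration files, such as server.properties and producer.properties.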

Let's look at producer.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none , gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=

############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue 
#queue.buffering.max.ms=

# the maximum size of the blocking queue for buffering on the producer 
#queue.buffering.max.messages=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=

# the number of messages batched at the producer 
#batch.num.messages=
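This file configures the broker list, whether messages are sent synchronously or asynchronously, whether they are compressed, the message encoder, and similar settings.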
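A minimal sketch of reading such a file in Python, assuming only plain key=value lines (the real Java properties format also allows ':' separators, escapes and line continuations, which this ignores). The alias table comes from the comment in the file above (0, 1, 2 for none, gzip, snappy); parse_properties and summarize_producer are hypothetical names, and the fallback values sync and none are assumptions.

```python
# Hypothetical helpers; handles plain key=value lines only.
# Codec aliases taken from the comment in producer.properties.
CODEC_ALIASES = {"0": "none", "1": "gzip", "2": "snappy"}


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # commented-out keys such as #partitioner.class= are ignored
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def summarize_producer(props: dict) -> dict:
    """Pull out the settings discussed above, normalising the old numeric codecs."""
    codec = props.get("compression.codec", "none")  # assumed fallback
    brokers = props.get("metadata.broker.list", "")
    return {
        "brokers": [b.strip() for b in brokers.split(",") if b.strip()],
        "async": props.get("producer.type", "sync") == "async",  # assumed fallback
        "compression": CODEC_ALIASES.get(codec, codec),
        "serializer": props.get("serializer.class"),
    }


sample = """
metadata.broker.list=localhost:9092
#partitioner.class=
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
"""
print(summarize_producer(parse_properties(sample)))
```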

Now let's put it to use:

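1. Start ZooKeeper:

sh bin/zookeeper-server-start.sh config/zookeeper.properties &

2. Start Kafka:

sh bin/kafka-server-start.sh config/server.properties &

3. With both running, let's produce and consume some messages.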

Create a topic
[user@host bin]# sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic CMCC
Created topic "CMCC".
Produce messages
[user@host bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --topic CMCC
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hwl^H^H
Hwll
Hello Kafka !
Consume messages
[user@host bin]# sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic CMCC --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hwl
Hwll
Hello Kafka !

That completes a simple demo. Next, let's see how to run a simple cluster.

First, configure two more brokers, each with its own copy of server.properties:

The first one (config/server-1.properties):

broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
The second one (config/server-2.properties):
broker.id=2
port=9094
log.dirs=/tmp/kafka-logs-2
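These two files have exactly the same structure as the original server.properties; only some property values differ. Kafka uses broker.id to uniquely identify each server in the cluster, and because all three brokers run on the same machine, each one also needs its own port and its own log directory. So the only properties we changed are broker.id, port and log.dirs.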
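The rule in the previous paragraph can be checked mechanically. This sketch assumes the base server.properties values broker.id=0, port=9092, log.dirs=/tmp/kafka-logs and zookeeper.connect=localhost:2181 (what I'd expect as the shipped defaults for this release; adjust to your file), derives the two extra brokers, and rejects any duplicate that would make brokers on the same host collide. broker_config, check_same_host and render are hypothetical helpers.

```python
# Assumed base values; broker_config, check_same_host and render are hypothetical.
BASE = {
    "broker.id": "0",
    "port": "9092",
    "log.dirs": "/tmp/kafka-logs",
    "zookeeper.connect": "localhost:2181",
}


def broker_config(base: dict, broker_id: int, port: int) -> dict:
    """Copy the base config and override only the three per-broker keys."""
    cfg = dict(base)
    cfg["broker.id"] = str(broker_id)
    cfg["port"] = str(port)
    cfg["log.dirs"] = f"/tmp/kafka-logs-{broker_id}"
    return cfg


def check_same_host(configs: list) -> None:
    """Brokers sharing one machine must not share an id, a port or a log dir."""
    for key in ("broker.id", "port", "log.dirs"):
        values = [cfg[key] for cfg in configs]
        if len(set(values)) != len(values):
            raise ValueError(f"duplicate {key}: {values}")


def render(cfg: dict) -> str:
    """Serialise back to key=value lines for a server-N.properties file."""
    return "".join(f"{key}={value}\n" for key, value in cfg.items())


cluster = [BASE, broker_config(BASE, 1, 9093), broker_config(BASE, 2, 9094)]
check_same_host(cluster)  # passes: ids, ports and log dirs are all distinct
print(render(cluster[1]))
```

Everything else, including zookeeper.connect, is inherited unchanged, which is why all three brokers end up registered in the same ZooKeeper.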

Now start these two brokers as well:

sh bin/kafka-server-start.sh config/server-1.properties &
sh bin/kafka-server-start.sh config/server-2.properties &

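Next, create a topic with a replication factor of 3, so that each of the three brokers (0, 1 and 2) holds a copy: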

sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic chiwei

Check the topic's description:

[user@host kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic chiwei
Topic:chiwei    PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: chiwei   Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
[user@host kafka_2.10-0.8.1.1]#
The official documentation defines these fields as follows:
  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
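The describe output is regular enough to parse. This sketch (hypothetical names, regex written against the sample lines above rather than any documented format) extracts leader, replicas and ISR, then checks the expectations that follow from the official definitions: the ISR is a subset of the replicas, the leader is in the ISR, and a partition whose ISR is shorter than its replica list is under-replicated, which is roughly what --under-replicated-partitions reports.

```python
# Hypothetical parser; the regex is fitted to the sample describe lines only.
import re
from dataclasses import dataclass

DESCRIBE_LINE = re.compile(
    r"Topic:\s*(\S+)\s+Partition:\s*(\d+)\s+Leader:\s*(-?\d+)"
    r"\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


@dataclass
class PartitionState:
    topic: str
    partition: int
    leader: int
    replicas: list
    isr: list


def _ids(text: str) -> list:
    return [int(x) for x in text.split(",") if x]


def parse_describe_line(line: str):
    """Return a PartitionState for a per-partition line, or None for other lines."""
    match = DESCRIBE_LINE.search(line)
    if match is None:
        return None  # e.g. the "Topic:chiwei PartitionCount:1 ..." header line
    topic, partition, leader, replicas, isr = match.groups()
    return PartitionState(topic, int(partition), int(leader), _ids(replicas), _ids(isr))


def check(state: PartitionState) -> list:
    """List violated expectations, based on the official field definitions."""
    problems = []
    if not set(state.isr) <= set(state.replicas):
        problems.append("isr not a subset of replicas")
    if state.leader not in state.isr:
        problems.append("leader not in isr")
    if len(state.isr) < len(state.replicas):
        problems.append("under-replicated")
    return problems


line = "        Topic: chiwei   Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0"
state = parse_describe_line(line)
print(state.leader, state.replicas, state.isr, check(state))  # 2 [2, 1, 0] [2, 1, 0] []
```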
Remember the first topic we created? Here is its description:
[user@host kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic CMCC
Topic:CMCC      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: CMCC     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Now let's produce a message:

[user@host kafka_2.10-0.8.1.1]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic chiwei
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello Chiwei !

Consume it:

[user@host kafka_2.10-0.8.1.1]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic chiwei
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello Chiwei !
It works!

Now let's test how fault-tolerant this cluster is.

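The topic description we just saw showed the broker with broker.id=2 as the leader, so let's kill that leader process and see what happens (9663 is the PID that ps reported for the server-2 process):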

ps -ef | grep server-2
kill -9 9663
[user@host kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic chiwei
Topic:chiwei    PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: chiwei   Partition: 0    Leader: 1       Replicas: 2,1,0 Isr: 1,0
[user@host kafka_2.10-0.8.1.1]#

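The leader has changed. Note also that Replicas still lists broker 2 while Isr no longer does: Isr lists the in-sync replicas, and broker 2 is no longer in sync. So why does Replicas still show 2?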

  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
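The answer is in the wording "regardless of whether they are the leader or even if they are currently alive": Replicas is the partition's assigned replica list, and it stays the same whether or not those brokers are up.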
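What happened to the partition can be summarised in a few lines. In this simplified model (fail_broker is a hypothetical function), the dead broker drops out of the ISR, the replica list stays untouched, and if the dead broker was the leader the first surviving ISR member takes over. This reproduces the describe output above, but Kafka's controller handles more cases than this, for example what to do when no in-sync replica survives.

```python
def fail_broker(leader: int, replicas: list, isr: list, dead: int):
    """Simplified view of one partition after broker `dead` stops (hypothetical).

    Returns (leader, replicas, isr). The replica list is the assignment and
    never changes; the dead broker simply drops out of the ISR. If it was the
    leader, the first surviving ISR member takes over, or -1 if none is left.
    """
    new_isr = [b for b in isr if b != dead]
    if leader == dead:
        leader = new_isr[0] if new_isr else -1
    return leader, list(replicas), new_isr


print(fail_broker(2, [2, 1, 0], [2, 1, 0], dead=2))  # (1, [2, 1, 0], [1, 0])
print(fail_broker(0, [0], [0], dead=0))              # (-1, [0], [])
```

For the single-replica topic CMCC, losing broker 0 leaves no in-sync replica, so the model reports -1 (no leader); partitions without an available leader are what --unavailable-partitions lists.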

Now let's consume the topic's messages again:

[user@host kafka_2.10-0.8.1.1]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic chiwei
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello Chiwei !
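The message is still there! It had already been replicated to the other brokers in the replica list (1 and 0), so the new leader, broker 1, can still serve it.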

Appendix: common operations

1. List all topics

./kafka-topics.sh --list --zookeeper ip:port

2. Create a topic

sh kafka-topics.sh --create --zookeeper 192.168.11.176:2181 --replication-factor 2 --partitions 4 --topic cmcc

The replication-factor cannot be greater than the number of brokers.
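The reason is that every replica of a partition must live on a different broker, so n brokers can hold at most n copies. The sketch below enforces that and spreads replicas with a plain round-robin; it is a simplification, since Kafka's own assignment also randomises the starting broker and shifts follower placement. assign_replicas is a hypothetical name.

```python
def assign_replicas(brokers: list, partitions: int, replication_factor: int) -> dict:
    """Place `replication_factor` copies of each partition on distinct brokers.

    Hypothetical helper using plain round-robin: partition p's replicas go to
    brokers p, p+1, ... (mod n). Not Kafka's exact placement algorithm.
    """
    n = len(brokers)
    if replication_factor < 1:
        raise ValueError("replication factor must be at least 1")
    if replication_factor > n:
        raise ValueError(
            f"replication factor {replication_factor} exceeds the {n} available brokers"
        )
    return {
        p: [brokers[(p + j) % n] for j in range(replication_factor)]
        for p in range(partitions)
    }


print(assign_replicas([0, 1, 2], partitions=4, replication_factor=2))
# {0: [0, 1], 1: [1, 2], 2: [2, 0], 3: [0, 1]}
```

With the example above (4 partitions, replication factor 2) on three brokers, every partition gets two distinct brokers; asking for 3 copies on a single broker fails immediately.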

3. Show the details of a specific topic

sh kafka-topics.sh --describe --zookeeper 192.168.11.176:2181 --topic cmcc

4. Show a topic's unavailable partitions (those whose leader is not available)

sh kafka-topics.sh --describe --unavailable-partitions --zookeeper 192.168.11.176:2181 --topic chiwei

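5. Delete a topic

sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic chiwei --zookeeper 192.168.11.176:2181

For reference, running kafka-topics.sh without exactly one valid action prints the following usage help: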
Command must include exactly one action: --list, --describe, --create or --alter
Option                                  Description                            
------                                  -----------                            
--alter                                 Alter the configuration for the topic. 
--config <name=value>                   A topic configuration override for the 
                                          topic being created or altered.      
--create                                Create a new topic.                    
--deleteConfig <name>                   A topic configuration override to be   
                                          removed for an existing topic        
--describe                              List details for the given topics.     
--help                                  Print usage information.               
--list                                  List all available topics.             
--partitions <Integer: # of partitions> The number of partitions for the topic 
                                          being created or altered (WARNING:   
                                          If partitions are increased for a    
                                          topic that has a key, the partition  
                                          logic or ordering of the messages    
                                          will be affected                     
--replica-assignment                    A list of manual partition-to-broker   
  <broker_id_for_part1_replica1 :         assignments for the topic being      
  broker_id_for_part1_replica2 ,          created or altered.                  
  broker_id_for_part2_replica1 :                                               
  broker_id_for_part2_replica2 , ...>                                          
--replication-factor <Integer:          The replication factor for each        
  replication factor>                     partition in the topic being created.
--topic <topic>                         The topic to be create, alter or       
                                          describe. Can also accept a regular  
                                          expression except for --create option
--topics-with-overrides                 if set when describing topics, only    
                                          show topics that have overridden     
                                          configs                              
--unavailable-partitions                if set when describing topics, only    
                                          show partitions whose leader is not  
                                          available                            
--under-replicated-partitions           if set when describing topics, only    
                                          show under replicated partitions     
--zookeeper <urls>                      REQUIRED: The connection string for    
                                          the zookeeper connection in the form 
                                          host:port. Multiple URLS can be      
                                          given to allow fail-over.