[Kafka] Setting Up and Using a Kafka Environment
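Kafka is a distributed, partitioned, replicated messaging system. It provides the functionality of an ordinary messaging system, but with its own distinctive design:
- Kafka organizes messages into categories called topics.
- Programs that publish messages to a Kafka topic are called producers.
- Programs that subscribe to topics and consume their messages are called consumers.
- Kafka runs as a cluster made up of one or more servers; each server is called a broker.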
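Let's look at how to use it in a simple setup:
After downloading, extract the archive with tar -zxvf *.tar.gz. The resulting directory looks like this:
First, the bin directory:
These scripts show that Kafka is designed to be used together with ZooKeeper.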
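ZooKeeper handles coordination:
1. It manages brokers and consumers dynamically joining and leaving.
2. It triggers rebalancing: whenever a broker or consumer joins or leaves, a rebalancing algorithm runs so that the subscription load is spread evenly across the consumers in a consumer group.
3. It maintains the consumption relationships and the consumption information for each partition.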
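Here is a minimal Python sketch that makes the rebalancing in point 2 concrete. It assumes a range-style assignment similar to the one outlined for the Kafka 0.8 high-level consumer: sort the partitions and the consumers of a group, give each consumer a contiguous block, and let the first consumers absorb any remainder. This is an illustration rather than Kafka's actual code, and the consumer IDs are hypothetical.

```python
from typing import Dict, List


def range_assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Split one topic's partitions among the consumers of a single group."""
    members = sorted(consumers)
    if not members:
        return {}
    parts = sorted(partitions)
    per_consumer, extra = divmod(len(parts), len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, consumer in enumerate(members):
        # The first `extra` consumers each take one additional partition.
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = parts[start:start + count]
        start += count
    return assignment


# Three consumers share four partitions.
print(range_assign([0, 1, 2, 3], ["c1", "c2", "c3"]))  # {'c1': [0, 1], 'c2': [2], 'c3': [3]}
# c2 leaves the group: the assignment is recomputed from the new membership.
print(range_assign([0, 1, 2, 3], ["c1", "c3"]))  # {'c1': [0, 1], 'c3': [2, 3]}
```

Every consumer runs the same deterministic computation over the same sorted inputs. As a result, all of them reach a consistent assignment without negotiating with each other.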
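What ZooKeeper stores, in detail:
1. When a broker starts, it registers an ephemeral broker registry in ZooKeeper containing the broker's IP address and port, and the topics and partitions it stores.
2. When a consumer starts, it registers an ephemeral consumer registry containing the consumer group it belongs to and the topics it subscribes to.
3. Each consumer group is associated with an ephemeral owner registry and a persistent offset registry. For each subscribed partition there is an owner registry whose content is the ID of the consumer consuming that partition, and an offset registry whose content is the offset last consumed.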
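A group can recover from a consumer leaving because some registries are ephemeral and others are persistent. The toy Python model below uses a plain dictionary rather than the ZooKeeper client API, and its path names are hypothetical. Closing a session removes that session's ephemeral nodes, but the persistent offset survives, so whichever consumer takes over the partition can resume from the stored offset.

```python
from typing import Dict, Optional


class ToyRegistry:
    """Dictionary stand-in for ZooKeeper nodes; ephemeral nodes are tied to a session."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}
        self.session_of: Dict[str, Optional[str]] = {}  # None marks a persistent node

    def create(self, path: str, value: str, session: Optional[str] = None) -> None:
        self.data[path] = value
        self.session_of[path] = session

    def close_session(self, session: str) -> None:
        # Ending a session deletes every ephemeral node that session created.
        for path in [p for p, s in self.session_of.items() if s == session]:
            del self.data[path]
            del self.session_of[path]


reg = ToyRegistry()
reg.create("brokers/0", "localhost:9092", session="broker-0")       # ephemeral broker registry
reg.create("owners/g1/CMCC/0", "consumer-a", session="consumer-a")  # ephemeral owner registry
reg.create("offsets/g1/CMCC/0", "3")                                 # persistent offset registry
reg.close_session("consumer-a")  # consumer-a leaves the group
print(sorted(reg.data))  # ['brokers/0', 'offsets/g1/CMCC/0']
```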
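To get started, we'll simply use the ZooKeeper that ships with the Kafka package. Its jars are in the libs directory:
The zkclient and zookeeper dependency jars are there, so before using this Kafka distribution we have to start ZooKeeper.
Next, let's see what is in the config directory:
These are the actual configuration files.
Let's look at producer.properties: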
This file configures the broker address, whether messages are sent synchronously or asynchronously, whether they are compressed, the message encoder, and so on:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.producer.ProducerConfig for more details

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none , gzip, snappy.
# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally
compression.codec=none

# message encoder
serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression
#compressed.topics=

############################# Async Producer #############################
# maximum time, in milliseconds, for buffering data on the producer queue
#queue.buffering.max.ms=

# the maximum size of the blocking queue for buffering on the producer
#queue.buffering.max.messages=

# Timeout for event enqueue:
# 0: events will be enqueued immediately or dropped if the queue is full
# -ve: enqueue will block indefinitely if the queue is full
# +ve: enqueue will block up to this many milliseconds if the queue is full
#queue.enqueue.timeout.ms=

# the number of messages batched at the producer
#batch.num.messages=
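A small Python helper can show at a glance which settings in a file like this are actually in effect by collecting the uncommented key=value lines. This is a simplified sketch. It assumes one key=value pair per line and ignores the ":" separator and line continuations that full Java .properties syntax allows.

```python
from typing import Dict


def active_properties(text: str) -> Dict[str, str]:
    """Return the uncommented key=value pairs of a simple .properties text."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments ('#' or '!' in .properties files).
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Excerpt of the producer.properties shown above.
sample = """
metadata.broker.list=localhost:9092
#partitioner.class=
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
#compressed.topics=
"""
print(active_properties(sample))
```

For the file above, only metadata.broker.list, producer.type, compression.codec and serializer.class are set. Every commented-out setting falls back to its default.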
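Now let's try it out:
1. Start ZooKeeper:
sh bin/zookeeper-server-start.sh config/zookeeper.properties &
2. Start Kafka:
sh bin/kafka-server-start.sh config/server.properties &
3. Once both have started successfully, we can produce and consume messages.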
Create a topic:
[bin]# sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic CMCC
Created topic "CMCC".
Produce messages:
[bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --topic CMCC
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hwl^H^H
Hwll
Hello Kafka !
Consume messages:
[bin]# sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic CMCC --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hwl
Hwll
Hello Kafka !
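Why does --from-beginning print messages that were sent before the consumer started? Kafka keeps each topic's messages in an append-only log, and a consumer simply reads from a chosen position (an offset). Consuming does not delete anything. The toy Python model below illustrates that idea. It is not the Kafka client API, and it ignores partitions and retention. Starting at offset 0 corresponds to --from-beginning. Starting at the current end of the log corresponds to the console consumer's default, which shows only new messages.

```python
from collections import defaultdict
from typing import DefaultDict, List


class ToyBroker:
    """Each topic is an append-only list; a message's list index is its offset."""

    def __init__(self) -> None:
        self.logs: DefaultDict[str, List[str]] = defaultdict(list)

    def produce(self, topic: str, message: str) -> int:
        self.logs[topic].append(message)
        return len(self.logs[topic]) - 1  # offset assigned to this message

    def end_offset(self, topic: str) -> int:
        # Offset the next message will get; reading from here shows only new messages.
        return len(self.logs[topic])

    def consume(self, topic: str, offset: int) -> List[str]:
        # Reading never removes messages, so any consumer can replay the log.
        return self.logs[topic][offset:]


broker = ToyBroker()
for msg in ["Hwl", "Hwll", "Hello Kafka !"]:
    broker.produce("CMCC", msg)

print(broker.consume("CMCC", 0))       # like --from-beginning: ['Hwl', 'Hwll', 'Hello Kafka !']
latest = broker.end_offset("CMCC")     # like the default: start at the end (offset 3)
broker.produce("CMCC", "new message")
print(broker.consume("CMCC", latest))  # ['new message']
```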
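That completes a simple demo. Next, let's see how to set up a simple cluster.
First, we need to configure two more servers, each with its own copy of server.properties.
The first (config/server-1.properties):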
broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1
The second (config/server-2.properties):
broker.id=2
port=9094
log.dirs=/tmp/kafka-logs-2
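Both files have exactly the same structure as the original server.properties; only the property values differ. Kafka uses broker.id to uniquely identify a server within the cluster, so we essentially changed three properties: broker.id, port and log.dirs. The port and log.dirs values also have to differ because all three brokers run on the same machine and cannot share a listening port or a log directory.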
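The two new files differ from server.properties only in these three values, so they can be derived mechanically. Below is a Python sketch that copies a server.properties text and overrides selected keys. The four-line base shown is an abbreviated stand-in for the stock file, assuming its broker.id, port, log.dirs and zookeeper.connect entries; it is not the complete file.

```python
import re
from typing import Dict

# Matches the key of an uncommented "key=value" line.
KEY = re.compile(r"\s*([^#!\s][^=]*?)\s*=")


def derive_broker_config(base: str, overrides: Dict[str, str]) -> str:
    """Copy a .properties text, replacing (or appending) the given keys."""
    out = []
    seen = set()
    for line in base.splitlines():
        match = KEY.match(line)
        key = match.group(1) if match else None
        if key in overrides:
            line = f"{key}={overrides[key]}"
            seen.add(key)
        out.append(line)
    # Append any overridden key that the base text did not contain.
    out.extend(f"{k}={v}" for k, v in overrides.items() if k not in seen)
    return "\n".join(out) + "\n"


base = "broker.id=0\nport=9092\nlog.dirs=/tmp/kafka-logs\nzookeeper.connect=localhost:2181\n"
for n in (1, 2):
    text = derive_broker_config(
        base, {"broker.id": str(n), "port": str(9092 + n), "log.dirs": f"/tmp/kafka-logs-{n}"}
    )
    print(f"--- server-{n}.properties ---")
    print(text, end="")
```

Save each result as config/server-1.properties and config/server-2.properties (for example with pathlib.Path.write_text). This produces the files used by the start commands that follow.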
Now start these two servers as well:
sh bin/kafka-server-start.sh config/server-1.properties &
sh bin/kafka-server-start.sh config/server-2.properties &
Next, create a topic with three replicas:
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic chiwei
Look at the topic's description:
[kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic chiwei
Topic:chiwei PartitionCount:1 ReplicationFactor:3 Configs:
Topic: chiwei Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
The official documentation describes these fields as follows:
- "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
- "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
- "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
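The three fields above can also be read programmatically. A Python sketch that parses the per-partition lines printed by kafka-topics.sh --describe follows. The regular expression assumes the exact "Partition: ... Leader: ... Replicas: ... Isr: ..." layout shown above. It also tolerates a negative leader ID, which this sketch assumes would mean that no leader is available.

```python
import re
from dataclasses import dataclass
from typing import List

# Per-partition line of `kafka-topics.sh --describe`; "-?" tolerates a negative
# leader ID (assumed here to mean that no leader is available).
LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


@dataclass
class PartitionState:
    partition: int
    leader: int
    replicas: List[int]
    isr: List[int]


def _ids(field: str) -> List[int]:
    return [int(x) for x in field.split(",") if x]


def parse_describe(output: str) -> List[PartitionState]:
    states = []
    for line in output.splitlines():
        m = LINE.search(line)
        if m:  # the "Topic:... PartitionCount:..." summary line does not match
            states.append(PartitionState(int(m.group(1)), int(m.group(2)),
                                         _ids(m.group(3)), _ids(m.group(4))))
    return states


output = """Topic:chiwei PartitionCount:1 ReplicationFactor:3 Configs:
Topic: chiwei Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0"""
for s in parse_describe(output):
    # Per the description above, the ISR is a subset of the replica list.
    assert set(s.isr) <= set(s.replicas)
    print(s)  # PartitionState(partition=0, leader=2, replicas=[2, 1, 0], isr=[2, 1, 0])
```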
For comparison, the single-replica CMCC topic created earlier looks like this:
[kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic CMCC
Topic:CMCC PartitionCount:1 ReplicationFactor:1 Configs:
Topic: CMCC Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Now produce a message to the new topic:
[kafka_2.10-0.8.1.1]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic chiwei
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello Chiwei !
Consume it:
[kafka_2.10-0.8.1.1]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic chiwei
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello Chiwei !
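It works!
Now let's test how fault-tolerant this cluster is.
The topic description above showed broker.id=2 as the leader, so let's kill the leader process and see what happens. (9663 below is the PID that ps reported for the server-2 broker on the author's machine; yours will differ.)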
ps -ef | grep server-2
kill -9 9663
[kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic chiwei
Topic:chiwei PartitionCount:1 ReplicationFactor:3 Configs:
Topic: chiwei Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0
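The leader has changed. Notice also that Replicas still lists 2 while Isr no longer does: Isr shows the IDs of the in-sync brokers, and broker 2 is no longer in sync. Replicas still shows 2 because, as the official documentation puts it: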
- "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
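A very small model reproduces the behaviour observed here. The dead broker drops out of the ISR but stays in the replica list, and the new leader is chosen from the remaining in-sync replicas. The Python sketch below assumes the new leader is the first broker in replica-list order that is still in the ISR. That matches the output above (2 → 1), but it simplifies Kafka's real election logic.

```python
from typing import List, Tuple


def elect_after_failure(
    replicas: List[int], isr: List[int], dead: int
) -> Tuple[int, List[int], List[int]]:
    """Return (leader, replicas, isr) after broker `dead` fails; leader -1 means none."""
    new_isr = [b for b in isr if b != dead]
    # Walk the assigned replicas in order and take the first one still in sync.
    leader = next((b for b in replicas if b in new_isr), -1)
    # The replica assignment itself is unchanged; only the leader and ISR move.
    return leader, replicas, new_isr


print(elect_after_failure([2, 1, 0], [2, 1, 0], dead=2))  # (1, [2, 1, 0], [1, 0])
```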
Now consume the topic's messages again:
[kafka_2.10-0.8.1.1]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic chiwei
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Hello Chiwei !
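The message is still there. This shows that it had been replicated to the other brokers in the cluster (the topic's Replicas list covers all three), so losing the leader did not lose the data.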
Appendix: some common operations
1. List all topics
./kafka-topics.sh --list --zookeeper ip:port
2. Create a topic
sh kafka-topics.sh --create --zookeeper 192.168.11.176:2181 --replication-factor 2 --partitions 4 --topic cmcc
The replication factor cannot be greater than the number of brokers.
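This limit exists because each replica of a partition has to live on a different broker, so a partition cannot have more copies than there are brokers. The Python sketch below uses a simplified round-robin placement to make this visible. Kafka's own assignment is more elaborate (for example, it picks a random starting broker), so the exact layout it produces will differ. The broker IDs 0, 1 and 2 are an assumption for illustration.

```python
from typing import Dict, List


def assign_replicas(brokers: List[int], partitions: int, replication_factor: int) -> Dict[int, List[int]]:
    """Simplified round-robin placement; each partition's replicas go to distinct brokers."""
    n = len(brokers)
    if replication_factor > n:
        # A broker holds at most one copy of a given partition.
        raise ValueError(f"replication factor {replication_factor} exceeds broker count {n}")
    return {p: [brokers[(p + r) % n] for r in range(replication_factor)] for p in range(partitions)}


# The cmcc example: 4 partitions, replication factor 2, assuming brokers 0, 1 and 2.
print(assign_replicas([0, 1, 2], partitions=4, replication_factor=2))
# {0: [0, 1], 1: [1, 2], 2: [2, 0], 3: [0, 1]}
```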
3. Show the details of a specific topic
sh kafka-topics.sh --describe --zookeeper 192.168.11.176:2181 --topic cmcc
4. Show the unavailable partitions of a specific topic
sh kafka-topics.sh --describe --unavailable-partitions --zookeeper 192.168.11.176:2181 --topic chiwei
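Both filters can be expressed in terms of the describe output. An unavailable partition is one whose leader is not a live broker. An under-replicated partition (the --under-replicated-partitions option) is one whose ISR is shorter than its replica list. The Python sketch below implements both filters. Its sample data comes from the chiwei and CMCC descriptions earlier in this article, and the set of live brokers is an input you supply here.

```python
from typing import List, NamedTuple, Set


class Partition(NamedTuple):
    topic: str
    partition: int
    leader: int
    replicas: List[int]
    isr: List[int]


def unavailable(parts: List[Partition], live: Set[int]) -> List[Partition]:
    # Partitions "whose leader is not available".
    return [p for p in parts if p.leader not in live]


def under_replicated(parts: List[Partition]) -> List[Partition]:
    # Fewer in-sync replicas than assigned replicas.
    return [p for p in parts if len(p.isr) < len(p.replicas)]


# Partition states from the describe output earlier (after broker 2 was killed).
parts = [
    Partition("chiwei", 0, leader=1, replicas=[2, 1, 0], isr=[1, 0]),
    Partition("CMCC", 0, leader=0, replicas=[0], isr=[0]),
]
print([p.topic for p in unavailable(parts, live={0, 1})])  # []
print([p.topic for p in under_replicated(parts)])          # ['chiwei']
# Hypothetically, if broker 0 were also down, CMCC's only replica (its leader) would be gone.
print([p.topic for p in unavailable(parts, live={1})])     # ['CMCC']
```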
5. Delete a topic
sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic chiwei --zookeeper 192.168.11.176:2181
Running kafka-topics.sh without one of these actions prints its full usage:
Command must include exactly one action: --list, --describe, --create or --alter
Option                                   Description
------                                   -----------
--alter                                  Alter the configuration for the topic.
--config <name=value>                    A topic configuration override for the topic being created or altered.
--create                                 Create a new topic.
--deleteConfig <name>                    A topic configuration override to be removed for an existing topic
--describe                               List details for the given topics.
--help                                   Print usage information.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
--replica-assignment <broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...>
                                         A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor>
                                         The replication factor for each partition in the topic being created.
--topic <topic>                          The topic to be create, alter or describe. Can also accept a regular expression except for --create option
--topics-with-overrides                  if set when describing topics, only show topics that have overridden configs
--unavailable-partitions                 if set when describing topics, only show partitions whose leader is not available
--under-replicated-partitions            if set when describing topics, only show under replicated partitions
--zookeeper <urls>                       REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.