
Kafka Overview and Cluster Configuration

I. Kafka Overview

1. Introduction

Kafka is a high-throughput distributed message queue system. It follows the producer-consumer model, preserves ordering (first-in, first-out within a partition), does not lose data on its own, and by default purges data after 7 days. Common message queue scenarios: decoupling systems, buffering peak load, and asynchronous communication.
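
The 7-day default comes from the broker's time-based retention setting; as a rough sketch, the relevant lines of server.properties (both values shown here are the defaults and also appear in the full config later in this post) look like:

log.retention.hours=168              # 168 hours = 7 days; older log segments become eligible for deletion
log.retention.check.interval.ms=300000   # how often the broker checks for segments that can be deleted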

2. Cluster Overview

(1) The Kafka architecture is made up of producers (message producers), consumers (message consumers), brokers (the servers of the Kafka cluster, which handle message read/write requests and store the messages; the "kafka cluster" layer actually contains many brokers), topics (a message queue/category; each topic behaves like a queue with producers and consumers attached), and ZooKeeper (which stores the metadata, including consumer offsets, topic information, and partition information).

(2) Messages in Kafka are organized by topic. Conceptually a topic is a queue; each topic is further split into many partitions for parallelism. Within a partition, messages are strictly ordered, like an ordered queue, and every message has a sequential offset (e.g. 0 to 12): writes append at the tail, reads proceed from the front. Each partition belongs to one broker, and one broker can manage several partitions; for example, if a topic has 6 partitions and there are 2 brokers, each broker manages 3 partitions. A partition can be thought of simply as a file: when data arrives, it is appended to that file; messages are not buffered in application memory but written straight to the file. Unlike many messaging systems, Kafka does not delete a message once it has been consumed; it deletes data according to a time-based retention policy. In Kafka there is no notion of "consumed", only "expired".

(3) The producer decides which partition to write to; there are several strategies for this, for example hashing by key, which avoids having to join data across partitions. Each consumer keeps track of the offset it has consumed up to, and every consumer belongs to a group. Within a group the model is a queue: the consumers in the group consume different partitions, so a message is consumed only once inside the group. Across groups the model is publish-subscribe: each group consumes independently of the others, so a message is consumed once by every group.
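
A quick way to see the two models with the console consumer (a sketch only; it assumes the kfk_test topic and the node01-node03 brokers set up later in this post, and the --group flag of this Kafka version; group_a and group_b are hypothetical group names):

# two consumers started with the same group id split the topic's partitions between them (queue model)
./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic kfk_test --group group_a

# a consumer with a different group id receives every message again, independently (publish-subscribe model)
./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic kfk_test --group group_b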

3. Leader Load Balancing Mechanism

When a broker stops or crashes, all partitions that had it as leader move their leadership to other brokers. In the extreme case this leaves a single broker leading many partitions, so the load becomes unbalanced. Meanwhile, when the failed broker restarts, if it is no longer the leader of any partition, Kafka clients will not read messages from it, and its resources are wasted.

Kafka has the concept of preferred replicas. If a partition has 3 replicas whose priorities are 0, 1 and 2, then by the preferred-replica rule the replica on broker 0 acts as the leader. When broker 0 goes down, broker 1 takes over as leader. When broker 0 comes back up, it is automatically restored as the leader of this partition. This avoids both load imbalance and wasted resources; this is Kafka's leader balancing mechanism.

It is enabled in the config file config/server.properties (it is on by default):

auto.leader.rebalance.enable=true
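
If you need to trigger a rebalance manually (for example after temporarily disabling the automatic one), the Kafka version used later in this post (1.1.0) also ships a preferred-replica election tool; a minimal sketch against the ZooKeeper ensemble configured below:

./kafka_2.11-1.1.0/bin/kafka-preferred-replica-election.sh --zookeeper node01:2181,node02:2181,node03:2181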

II. Cluster Configuration

1. ZooKeeper Installation and Configuration

(1) Download and extract

On node01, extract the archive under /opt/bigdata/: tar -zxvf zookeeper-3.4.6.tar.gz

(2) Edit the configuration

  • Configure /etc/hosts: vim /etc/hosts
192.168.172.73 node03
192.168.172.72 node02
192.168.172.71 node01
  • Configure the ZooKeeper environment variables
export ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.6   # ZooKeeper install path
export PATH=$ZOOKEEPER_HOME/bin:$PATH
  • Configure zoo.cfg

Under /opt/bigdata, copy the sample config: cp zookeeper-3.4.6/conf/zoo_sample.cfg zookeeper-3.4.6/conf/zoo.cfg

Edit it: vim zookeeper-3.4.6/conf/zoo.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
# ZooKeeper data directory
dataDir=/opt/bigdata/data/zookeeper/zkdata
# ZooKeeper transaction log directory
dataLogDir=/opt/bigdata/data/zookeeper/zkdatalog
# the port at which the clients will connect
clientPort=2181

server.1=node01:2888:3888  
server.2=node02:2888:3888  
server.3=node03:2888:3888 
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

On node01, node02 and node03 respectively, go to /opt/bigdata/data/zookeeper/zkdata and

create a myid file (vim myid) whose content is 1, 2 and 3 respectively, then save.
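
Equivalently, the directory and the myid file can be created in one step (a sketch; repeat on each node with its own id):

mkdir -p /opt/bigdata/data/zookeeper/zkdata
echo 1 > /opt/bigdata/data/zookeeper/zkdata/myid   # use "echo 2" on node02 and "echo 3" on node03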

(3) Distribute to the other nodes

On node01, under /opt/bigdata, copy the directory to the corresponding path on node02 and node03 with scp:

scp -r zookeeper-3.4.6 node02:`pwd`
scp -r zookeeper-3.4.6 node03:`pwd`

(4) Start the ZooKeeper cluster

Run zkServer.sh start on node01, node02 and node03 to start ZooKeeper.

After a moment, run zkServer.sh status on node01, node02 and node03 to check the state:

[[email protected] ~]# zkServer.sh status
JMX enabled by default
Using config: /opt/bigdata/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: leader
[[email protected] bigdata]# zkServer.sh status
JMX enabled by default
Using config: /opt/bigdata/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: follower
[[email protected] ~]# zkServer.sh status
JMX enabled by default
Using config: /opt/bigdata/zookeeper-3.4.5/bin/../conf/zoo.cfg
Mode: follower


2. Kafka Installation and Configuration

(1) Download and extract

On node01, under /opt/bigdata/, extract:

tar zxvf kafka_2.11-1.1.0.tgz  

(2) Edit the configuration

Under /opt/bigdata/, edit the config: vim kafka_2.11-1.1.0/config/server.properties

The key parameters to change are broker.id (a unique id identifying this broker), log.dirs (the path where Kafka stores the messages it receives), and zookeeper.connect (the address of the ZooKeeper cluster to connect to).

The other parameters can be left at their defaults, or adjusted to suit your environment.

############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://:9092

# The port the socket server listens on
#port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
#advertised.host.name=<hostname routable by clients>

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/opt/bigdata/kafka_2.11-1.1.0/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies
log.retention.check.interval.ms=300000

# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################
zookeeper.connect=node01:2181,node02:2181,node03:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

(3) Distribute to the other nodes

Under /opt/bigdata, copy the installation to the corresponding path on node02 and node03 (passwordless SSH is required for the copy):

scp -r  kafka_2.11-1.1.0 node02:`pwd`
scp -r  kafka_2.11-1.1.0 node03:`pwd`

Then change broker.id in server.properties to 2 on node02 and to 3 on node03.
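
For example, with sed on each target node (a sketch; adjust the path if your install directory differs):

# on node02
sed -i 's/^broker.id=1$/broker.id=2/' /opt/bigdata/kafka_2.11-1.1.0/config/server.properties
# on node03
sed -i 's/^broker.id=1$/broker.id=3/' /opt/bigdata/kafka_2.11-1.1.0/config/server.properties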

(4) Start the Kafka cluster

The ZooKeeper cluster must be running before Kafka is started; if it is not running, start it first.

Under /opt/bigdata, run the following command on each of the three nodes to start the Kafka cluster:

./kafka_2.11-1.1.0/bin/kafka-server-start.sh -daemon ./kafka_2.11-1.1.0/config/server.properties
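
To confirm each broker came up, check for the Kafka process and the tail of its server log (a quick sketch; by default the broker's logs go to the logs/ directory under the install path, relative here to /opt/bigdata):

jps | grep Kafka
tail -n 20 ./kafka_2.11-1.1.0/logs/server.log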

(5) Basic operations

1) Create a topic

./kafka_2.11-1.1.0/bin/kafka-topics.sh --create --zookeeper node02:2181,node03:2181,node04:2181 --replication-factor 2 --partitions 2 --topic kfk_test

2) List the created topics

./kafka_2.11-1.1.0/bin/kafka-topics.sh --list --zookeeper node02:2181,node03:2181,node04:2181

3) Produce data

./kafka_2.11-1.1.0/bin/kafka-console-producer.sh --broker-list node02:9092,node03:9092,node04:9092 --topic kfk_test

4) Consume the produced data

Before Kafka 0.9, consumption goes through ZooKeeper:
./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --zookeeper node02:2181,node03:2181,node04:2181 --from-beginning --topic kfk_test

From Kafka 0.9 onward the ZooKeeper-based consumer is no longer recommended (it is still supported but being phased out); the bootstrap-server approach is recommended instead:
./kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --bootstrap-server node02:9092,node03:9092,node04:9092 --from-beginning --topic kfk_test

5) Describe a topic

./kafka_2.11-1.1.0/bin/kafka-topics.sh --describe --zookeeper node02:2181,node03:2181,node04:2181 --topic kfk_test

The output looks like this:

Topic:kfk_test  PartitionCount:2	ReplicationFactor:2	Configs:
Topic: kfk_test	 Partition: 0	Leader: 2	Replicas: 2,4	Isr: 2,4
Topic: kfk_test	 Partition: 1	Leader: 3	Replicas: 3,2	Isr: 3,2

You can see 2 partitions and 2 replicas.

partition: the partition id
leader: the broker id currently handling reads and writes for this partition, i.e. the broker.id from server.properties
replicas: the list of broker ids holding replicas of this partition
isr: in-sync replicas, the subset of replicas containing only the brokers that are currently alive and caught up