1. 程式人生 > >kafka實戰教程(python操作kafka),kafka配置檔案詳解

kafka實戰教程(python操作kafka),kafka配置檔案詳解

全棧工程師開發手冊 (作者:欒鵬)

kafka介紹

1.1. 主要功能

根據官網的介紹,ApacheKafka®是一個分散式流媒體平臺,它主要有3種功能:

1:It lets you publish and subscribe to streams of records.釋出和訂閱訊息流,這個功能類似於訊息佇列,這也是kafka歸類為訊息佇列框架的原因

2:It lets you store streams of records in a fault-tolerant way.以容錯的方式記錄訊息流,kafka以檔案的方式來儲存訊息流

3:It lets you process streams of records as they occur.可以再訊息釋出的時候進行處理

1.2. 使用場景

1:Building real-time streaming data pipelines that reliably get data between systems or applications.在系統或應用程式之間構建可靠的用於傳輸實時資料的管道,訊息佇列功能

2:Building real-time streaming applications that transform or react to the streams of data。構建實時的流資料處理程式來變換或處理資料流,資料處理功能

1.3. 詳細介紹

Kafka目前主要作為一個分散式的釋出訂閱式的訊息系統使用,下面簡單介紹一下kafka的基本機制

1.3.1 訊息傳輸流程

這裡寫圖片描述

Producer即生產者,向Kafka叢集傳送訊息,在傳送訊息之前,會對訊息進行分類,即Topic,上圖展示了兩個producer傳送了分類為topic1的訊息,另外一個傳送了topic2的訊息。

Topic即主題,通過對訊息指定主題可以將訊息分類,消費者可以只關注自己需要的Topic中的訊息

Consumer即消費者,消費者通過與kafka叢集建立長連線的方式,不斷地從叢集中拉取訊息,然後可以對這些訊息進行處理。

從上圖中就可以看出同一個Topic下的消費者和生產者的數量並不是對應的。

1.3.2 kafka伺服器訊息儲存策略

這裡寫圖片描述

談到kafka的儲存,就不得不提到分割槽,即partitions,建立一個topic時,同時可以指定分割槽數目,分割槽數越多,其吞吐量也越大,但是需要的資源也越多,同時也會導致更高的不可用性,kafka在接收到生產者傳送的訊息之後,會根據均衡策略將訊息儲存到不同的分割槽中。

這裡寫圖片描述

在每個分割槽中,訊息以順序儲存,最晚接收的的訊息會最後被消費。

1.3.3 與生產者的互動

這裡寫圖片描述

生產者在向kafka叢集傳送訊息的時候,可以通過指定分割槽來發送到指定的分割槽中

也可以通過指定均衡策略來將訊息傳送到不同的分割槽中

如果不指定,就會採用預設的隨機均衡策略,將訊息隨機的儲存到不同的分割槽中

1.3.4 與消費者的互動

這裡寫圖片描述

在消費者消費訊息時,kafka使用offset來記錄當前消費的位置

在kafka的設計中,可以有多個不同的group來同時消費同一個topic下的訊息,如圖,我們有兩個不同的group同時消費,他們的的消費的記錄位置offset各不專案,不互相干擾。

對於一個group而言,消費者的數量不應該多餘分割槽的數量,因為在一個group中,每個分割槽至多隻能繫結到一個消費者上,即一個消費者可以消費多個分割槽,一個分割槽只能給一個消費者消費

因此,若一個group中的消費者數量大於分割槽數量的話,多餘的消費者將不會收到任何訊息。

Kafka安裝與使用

2.1. 下載

2.2. 安裝

Kafka是使用scala編寫的執行與jvm虛擬機器上的程式,雖然也可以在windows上使用,但是kafka基本上是執行在linux伺服器上,因此我們這裡也使用linux來開始今天的實戰。

首先確保你的機器上安裝了jdk,kafka需要java執行環境,以前的kafka還需要zookeeper,新版的kafka已經內建了一個zookeeper環境,所以我們可以直接使用

說是安裝,如果只需要進行最簡單的嘗試的話我們只需要解壓到任意目錄即可,這裡我們將kafka壓縮包解壓到/home目錄

2.3. 配置

在kafka解壓目錄下下有一個config的資料夾,裡面放置的是我們的配置檔案

consumer.properites 消費者配置,這個配置檔案用於配置於2.5節中開啟的消費者,此處我們使用預設的即可

producer.properties 生產者配置,這個配置檔案用於配置於2.5節中開啟的生產者,此處我們使用預設的即可

server.properties kafka伺服器的配置,此配置檔案用來配置kafka伺服器,目前僅介紹幾個最基礎的配置

  • broker.id 申明當前kafka伺服器在叢集中的唯一ID,需配置為integer,並且叢集中的每一個kafka伺服器的id都應是唯一的,我們這裡採用預設配置即可

  • listeners 申明此kafka伺服器需要監聽的埠號,如果是在本機上跑虛擬機器執行可以不用配置本項,預設會使用localhost的地址,如果是在遠端伺服器上執行則必須配置,例如:listeners=PLAINTEXT://192.168.180.128:9092。並確保伺服器的9092埠能夠訪問

  • zookeeper.connect 申明kafka所連線的zookeeper的地址 ,需配置為zookeeper的地址,由於本次使用的是kafka高版本中自帶zookeeper,使用預設配置即可
    zookeeper.connect=localhost:2181

當我們有多個應用,在不同的應用中都使用zookeer,都使用預設的zk埠的話就會2181埠衝突,我們可以設定自己的埠號,在config資料夾下zookeeper.properties檔案中修改為

clientPort=2185

也就是zk開放介面為2185.

同時修改kafka的接入埠,server.properties檔案中修改為

zookeeper.connect=localhost:2185

這樣我們就成功修改了kafka裡面的埠號

2.4. 執行

啟動zookeeper

cd進入kafka解壓目錄,輸入

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動zookeeper成功後會看到如下的輸出

這裡寫圖片描述

2.啟動kafka

cd進入kafka解壓目錄,輸入

bin/kafka-server-start.sh config/server.properties

啟動kafka成功後會看到如下的輸出

這裡寫圖片描述

2.5. 第一個訊息

2.5.1 建立一個topic

Kafka通過topic對同一類的資料進行管理,同一類的資料使用同一個topic可以在處理資料時更加的便捷

在kafka解壓目錄開啟終端,輸入

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

建立一個名為test的topic

這裡寫圖片描述

在建立topic後可以通過輸入

bin/kafka-topics.sh --list --zookeeper localhost:2181

來檢視已經建立的topic

2.5.2 建立一個訊息消費者

在kafka解壓目錄開啟終端,輸入

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

可以建立一個用於消費topic為test的消費者

這裡寫圖片描述

消費者建立完成之後,因為還沒有傳送任何資料,因此這裡在執行後沒有打印出任何資料

不過彆著急,不要關閉這個終端,開啟一個新的終端,接下來我們建立第一個訊息生產者

2.5.3 建立一個訊息生產者

在kafka解壓目錄開啟一個新的終端,輸入

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在執行完畢後會進入的編輯器頁面

這裡寫圖片描述

在傳送完訊息之後,可以回到我們的訊息消費者終端中,可以看到,終端中已經打印出了我們剛才傳送的訊息

這裡寫圖片描述

kafka清理資料和topic

1、刪除kafka儲存目錄(server.properties檔案log.dirs配置,預設為"/tmp/kafka-logs")相關topic目錄

2、Kafka 刪除topic的命令是:

./bin/kafka-topics  --delete --zookeeper 【zookeeper server】  --topic 【topic name】

如果kafaka啟動時載入的配置檔案中server.properties沒有配置delete.topic.enable=true,那麼此時的刪除並不是真正的刪除,而是把topic標記為:marked for deletion

你可以通過命令:

./bin/kafka-topics --zookeeper 【zookeeper server】 --list 來檢視所有topic

此時你若想真正刪除它,可以如下操作:

(1)登入zookeeper客戶端:命令:./bin/zookeeper-client

(2)找到topic所在的目錄:ls /brokers/topics

(3)找到要刪除的topic,執行命令:rmr /brokers/topics/【topic name】即可,此時topic被徹底刪除。

另外被標記為marked for deletion的topic你可以在zookeeper客戶端中通過命令獲得:ls /admin/delete_topics/【topic name】

如果你刪除了此處的topic,那麼marked for deletion 標記消失

zookeeper 的config中也有有關topic的資訊: ls /config/topics/【topic name】暫時不知道有什麼用

總結:

徹底刪除topic:

1、刪除kafka儲存目錄(server.properties檔案log.dirs配置,預設為"/tmp/kafka-logs")相關topic目錄

2、如果配置了delete.topic.enable=true直接通過命令刪除,如果命令刪除不掉,直接通過zookeeper-client 刪除掉broker下的topic即可。

python操作kafka

我們已經知道了kafka是一個訊息佇列,下面我們來學習怎麼向kafka中傳遞資料和如何從kafka中獲取資料

首先安裝python的kafka庫

pip install kafka

按照官網的樣例,先跑一個應用

1、生產者:

from kafka import KafkaProducer
import time
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])  #此處ip可以是多個['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]


i=0
while True:
    i+=1
    msg = "producer1+%d" % i
    print(msg)
    producer.send('test', msg.encode('utf-8'))  # 引數為主題和bytes資料
    time.sleep(1)

producer.close()

啟動後生產者便可以將位元組流傳送到kafka伺服器.

2、消費者(簡單demo):

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])  #引數為接收主題和kafka伺服器地址

# 這是一個永久堵塞的過程,生產者訊息會快取在訊息佇列中,並且不刪除,所以每個訊息在訊息佇列中都有偏移
for message in consumer:  # consumer是一個訊息佇列,當後臺有訊息時,這個訊息佇列就會自動增加.所以遍歷也總是會有資料,當訊息佇列中沒有資料時,就會堵塞等待訊息帶來
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))


啟動後消費者可以從kafka伺服器獲取資料.

3、消費者(消費群組)

from kafka import KafkaConsumer
# 使用group,對於同一個group的成員只有一個消費者例項可以讀取資料
consumer = KafkaConsumer('test',group_id='my-group',bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

啟動多個消費者,只有其中某一個成員可以消費到,滿足要求,消費組可以橫向擴充套件提高處理能力

4、消費者(讀取目前最早可讀的訊息)

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',auto_offset_reset='earliest',bootstrap_servers=['127.0.0.1:9092'])

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

auto_offset_reset:重置偏移量,earliest移到最早的可用訊息,latest最新的訊息,預設為latest
原始碼定義:{‘smallest’: ‘earliest’, ‘largest’: ‘latest’}

5、消費者(手動設定偏移量)

# ==========讀取指定位置訊息===============
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])

print(consumer.partitions_for_topic("test"))  #獲取test主題的分割槽資訊
print(consumer.topics())  #獲取主題列表
print(consumer.subscription())  #獲取當前消費者訂閱的主題
print(consumer.assignment())  #獲取當前消費者topic、分割槽資訊
print(consumer.beginning_offsets(consumer.assignment())) #獲取當前消費者可消費的偏移量
consumer.seek(TopicPartition(topic='test', partition=0), 5)  #重置偏移量,從第5個偏移量消費
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))


6、消費者(訂閱多個主題)

# =======訂閱多個消費者==========

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))  #訂閱要消費的主題
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0))) #獲取當前主題的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))


7、消費者(手動拉取訊息)

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
    msg = consumer.poll(timeout_ms=5)   #從kafka獲取訊息
    print(msg)
    time.sleep(2)

8、消費者(訊息掛起與恢復)

# ==============訊息恢復和掛起===========

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))  # pause執行後,consumer不能讀取,直到呼叫resume後恢復。
num = 0
while True:
    print(num)
    print(consumer.paused())   #獲取當前掛起的消費者
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")

pause執行後,consumer不能讀取,直到呼叫resume後恢復。

kafka的配置

在kafka/config/目錄下面有3個配置檔案:

producer.properties

consumer.properties

server.properties

kafka的配置分為 broker(server.properties)、producter(producer.properties)、consumer(consumer.properties)三個不同的配置

一 BROKER 的全域性配置

最為核心的三個配置 broker.id、log.dir、zookeeper.connect 。



------------------------------------------- 系統 相關 -------------------------------------------
##每一個broker在叢集中的唯一標示,要求是正數。在改變IP地址,不改變broker.id的話不會影響consumers
broker.id =1
 
##kafka資料的存放地址,多個地址的話用逗號分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2
log.dirs = /tmp/kafka-logs
 
##提供給客戶端響應的埠
port =6667
 
##訊息體的最大大小,單位是位元組
message.max.bytes =1000000
 
## broker 處理訊息的最大執行緒數,一般情況下不需要去修改
num.network.threads =3
 
## broker處理磁碟IO 的執行緒數 ,數值應該大於你的硬碟數
num.io.threads =8
 
## 一些後臺任務處理的執行緒數,例如過期訊息檔案的刪除等,一般情況下不需要去做修改
background.threads =4
 
## 等待IO執行緒處理的請求佇列最大數,若是等待IO的請求超過這個數值,那麼會停止接受外部訊息,算是一種自我保護機制
queued.max.requests =500
 
##broker的主機地址,若是設定了,那麼會繫結到這個地址上,若是沒有,會繫結到所有的介面上,並將其中之一發送到ZK,一般不設定
host.name
 
## 打廣告的地址,若是設定的話,會提供給producers, consumers,其他broker連線,具體如何使用還未深究
advertised.host.name
 
## 廣告地址埠,必須不同於port中的設定
advertised.port
 
## socket的傳送緩衝區,socket的調優引數SO_SNDBUFF
socket.send.buffer.bytes =100*1024
 
## socket的接受緩衝區,socket的調優引數SO_RCVBUFF
socket.receive.buffer.bytes =100*1024
 
## socket請求的最大數值,防止serverOOM,message.max.bytes必然要小於socket.request.max.bytes,會被topic建立時的指定引數覆蓋
socket.request.max.bytes =100*1024*1024
 
------------------------------------------- LOG 相關 -------------------------------------------
## topic的分割槽是以一堆segment檔案儲存的,這個控制每個segment的大小,會被topic建立時的指定引數覆蓋
log.segment.bytes =1024*1024*1024
 
## 這個引數會在日誌segment沒有達到log.segment.bytes設定的大小,也會強制新建一個segment 會被 topic建立時的指定引數覆蓋
log.roll.hours =24*7
 
## 日誌清理策略 選擇有:delete和compact 主要針對過期資料的處理,或是日誌檔案達到限制的額度,會被 topic建立時的指定引數覆蓋
log.cleanup.policy = delete
 
## 資料儲存的最大時間 超過這個時間 會根據log.cleanup.policy設定的策略處理資料,也就是消費端能夠多久去消費資料
## log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行刪除,會被topic建立時的指定引數覆蓋
log.retention.minutes=7days

指定日誌每隔多久檢檢視是否可以被刪除,預設1分鐘
log.cleanup.interval.mins=1
 
## topic每個分割槽的最大檔案大小,一個topic的大小限制 = 分割槽數*log.retention.bytes 。-1沒有大小限制
## log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行刪除,會被topic建立時的指定引數覆蓋
log.retention.bytes=-1
 
## 檔案大小檢查的週期時間,是否處罰 log.cleanup.policy中設定的策略
log.retention.check.interval.ms=5minutes
 
## 是否開啟日誌壓縮
log.cleaner.enable=false
 
## 日誌壓縮執行的執行緒數
log.cleaner.threads =1
 
## 日誌壓縮時候處理的最大大小
log.cleaner.io.max.bytes.per.second=None
 
## 日誌壓縮去重時候的快取空間 ,在空間允許的情況下,越大越好
log.cleaner.dedupe.buffer.size=500*1024*1024
 
## 日誌清理時候用到的IO塊大小 一般不需要修改
log.cleaner.io.buffer.size=512*1024
 
## 日誌清理中hash表的擴大因子 一般不需要修改
log.cleaner.io.buffer.load.factor =0.9
 
## 檢查是否處罰日誌清理的間隔
log.cleaner.backoff.ms =15000
 
## 日誌清理的頻率控制,越大意味著更高效的清理,同時會存在一些空間上的浪費,會被topic建立時的指定引數覆蓋
log.cleaner.min.cleanable.ratio=0.5
 
## 對於壓縮的日誌保留的最長時間,也是客戶端消費訊息的最長時間,同log.retention.minutes的區別在於一個控制未壓縮資料,一個控制壓縮後的資料。會被topic建立時的指定引數覆蓋
log.cleaner.delete.retention.ms =1day
 
## 對於segment日誌的索引檔案大小限制,會被topic建立時的指定引數覆蓋
log.index.size.max.bytes =10*1024*1024
 
## 當執行一個fetch操作後,需要一定的空間來掃描最近的offset大小,設定越大,代表掃描速度越快,但是也更好記憶體,一般情況下不需要搭理這個引數
log.index.interval.bytes =4096
 
## log檔案"sync"到磁碟之前累積的訊息條數
## 因為磁碟IO操作是一個慢操作,但又是一個"資料可靠性"的必要手段
## 所以此引數的設定,需要在"資料可靠性"與"效能"之間做必要的權衡.
## 如果此值過大,將會導致每次"fsync"的時間較長(IO阻塞)
## 如果此值過小,將會導致"fsync"的次數較多,這也意味著整體的client請求有一定的延遲.
## 物理server故障,將會導致沒有fsync的訊息丟失.
log.flush.interval.messages=None
 
## 檢查是否需要固化到硬碟的時間間隔
log.flush.scheduler.interval.ms =3000
 
## 僅僅通過interval來控制訊息的磁碟寫入時機,是不足的.
## 此引數用於控制"fsync"的時間間隔,如果訊息量始終沒有達到閥值,但是離上一次磁碟同步的時間間隔
## 達到閥值,也將觸發.
log.flush.interval.ms = None
 
## 檔案在索引中清除後保留的時間 一般不需要去修改
log.delete.delay.ms =60000
 
## 控制上次固化硬碟的時間點,以便於資料恢復 一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000
 
------------------------------------------- TOPIC 相關 -------------------------------------------
## 是否允許自動建立topic ,若是false,就需要通過命令建立topic
auto.create.topics.enable =true
 
## 一個topic ,預設分割槽的replication個數 ,不得大於叢集中broker的個數
default.replication.factor =1
 
## 每個topic的分割槽個數,若是在topic建立時候沒有指定的話 會被topic建立時的指定引數覆蓋
num.partitions =1
 
例項 --replication-factor3--partitions1--topic replicated-topic :名稱replicated-topic有一個分割槽,分割槽被複制到三個broker上。
 
----------------------------------複製(Leader、replicas) 相關 ----------------------------------
## partition leader與replicas之間通訊時,socket的超時時間
controller.socket.timeout.ms =30000
 
## partition leader與replicas資料同步時,訊息的佇列尺寸
controller.message.queue.size=10
 
## replicas響應partition leader的最長等待時間,若是超過這個時間,就將replicas列入ISR(in-sync replicas),並認為它是死的,不會再加入管理中
replica.lag.time.max.ms =10000
 
## 如果follower落後與leader太多,將會認為此follower[或者說partition relicas]已經失效
## 通常,在follower與leader通訊時,因為網路延遲或者連結斷開,總會導致replicas中訊息同步滯後
## 如果訊息之後太多,leader將認為此follower網路延遲較大或者訊息吞吐能力有限,將會把此replicas遷移
## 到其他follower中.
## 在broker數量較少,或者網路不足的環境中,建議提高此值.
replica.lag.max.messages =4000
 
##follower與leader之間的socket超時時間
replica.socket.timeout.ms=30*1000
 
## leader複製時候的socket快取大小
replica.socket.receive.buffer.bytes=64*1024
 
## replicas每次獲取資料的最大大小
replica.fetch.max.bytes =1024*1024
 
## replicas同leader之間通訊的最大等待時間,失敗了會重試
replica.fetch.wait.max.ms =500
 
## fetch的最小資料尺寸,如果leader中尚未同步的資料不足此值,將會阻塞,直到滿足條件
replica.fetch.min.bytes =1
 
## leader 進行復制的執行緒數,增大這個數值會增加follower的IO
num.replica.fetchers=1
 
## 每個replica檢查是否將最高水位進行固化的頻率
replica.high.watermark.checkpoint.interval.ms =5000
 
## 是否允許控制器關閉broker ,若是設定為true,會關閉所有在這個broker上的leader,並轉移到其他broker
controlled.shutdown.enable =false
 
## 控制器關閉的嘗試次數
controlled.shutdown.max.retries =3
 
## 每次關閉嘗試的時間間隔
controlled.shutdown.retry.backoff.ms =5000
 
## 是否自動平衡broker之間的分配策略
auto.leader.rebalance.enable =false
 
## leader的不平衡比例,若是超過這個數值,會對分割槽進行重新的平衡
leader.imbalance.per.broker.percentage =10
 
## 檢查leader是否不平衡的時間間隔
leader.imbalance.check.interval.seconds =300
 
## 客戶端保留offset資訊的最大空間大小
offset.metadata.max.bytes
 
----------------------------------ZooKeeper 相關----------------------------------
##zookeeper叢集的地址,可以是多個,多個之間用逗號分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect = localhost:2181
 
## ZooKeeper的最大超時時間,就是心跳的間隔,若是沒有反映,那麼認為已經死了,不易過大
zookeeper.session.timeout.ms=6000
 
## ZooKeeper的連線超時時間
zookeeper.connection.timeout.ms =6000
 
## ZooKeeper叢集中leader和follower之間的同步實際那
zookeeper.sync.time.ms =2000
配置的修改
其中一部分配置是可以被每個topic自身的配置所代替,例如
新增配置
bin/kafka-topics.sh --zookeeper localhost:2181--create --topic my-topic --partitions1--replication-factor1--config max.message.bytes=64000--config flush.messages=1
 
修改配置
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --config max.message.bytes=128000
 
刪除配置 :
bin/kafka-topics.sh --zookeeper localhost:2181--alter --topic my-topic --deleteConfig max.message.bytes


二 CONSUMER 配置


## Consumer歸屬的組ID,broker是根據group.id來判斷是佇列模式還是釋出訂閱模式,非常重要
 group.id
 
## 消費者的ID,若是沒有設定的話,會自增
 consumer.id
 
## 一個用於跟蹤調查的ID ,最好同group.id相同
 client.id = group id value
 
## 對於zookeeper叢集的指定,可以是多個 hostname1:port1,hostname2:port2,hostname3:port3 必須和broker使用同樣的zk配置
 zookeeper.connect=localhost:2182
 
## zookeeper的心跳超時時間,查過這個時間就認為是dead消費者
 zookeeper.session.timeout.ms =6000
 
## zookeeper的等待連線時間
 zookeeper.connection.timeout.ms =6000
 
## zookeeper的follower同leader的同步時間
 zookeeper.sync.time.ms =2000
 
## 當zookeeper中沒有初始的offset時候的處理方式 。smallest :重置為最小值 largest:重置為最大值 anythingelse:丟擲異常
 auto.offset.reset = largest
 
## socket的超時時間,實際的超時時間是:max.fetch.wait + socket.timeout.ms.
 socket.timeout.ms=30*1000
 
## socket的接受快取空間大小
 socket.receive.buffer.bytes=64*1024
 
##從每個分割槽獲取的訊息大小限制
 fetch.message.max.bytes =1024*1024
 
## 是否在消費訊息後將offset同步到zookeeper,當Consumer失敗後就能從zookeeper獲取最新的offset
 auto.commit.enable =true
 
## 自動提交的時間間隔
 auto.commit.interval.ms =60*1000
 
## 用來處理消費訊息的塊,每個塊可以等同於fetch.message.max.bytes中數值
 queued.max.message.chunks =10
 
## 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新
## 的consumer上,如果一個consumer獲得了某個partition的消費許可權,那麼它將會向zk註冊
##"Partition Owner registry"節點資訊,但是有可能此時舊的consumer尚沒有釋放此節點,
## 此值用於控制,註冊節點的重試次數.
 rebalance.max.retries =4
 
## 每次再平衡的時間間隔
 rebalance.backoff.ms =2000
 
## 每次重新選舉leader的時間
 refresh.leader.backoff.ms
 
## server傳送到消費端的最小資料,若是不滿足這個數值則會等待,知道滿足數值要求
 fetch.min.bytes =1
 
## 若是不滿足最小大小(fetch.min.bytes)的話,等待消費端請求的最長等待時間
 fetch.wait.max.ms =100
 
## 指定時間內沒有訊息到達就丟擲異常,一般不需要改
 consumer.timeout.ms = -1

三 PRODUCER 的配置

比較核心的配置:metadata.broker.list、request.required.acks、producer.type、serializer.class

## 消費者獲取訊息元資訊(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面設定一個vip
 metadata.broker.list
 
##訊息的確認模式
 ##0:不保證訊息的到達確認,只管傳送,低延遲但是會出現訊息的丟失,在某個server失敗的情況下,有點像TCP
 ##1:傳送訊息,並會等待leader 收到確認後,一定的可靠性
 ## -1:傳送訊息,等待leader收到確認,並進行復制操作後,才返回,最高的可靠性
 request.required.acks =0
 
## 訊息傳送的最長等待時間
 request.timeout.ms =10000
 
## socket的快取大小
 send.buffer.bytes=100*1024
 
## key的序列化方式,若是沒有設定,同serializer.class
 key.serializer.class
 
## 分割槽的策略,預設是取模
 partitioner.class=kafka.producer.DefaultPartitioner
 
## 訊息的壓縮模式,預設是none,可以有gzip和snappy
 compression.codec = none
 
## 可以針對默寫特定的topic進行壓縮
 compressed.topics=null
 
## 訊息傳送失敗後的重試次數
 message.send.max.retries =3
 
## 每次失敗後的間隔時間
 retry.backoff.ms =100
 
## 生產者定時更新topic元資訊的時間間隔 ,若是設定為0,那麼會在每個訊息傳送後都去更新資料
 topic.metadata.refresh.interval.ms =600*1000
 
## 使用者隨意指定,但是不能重複,主要用於跟蹤記錄訊息
 client.id=""
 
------------------------------------------- 訊息模式 相關 -------------------------------------------
 ## 生產者的型別 async:非同步執行訊息的傳送 sync:同步執行訊息的傳送
 producer.type=sync
 
## 非同步模式下,那麼就會在設定的時間快取訊息,並一次性發送
 queue.buffering.max.ms =5000
 
## 非同步的模式下 最長等待的訊息數
 queue.buffering.max.messages =10000
 
## 非同步模式下,進入佇列的等待時間 若是設定為0,那麼要麼進入佇列,要麼直接拋棄
 queue.enqueue.timeout.ms = -1
 
## 非同步模式下,每次傳送的最大訊息數,前提是觸發了queue.buffering.max.messages或是queue.buffering.max.ms的限制
 batch.num.messages=200
 
## 訊息體的系列化處理類 ,轉化為位元組流進行傳輸
 serializer.class= kafka.serializer.DefaultEncoder