kafka入門

以下內容部分翻譯至 http://kafka.apache.org/intro
kafka介紹
我們認為,一個流處理平臺具有三個關鍵能力:
-
釋出和訂閱訊息(流),在這方面,它類似於一個訊息佇列或企業訊息系統。
-
以
容錯
的方式儲存訊息(流)。 -
在訊息流發生時處理它們。
什麼是kakfa的優勢?
它應用於2大類應用:
-
構建實時的流資料管道,可靠地獲取系統和應用程式之間的資料。
-
構建實時流的應用程式,對資料流進行轉換或反應。
要了解kafka是如何做這些事情的,讓我們從下到上深入探討kafka的能力。
首先幾個概念:
-
kafka作為一個叢集執行在一個或多個伺服器上。
-
kafka叢集儲存的訊息是以topic為類別記錄的。
-
每個訊息(也叫記錄record,我習慣叫訊息)是由一個key,一個value和時間戳構成。
kafka有四個核心API:
-
應用程式使用
Producer API
釋出訊息到1個或多個topic(主題)。 -
應用程式使用
Consumer API
來訂閱一個或多個topic,並處理產生的訊息。 -
應用程式使用
Streams API
充當一個流處理器,從1個或多個topic消費輸入流,並生產一個輸出流到1個或多個輸出topic,有效地將輸入流轉換到輸出流。 -
Connector API
允許構建或執行可重複使用的生產者或消費者,將topic連線到現有的應用程式或資料系統。例如,一個關係資料庫的聯結器可捕獲每一個變化。

image
Client和Server之間的通訊,是通過一條簡單、高效能並且和開發語言無關的 ofollow,noindex">TCP協議 。並且該協議保持與老版本的相容。Kafka提供了Java Client(客戶端)。除了Java Client外,還有非常多的 其它程式語言的Client 。
首先來了解一下Kafka所使用的基本術語:
Topic
Kafka將訊息種子(Feed)分門別類,每一類的訊息稱之為一個主題(Topic).
Producer
釋出訊息的物件稱之為主題生產者(Kafka topic producer)
Consumer
訂閱訊息並處理髮布的訊息的種子的物件稱之為主題消費者(consumers)
Broker
已釋出的訊息儲存在一組伺服器中,稱之為Kafka叢集。叢集中的每一個伺服器都是一個代理(Broker). 消費者可以訂閱一個或多個主題(topic),並從Broker拉資料,從而消費這些已釋出的訊息。
主題和日誌 (Topic和Log)
讓我們更深入的瞭解Kafka中的Topic。
Topic是釋出的訊息的類別或者種子Feed名。對於每一個Topic,Kafka叢集維護這一個分割槽的log,就像下圖中的示例:

image
每一個分割槽都是一個順序的、不可變的訊息佇列, 並且可以持續的新增。分割槽中的訊息都被分了一個序列號,稱之為偏移量(offset),在每個分割槽中此偏移量都是唯一的。
Kafka叢集保持所有的訊息,直到它們過期, 無論訊息是否被消費了。 實際上消費者所持有的僅有的元資料就是這個偏移量,也就是消費者在這個log中的位置。 這個偏移量由消費者控制:正常情況當消費者消費訊息的時候,偏移量也線性的的增加。但是實際偏移量由消費者控制,消費者可以將偏移量重置為更老的一個偏移量,重新讀取訊息。 可以看到這種設計對消費者來說操作自如, 一個消費者的操作不會影響其它消費者對此log的處理。 再說說分割槽。Kafka中採用分割槽的設計有幾個目的。一是可以處理更多的訊息,不受單臺伺服器的限制。Topic擁有多個分割槽意味著它可以不受限的處理更多的資料。第二,分割槽可以作為並行處理的單元,稍後會談到這一點。

image
分散式(Distribution)
Log的分割槽被分佈到叢集中的多個伺服器上。每個伺服器處理它分到的分割槽。 根據配置每個分割槽還可以複製到其它伺服器作為備份容錯。 每個分割槽有一個leader,零或多個follower。Leader處理此分割槽的所有的讀寫請求,而follower被動的複製資料。如果leader宕機,其它的一個follower會被推舉為新的leader。 一臺伺服器可能同時是一個分割槽的leader,另一個分割槽的follower。 這樣可以平衡負載,避免所有的請求都只讓一臺或者某幾臺伺服器處理。
Geo-Replication(異地資料同步技術)
Kafka MirrorMaker為群集提供 geo-replication
支援。藉助 MirrorMaker
,訊息可以跨多個資料中心或雲區域進行復制。 您可以在active/passive場景中用於備份和恢復; 或者在active/passive方案中將資料置於更接近使用者的位置,或資料本地化。
生產者(Producers)
生產者往某個Topic上釋出訊息。生產者也負責選擇釋出到Topic上的哪一個分割槽。最簡單的方式從分割槽列表中輪流選擇。也可以根據某種演算法依照權重選擇分割槽。開發者負責如何選擇分割槽的演算法。
消費者(Consumers)
通常來講,訊息模型可以分為兩種, 佇列和釋出-訂閱式。 佇列的處理方式是 一組消費者從伺服器讀取訊息,一條訊息只有其中的一個消費者來處理。在釋出-訂閱模型中,訊息被廣播給所有的消費者,接收到訊息的消費者都可以處理此訊息。Kafka為這兩種模型提供了單一的消費者抽象模型: 消費者組 (consumer group)。 消費者用一個消費者組名標記自己。 一個釋出在Topic上訊息被分發給此消費者組中的一個消費者。 假如所有的消費者都在一個組中,那麼這就變成了queue模型。 假如所有的消費者都在不同的組中,那麼就完全變成了釋出-訂閱模型。 更通用的, 我們可以建立一些消費者組作為邏輯上的訂閱者。每個組包含數目不等的消費者, 一個組內多個消費者可以用來擴充套件效能和容錯。正如下圖所示:

image
2個kafka叢集託管4個分割槽(P0-P3),2個消費者組,消費組A有2個消費者例項,消費組B有4個。
正像傳統的訊息系統一樣,Kafka保證訊息的順序不變。 再詳細扯幾句。傳統的佇列模型保持訊息,並且保證它們的先後順序不變。但是, 儘管伺服器保證了訊息的順序,訊息還是非同步的傳送給各個消費者,消費者收到訊息的先後順序不能保證了。這也意味著並行消費將不能保證訊息的先後順序。用過傳統的訊息系統的同學肯定清楚,訊息的順序處理很讓人頭痛。如果只讓一個消費者處理訊息,又違背了並行處理的初衷。 在這一點上Kafka做的更好,儘管並沒有完全解決上述問題。 Kafka採用了一種分而治之的策略:分割槽。 因為Topic分割槽中訊息只能由消費者組中的唯一一個消費者處理,所以訊息肯定是按照先後順序進行處理的。但是它也僅僅是保證Topic的一個分割槽順序處理,不能保證跨分割槽的訊息先後處理順序。 所以,如果你想要順序的處理Topic的所有訊息,那就只提供一個分割槽。
Kafka的保證(Guarantees)
-
生產者傳送到一個特定的Topic的分割槽上,訊息將會按照它們傳送的順序依次加入,也就是說,如果一個訊息M1和M2使用相同的producer傳送,M1先發送,那麼M1將比M2的offset低,並且優先的出現在日誌中。
-
消費者收到的訊息也是此順序。
-
如果一個Topic配置了複製因子(replication factor)為N, 那麼可以允許N-1伺服器宕機而不丟失任何已經提交(committed)的訊息。
有關這些保證的更多詳細資訊,請參見文件的設計部分。
kafka作為一個訊息系統
Kafka的流與傳統企業訊息系統相比的概念如何?
傳統的訊息有兩種模式: 佇列
和 釋出訂閱
。 在佇列模式中,消費者池從伺服器讀取訊息(每個訊息只被其中一個讀取); 釋出訂閱模式:訊息廣播給所有的消費者。這兩種模式都有優缺點,佇列的優點是允許多個消費者瓜分處理資料,這樣可以擴充套件處理。但是,佇列不像多個訂閱者,一旦訊息者程序讀取後故障了,那麼訊息就丟了。而 釋出和訂閱
允許你廣播資料到多個消費者,由於每個訂閱者都訂閱了訊息,所以沒辦法縮放處理。
kafka中消費者組有兩個概念: 佇列
:消費者組(consumer group)允許同名的消費者組成員瓜分處理。 釋出訂閱
:允許你廣播訊息給多個消費者組(不同名)。
kafka的每個topic都具有這兩種模式。
kafka有比傳統的訊息系統更強的順序保證。
傳統的訊息系統按順序儲存資料,如果多個消費者從佇列消費,則伺服器按儲存的順序傳送訊息,但是,儘管伺服器按順序傳送,訊息非同步傳遞到消費者,因此訊息可能亂序到達消費者。這意味著訊息存在並行消費的情況,順序就無法保證。訊息系統常常通過僅設1個消費者來解決這個問題,但是這意味著沒用到並行處理。
kafka做的更好。通過並行topic的parition —— kafka提供了順序保證和負載均衡。每個partition僅由同一個消費者組中的一個消費者消費到。並確保消費者是該partition的唯一消費者,並按順序消費資料。每個topic有多個分割槽,則需要對多個消費者做負載均衡,但請注意, 相同的消費者組中不能有比分割槽更多的消費者,否則多出的消費者一直處於空等待,不會收到訊息
。
kafka作為一個儲存系統
所有釋出訊息到 訊息佇列
和消費分離的系統,實際上都充當了一個儲存系統(釋出的訊息先儲存起來)。Kafka比別的系統的優勢是它是一個非常高效能的 儲存系統
。
寫入到kafka的資料將寫到磁碟並複製到叢集中保證容錯性。並允許生產者等待訊息應答,直到訊息完全寫入。
kafka的磁碟結構 - 無論你伺服器上有50KB或50TB,執行是相同的。
client來控制讀取資料的位置。你還可以認為kafka是一種專用於高效能,低延遲,提交日誌儲存,複製,和傳播特殊用途的 分散式檔案系統
。
kafka的流處理
僅僅讀,寫和儲存是不夠的,kafka的目標是實時的流處理。
在kafka中,流處理持續獲取 輸入topic
的資料,進行處理加工,然後寫入 輸出topic
。例如,一個零售APP,接收銷售和出貨的 輸入流
,統計數量或調整價格後輸出。
可以直接使用producer和consumer API進行簡單的處理。對於複雜的轉換,Kafka提供了更強大的Streams API。可構建 聚合計算
或 連線流到一起
的複雜應用程式。
助於解決此類應用面臨的硬性問題:處理無序的資料,程式碼更改的再處理,執行狀態計算等。
Sterams API在Kafka中的核心:使用producer和consumer API作為輸入,利用Kafka做狀態儲存,使用相同的組機制在stream處理器例項之間進行容錯保障。
拼在一起
訊息傳遞,儲存和流處理的組合看似反常,但對於Kafka作為流式處理平臺的作用至關重要。
像HDFS這樣的分散式檔案系統允許儲存靜態檔案來進行批處理。這樣系統可以有效地儲存和處理來自過去的歷史資料。
傳統企業的訊息系統允許在你訂閱之後處理未來的訊息:在未來資料到達時處理它。
Kafka結合了這兩種能力,這種組合對於kafka作為流處理應用和流資料管道平臺是至關重要的。
批處理以及訊息驅動應用程式的流處理的概念:通過組合儲存和低延遲訂閱,流處理應用可以用相同的方式對待過去和未來的資料。它是一個單一的應用程式,它可以處理歷史的儲存資料,當它處理到最後一個訊息時,它進入等待未來的資料到達,而不是結束。
同樣,對於流資料管道(pipeline),訂閱實時事件的組合使得可以將Kafka用於非常低延遲的管道;但是,可靠地儲存資料的能力使得它可以將其用於必須保證傳遞的關鍵資料,或與僅定期載入資料或長時間維護的離線系統整合在一起。流處理可以在資料到達時轉換它。
使用場景
- Messaging
對於一些常規的訊息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴充套件性和效能優勢.不過到目前為止,我們應該很清楚認識到,kafka並沒有提供JMS中的"事務性""訊息傳輸擔保(訊息確認機制)""訊息分組"等企業級特性;kafka只能使用作為"常規"的訊息系統,在一定程度上,尚未確保訊息的傳送與接收絕對可靠(比如,訊息重發,訊息傳送丟失等)
- Websit activity tracking
kafka可以作為"網站活性跟蹤"的最佳工具;可以將網頁/使用者操作等資訊傳送到kafka中.並實時監控,或者離線統計分析等
- Log Aggregation
kafka的特性決定它非常適合作為"日誌收集中心";application可以將操作日誌"批量""非同步"的傳送到kafka叢集中,而不是儲存在本地或者DB中;kafka可以批量提交訊息/壓縮訊息等,這對producer端而言,幾乎感覺不到效能的開支.此時consumer端可以使hadoop等其他系統化的儲存和分析系統.
原理
kafka的設計初衷是希望作為一個統一的資訊收集平臺,能夠實時的收集反饋資訊,並需要能夠支撐較大的資料量,且具備良好的容錯能力.
1、永續性
kafka使用檔案儲存訊息,這就直接決定kafka在效能上嚴重依賴檔案系統的本身特性.且無論任何OS下,對檔案系統本身的優化幾乎沒有可能.檔案快取/直接記憶體對映等是常用的手段.因為kafka是對日誌檔案進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將訊息暫時buffer起來,當訊息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO呼叫的次數.
2、效能
需要考慮的影響效能點很多,除磁碟IO之外,我們還需要考慮網路IO,這直接關係到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將訊息buffer起來,當訊息的條數達到一定閥值時,批量傳送給broker;對於consumer端也是一樣,批量fetch多條訊息.不過訊息量的大小可以通過配置檔案來指定.對於kafka broker端,似乎有個sendfile系統呼叫可以潛在的提升網路IO的效能:將檔案的資料對映到系統記憶體中,socket直接讀取相應的記憶體區域即可,而無需程序再次copy和交換. 其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用訊息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮.可以將任何在網路上傳輸的訊息都經過壓縮.kafka支援gzip/snappy等多種壓縮方式.
3、生產者
負載均衡: producer將會和Topic下所有partition leader保持socket連線;訊息由producer直接通過socket傳送到broker,中間不會經過任何"路由層".事實上,訊息被路由到哪個partition上,有producer客戶端決定.比如可以採用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那麼在producer端實現"訊息均衡分發"是必要的.
其中partition leader的位置(host:port)註冊在zookeeper中,producer作為zookeeper client,已經註冊了watch用來監聽partition leader的變更事件.
非同步傳送:將多條訊息暫且在客戶端buffer起來,並將他們批量的傳送到broker,小資料IO太多,會拖慢整體的網路延遲,批量延遲傳送事實上提升了網路效率。不過這也有一定的隱患,比如說當producer失效時,那些尚未傳送的訊息將會丟失。
4、消費者
consumer端向broker傳送"fetch"請求,並告知其獲取訊息的offset;此後consumer將會獲得一定條數的訊息;consumer端也可以重置offset來重新消費訊息.
在JMS實現中,Topic模型基於push方式,即broker將訊息推送給consumer端.不過在kafka中,採用了pull方式,即consumer在和broker建立連線之後,主動去pull(或者說fetch)訊息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch訊息並處理,且可以控制訊息消費的進度(offset);此外,消費者可以良好的控制訊息消費的數量,batch fetch.
kafka安裝和簡單使用(以mac為例)
安裝
# kafka依賴zookeeper 這裡使用brew安裝, 自動會安裝 zookeeper brew install kafka ==> Installing dependencies for kafka: zookeeper ==> Installing kafka dependency: zookeeper ==> Downloading https://homebrew.bintray.com/bottles/zookeeper-3.4.13.high_sierra.bottle.tar.gz ######################################################################## 100.0% ==> Pouring zookeeper-3.4.13.high_sierra.bottle.tar.gz ==> Caveats To have launchd start zookeeper now and restart at login: brew services start zookeeper Or, if you don't want/need a background service you can just run: zkServer start ==> Summary :beer:/usr/local/Cellar/zookeeper/3.4.13: 244 files, 33.4MB ==> Installing kafka ==> Downloading https://homebrew.bintray.com/bottles/kafka-2.0.0.high_sierra.bottle.tar.gz ######################################################################## 100.0% ==> Pouring kafka-2.0.0.high_sierra.bottle.tar.gz ==> Caveats To have launchd start kafka now and restart at login: brew services start kafka Or, if you don't want/need a background service you can just run: zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties ==> Summary :beer:/usr/local/Cellar/kafka/2.0.0: 160 files, 46.8MB ==> Caveats ==> zookeeper To have launchd start zookeeper now and restart at login: brew services start zookeeper Or, if you don't want/need a background service you can just run: zkServer start ==> kafka To have launchd start kafka now and restart at login: brew services start kafka Or, if you don't want/need a background service you can just run: zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
上面可以看出 配置檔案在 /usr/local/etc/kafka 下面</pre>
kafka的檔案目錄
brew list kafka /usr/local/Cellar/kafka/2.0.0/.bottle/etc/ (14 files) /usr/local/Cellar/kafka/2.0.0/bin/connect-distributed /usr/local/Cellar/kafka/2.0.0/bin/connect-standalone /usr/local/Cellar/kafka/2.0.0/bin/kafka-acls /usr/local/Cellar/kafka/2.0.0/bin/kafka-broker-api-versions /usr/local/Cellar/kafka/2.0.0/bin/kafka-configs /usr/local/Cellar/kafka/2.0.0/bin/kafka-console-consumer /usr/local/Cellar/kafka/2.0.0/bin/kafka-console-producer /usr/local/Cellar/kafka/2.0.0/bin/kafka-consumer-groups /usr/local/Cellar/kafka/2.0.0/bin/kafka-consumer-perf-test /usr/local/Cellar/kafka/2.0.0/bin/kafka-delegation-tokens /usr/local/Cellar/kafka/2.0.0/bin/kafka-delete-records /usr/local/Cellar/kafka/2.0.0/bin/kafka-dump-log /usr/local/Cellar/kafka/2.0.0/bin/kafka-log-dirs /usr/local/Cellar/kafka/2.0.0/bin/kafka-mirror-maker /usr/local/Cellar/kafka/2.0.0/bin/kafka-preferred-replica-election /usr/local/Cellar/kafka/2.0.0/bin/kafka-producer-perf-test /usr/local/Cellar/kafka/2.0.0/bin/kafka-reassign-partitions /usr/local/Cellar/kafka/2.0.0/bin/kafka-replica-verification /usr/local/Cellar/kafka/2.0.0/bin/kafka-run-class /usr/local/Cellar/kafka/2.0.0/bin/kafka-server-start /usr/local/Cellar/kafka/2.0.0/bin/kafka-server-stop /usr/local/Cellar/kafka/2.0.0/bin/kafka-streams-application-reset /usr/local/Cellar/kafka/2.0.0/bin/kafka-topics /usr/local/Cellar/kafka/2.0.0/bin/kafka-verifiable-consumer /usr/local/Cellar/kafka/2.0.0/bin/kafka-verifiable-producer /usr/local/Cellar/kafka/2.0.0/bin/trogdor /usr/local/Cellar/kafka/2.0.0/bin/zookeeper-security-migration /usr/local/Cellar/kafka/2.0.0/bin/zookeeper-server-start /usr/local/Cellar/kafka/2.0.0/bin/zookeeper-server-stop /usr/local/Cellar/kafka/2.0.0/bin/zookeeper-shell /usr/local/Cellar/kafka/2.0.0/homebrew.mxcl.kafka.plist /usr/local/Cellar/kafka/2.0.0/libexec/bin/ (30 files) /usr/local/Cellar/kafka/2.0.0/libexec/libs/ (81 files)
brew將kafka安裝在/usr/local/Cellar/kafka/2.0.0下
另外 kafka的配置檔案 在 /usr/local/etc/kafka下面,配置檔案也可以在其他地方,啟動的時候指定就可以了。
我們看下配置:/usr/local/Cellar/kafka/2.0.0 檔案
tree -L 2 . ├── INSTALL_RECEIPT.json ├── LICENSE ├── NOTICE ├── bin │├── connect-distributed │├── connect-standalone │├── kafka-acls │├── kafka-broker-api-versions │├── kafka-configs │├── kafka-console-consumer │├── kafka-console-producer │├── kafka-consumer-groups │├── kafka-consumer-perf-test │├── kafka-delegation-tokens │├── kafka-delete-records │├── kafka-dump-log │├── kafka-log-dirs │├── kafka-mirror-maker │├── kafka-preferred-replica-election │├── kafka-producer-perf-test │├── kafka-reassign-partitions │├── kafka-replica-verification │├── kafka-run-class │├── kafka-server-start │├── kafka-server-stop │├── kafka-streams-application-reset │├── kafka-topics │├── kafka-verifiable-consumer │├── kafka-verifiable-producer │├── trogdor │├── zookeeper-security-migration │├── zookeeper-server-start │├── zookeeper-server-stop │└── zookeeper-shell ├── homebrew.mxcl.kafka.plist └── libexec ├── bin ├── config -> ../../../../etc/kafka └── libs
bin: 程式的檔案
libexec: 執行的檔案、庫等, 執行的很多都是在這裡面
下面介紹一下 bin子目錄;
bin主要是 用於啟動的sh指令碼
/usr/local/Cellar/kafka/2.0.0/libexec/bin ll -a | awk '{print $9}' . .. connect-distributed.sh connect-standalone.sh kafka-acls.sh kafka-broker-api-versions.sh kafka-configs.sh kafka-console-consumer.sh kafka-console-producer.sh kafka-consumer-groups.sh kafka-consumer-perf-test.sh kafka-delegation-tokens.sh kafka-delete-records.sh kafka-dump-log.sh kafka-log-dirs.sh kafka-mirror-maker.sh kafka-preferred-replica-election.sh kafka-producer-perf-test.sh kafka-reassign-partitions.sh kafka-replica-verification.sh kafka-run-class.sh kafka-server-start.sh kafka-server-stop.sh kafka-streams-application-reset.sh kafka-topics.sh kafka-verifiable-consumer.sh kafka-verifiable-producer.sh trogdor.sh zookeeper-security-migration.sh zookeeper-server-start.sh zookeeper-server-stop.sh zookeeper-shell.sh
看一下 kafka-server-start.sh
#!/bin/bash # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements.See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License.You may obtain a copy of the License at # #http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1 fi base_dir=$(dirname $0) if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties" fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'} COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
上面就是判斷 一下引數,然後呼叫 kafka-run-class.sh 加一些引數執行,這裡就不介紹了
配置
zookeeper配置
vim /usr/local/etc/kafka/zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements.See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License.You may obtain a copy of the License at # #http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # the directory where the snapshot is stored. dataDir=/usr/local/var/lib/zookeeper # 資料目錄 # the port at which the clients will connect clientPort=2181 # 埠, 在kafka 中需要配置到這個埠上 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 # 最大連線數 ``` ### kafka伺服器配置 vim /usr/local/etc/kafka/ ```bash 考慮到配置說明太多, cat /usr/local/etc/kafka/server.properties | grep "^#" -v |grep -v "^$" broker.id=0 # 主要是 配置不同的節點伺服器,叢集中唯一標識 # 下面是一些效能引數 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # 資料日誌儲存位置, kafka通過日誌檔案進行持久化的 log.dirs=/usr/local/var/lib/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000
這裡統一叢集中的,連線相同的zookeeper
zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 另外一個比較重要的是: 服務開啟的監聽的埠 listeners=PLAINTEXT://:9092</pre>
修改配置 啟動 3個broker
在一臺機器上啟動三個broker監聽在不同埠上, 另外啟動zookeeper, 使用 /usr/local/Cellar/kafka/2.0.0/libexec/bin/
下的指令碼啟動
先啟動zookeeper
/usr/local/Cellar/kafka/2.0.0/libexec/bin/zookeeper-server-start.sh /usr/local/etc/kafka/zookeeper.properties
zookeeper-server-start.sh 後面跟配置檔案 即可
複製配置
cd /usr/local/etc/kafka/ cp server.properties server-00.properties cp server.properties server-01.properties cp server.properties server-02.properties # 修改配置 vim server-00.properties broker.id=0 listeners=PLAINTEXT://:9092 log.dirs=/usr/local/var/lib/kafka-logs-00 # 主要是修改上面的內容, 修改完 三個配置 # 啟動broker /usr/local/Cellar/kafka/2.0.0/libexec/bin/kafka-server-start.sh /usr/local/etc/kafka/server-00.properties # 第二個 /usr/local/Cellar/kafka/2.0.0/libexec/bin/kafka-server-start.sh /usr/local/etc/kafka/server-01.properties # 第三個 /usr/local/Cellar/kafka/2.0.0/libexec/bin/kafka-server-start.sh /usr/local/etc/kafka/server-02.properties</pre> ### 簡單使用 # 建立topic cd /usr/local/Cellar/kafka/2.0.0/libexec/bin/ ./kafka-topics.sh --help # 檢視使用 ./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 3 # 檢視topic的資訊 ./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test Topic:testPartitionCount:3ReplicationFactor:3Configs: Topic: testPartition: 0Leader: 0Replicas: 0,1,2Isr: 0,1,2 Topic: testPartition: 1Leader: 1Replicas: 1,2,0Isr: 1,2,0 Topic: testPartition: 2Leader: 2Replicas: 2,0,1Isr: 2,0,1 # 列出所有的 topic ./kafka-topics.sh --zookeeper localhost:2181 --list # 下面使用kafka的cli的方式演示生產者和消費者 ./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test # 上面消費者就啟動起來了 # 啟動生產者, > 後面輸入訊息, 在消費者就可以接受到 ./kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test ~ ./kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test >hello >world >kafka hello world ~./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test hello world kafka hello world </pre> ### kafka資料日誌 ### 配置在log.dirs=/usr/local/var/lib/kafka-logs-00 ~ cd /usr/local/var/lib/ ~ls kafka-logs-00 kafka-logs-01 kafka-logs-02 zookeeper . ├── __consumer_offsets-0 ├── __consumer_offsets-12 ├── __consumer_offsets-15 ├── __consumer_offsets-18 ├── __consumer_offsets-21 ├── __consumer_offsets-24 ├── __consumer_offsets-27 ├── __consumer_offsets-3 ├── __consumer_offsets-30 ├── __consumer_offsets-33 ├── __consumer_offsets-36 ├── __consumer_offsets-39 ├── __consumer_offsets-42 ├── __consumer_offsets-45 ├── __consumer_offsets-48 ├── __consumer_offsets-6 ├── __consumer_offsets-9 ├── cleaner-offset-checkpoint ├── log-start-offset-checkpoint ├── meta.properties ├── recovery-point-offset-checkpoint ├── replication-offset-checkpoint ├── test-0 ├── test-1 └── test-2 # 這裡的檔案對應不同的partition, offset等。</pre> 本文只是介紹搭建kafka, 具體接入到具體專案中使用,後面的文章後更新補充上。