1. 程式人生 > >Kafka叢集配置---Windows版

Kafka叢集配置---Windows版

Kafka叢集配置---Windows版

 

Kafka是一種高吞吐量的分散式釋出訂閱的訊息佇列系統,Kafka對訊息進行儲存時是通過tipic進行分組的。今天我們僅實現Kafka叢集的配置。

前言

最近研究kafka,發現網上很多關於kafka的介紹都是基於Linux作業系統的。雖然這些服務最後都是配置Linux上的。但是我們平時使用的大多都是Windows系統。所以研究很是吃力。經過借鑑不同的網路文章終於在Windows上實現了kafka的配置。在這裡現在的配置是基於Zookeeper 3.4.6 版本的。如果讀者在操作希望能和我的版本保持一致。

zookeeper

瞭解kafka的都知道。kafka是通過Zookeeper實現分佈操作的。不管是broker,consumer,還是provide資訊都是儲存在Zookeeper中的。當broker掛掉都是Zookeeper來進行重新分配選擇的。所以實現kafka叢集前我們得先實現Zookeeper的叢集配置。

首先我們從官網上下載Zookeeper到本地。我這裡下載的是Zookeeper-3.4.6.tar.gz版本的。讀者可以根據自己情況下載。下載好之後進行檔案解壓。然後找到conf檔案中的zoo_sample.cfg檔案。該檔案是Zookeeper官網給我們提供的一套樣板。我們賦值該檔案到同級下並改名為zoo.cfg

.如下圖

這裡寫圖片描述

修改配置檔案

然後我們來看看這個配置檔案裡面都有些啥

tickTime=2000

伺服器之間或客戶端與服務端之間維持心跳的時間。就是每隔tickTime就會發送一次心跳。單位毫秒

initLimit=10

這個是配置Zookeeper接收客戶端初始化連線最長能忍受initLimit心跳時間間隔。

syncLimit=5

Leader與follow之間傳送訊息和應答的時間 總時間=syncLimit*tickTime

dataDir

zookeeper資料保持路徑 預設將log日誌也儲存在dataDir

dataLogDir

zookeeper log日誌儲存地址 不設定預設是dataDir

clientPort=2181

客戶端連線的埠

server.1=192.168.1.130:28881:38881
server.2=192.168.1.130:28882:38882
server.3=192.168.1.130:28883:38883

因為我的叢集都是在同一臺電腦上配置的,所以這裡埠不能一樣


知道配置檔案裡的意思應該就知道如何修改了吧

對於新手我們只需要該以下地方呢。

dataDir+dataLogDir+clientPort

但是下面的server是Zookeeper配置裡需要重點講解的部分

上面的格式我們可以簡單的總結為 server.num=B:C:D。

num:是正整數代表的服務的唯一標識。這個要和後面說道的myid檔案保持一致。

B: 標識Zookeeper叢集中某一個服務的ip或者域名 192.168.1.130

C:表示server.num這個服務於叢集中leader進行資訊交流的埠。在kafka中我們leader和follower需要進行資料備份。具體服務就是通過這個地方制定的埠進行通訊的。

D:表示萬一leader宕機了,我們就通過這個埠來進行再follower中選舉新的leader。

大坑預防

網上的很多教程也就介紹到這裡。稍微好點就提了一下建立myid檔案的事,我當時就糾結在這裡。因為我根本不知道穿件的myid的型別。我就隨便建立txt檔案。結果是錯的。這裡我們建立myid我有兩種方式。還有myid裡面的內容就是我們對應的配置檔案中server.num中的num。

第一種就是我們通過cmd視窗到我們要建立myid的資料夾下

執行如下命令

echo 1 > myid

這裡寫圖片描述

第二種是我們先建立TXT檔案將對應的內容寫入。然後txt字尾刪掉就可以了。

順便提一下myid應該放在我們conf/zoo.cfg檔案中指定的dataDir 的對應的檔案路徑下。

服務開啟

所謂的叢集就是講上面的Zookeeper複製成多個,將上面提到的幾個重要的屬性更改掉就行了。

如果你到這一步說明你離成功已經不遠了。下面我們只需要開啟服務就行了。開啟服務在我們解壓的bin目錄下。

這裡寫圖片描述

這裡我們得有些常識,已sh結尾的是Linux 系統的shell檔案。在windows上沒有裝外掛是無法使用的。我們windows認識的就是bat檔案。就是上面的cmd結尾才是我們可以用的功能。但是我們還需要進行一下修改。其實這裡已經可以了。我們到cmd視窗中通過該命令去執行我們zoo.cfg檔案。但是為了方便我們這裡講zoo.cfg配置進我們的zkServer.cmd檔案中

這裡寫圖片描述

好了。配置完成。我們只需要每次點選zkServer.cmd就開啟了Zookeeper中的服務了。

友情提醒

上面我們配置的Zookeeper在開啟第一個時候回報錯。為什麼呢。原因就是我們開啟了一個服務,。但是我們的配置檔案配置的是叢集的資訊。這個時候就回去尋找其他服務。但是這個時候其他的服務還沒有開啟呢。所以這個錯誤是正常。等我們叢集中的所有的服務都開啟了就不會報錯。這裡大家不要被嚇到。

除此之外,還有一點就是Zookeeper的安裝目錄(解壓目錄)是絕對不能包含漢字的。我上面的截圖有漢字那是我計算機上設定的。實際的路徑是沒有漢字的。不要被上面的圖片誘導。

當所有的服務都開啟了,我們如何檢視我們的服務是否開啟成功呢。這很簡單。我們重新開啟一個新的cmd視窗。直接執行jps就可以看到我們的服務了。QuorumPeerMain就是我們的服務主類

這裡寫圖片描述

Kafka叢集配置

上面我們就完成了Zookeeper的叢集的配置。實際上Kafka中就自帶有Zookeeper的服務。但是為了資料的高可用性。我們最好選擇自己搭建Zookeeper叢集。這也是官網上的建議。

這裡我的Kafka版本選擇的是0.8.1.1。建議單價不要選擇太高的版本。剛出的版本可能有未知的bug。

同樣這裡的叢集就是講Kafka複製多個。這裡我選擇其中一個進行講解。其他的都是一樣的主要就是講埠改掉就行了。

將官網下載的Kafka解壓改名為kafka1(其他的改名數字遞增就行。或者自定義別的名字)。找到config/server.properties檔案。

server.properties修改

同樣的先來了解裡面的引數含義吧

broker.id=1

在kafka這個叢集中的唯一標識,且只能是正整數

port=9091

該服務監聽的埠

host.name=192.168.1.130

broker 繫結的主機名稱(IP) 如果不設定將繫結所有的介面。

advertised.host.name=192.168.1.130

broker服務將通知消費者和生產者 換言之,就是消費者和生產者就是通過這個主機(IP)來進行通訊的。如果沒有設定就預設採用host.name。

num.network.threads=2

broker處理訊息的最大執行緒數,一般情況是CPU的核數

num.io.threads=8

broker處理IO的執行緒數 一般是num.network.threads的兩倍

socket.send.buffer.bytes=1048576

socket傳送的緩衝區。socket調優引數SO_SNDBUFF

socket.receive.buffer.bytes=1048576

socket接收的緩衝區 socket的調優引數SO_RCVBUF

socket.request.max.bytes=104857600

socket請求的最大數量,防止serverOOM。

log.dirs=\logs

kafka資料的存放地址,多個地址的話用逗號隔開。多個目錄分佈在不同的磁碟上可以提高讀寫效能

num.partitions=2

每個tipic的預設分割槽個數,在建立topic時可以重新制定

log.retention.hours=168

資料檔案的保留時間 log.retention.minutes也是一個道理。

log.segment.bytes=536870912

topic中的最大檔案的大小 -1表示沒有檔案大小限制 log.segment.bytes 和log.retention.minutes 任意一個達到要求 都會刪除該檔案 在建立topic時可以重新制定。若沒有.則選取該預設值

log.retention.check.interval.ms=60000

檔案大小檢查的週期時間,是否處罰 log.cleanup.policy中設定的策略

log.cleaner.enable=false

是否開啟日誌清理

zookeeper.connect=192.168.1.130:num1,192.168.1.130:num2,192.168.1.130:num3

上面我們的Zookeeper叢集

zookeeper.connection.timeout.ms=1000000

進群連結時間超時

同樣的我們每次賦值kafka服務我們只需該配置檔案裡的下面兩個屬性就行了。

broker.id  +  port

服務啟動前的命令準備

同樣的我們觀察bin目錄中我們會發現Kafka針對Linux和windows提供了不同的元件。windows的元件放在了windows的資料夾下了。但是我在實際操作中無法使用裡面的命令。報一些錯誤。這裡我的解決辦法是將windows裡的bat全部複製到外面。就是複製到bin目錄下。

這裡寫圖片描述

上圖中指出來的bat原本是在windows檔案中。拷貝到bin目錄之後我們需要修改一下kafka-run-class.bat檔案。因為裡面寫的相對路徑和引入的jar會導致出錯。所以我們將裡面的這段程式碼

set ivyPath=%USERPROFILE%\.ivy2\cache

set snappy=%ivyPath%/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.5.jar
     call :concat %snappy%

set library=%ivyPath%/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar
     call :concat %library%

set compiler=%ivyPath%/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar
     call :concat %compiler%

set log4j=%ivyPath%/log4j/log4j/jars/log4j-1.2.15.jar
     call :concat %log4j%

set slf=%ivyPath%/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar
     call :concat %slf%

set zookeeper=%ivyPath%/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar
     call :concat %zookeeper%

set jopt=%ivyPath%/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar
     call :concat %jopt%

for %%i in (%BASE_DIR%\core\target\scala-2.8.0\*.jar) do (
     call :concat %%i ) for %%i in (%BASE_DIR%\core\lib\*.jar) do (
     call :concat %%i ) for %%i in (%BASE_DIR%\perf\target\scala-2.8.0/kafka*.jar) do (
     call :concat %%i ) 

替換成

for %%i in (%BASE_DIR%\libs\*.jar) do (
     call :concat %%i ) 

我們仔細觀察原來的配置大概意思是引入一些jar包啥的。但是會出現有的時候我們的檔案根本沒有那個jar。但是又引入了。會經常報錯。所以我們改成引入libs下的所有jar.有啥就引入啥。這樣就不會報錯的。

大坑預防

到這裡我原本天真的認為就已經完事了。但是誰知我按照網上的教程繼續的時候就出現如下錯誤

這裡寫圖片描述

首先第一行提示 set JMX_PORT to default value 9999 這個錯誤是因為我沒有設定這個值。這倒是小事。但是後面報說找不到或無法載入主類kafka.Kafka這就讓我費解。在這裡我也是卡了一天了。後來在網上找到了一個方法。我不知道這是不是Kafka的bug。反正用這個方法我是解決了這個錯誤了。 
解決辦法就是將kafka-run-class.bat檔案中

set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp %CLASSPATH% %*

修改為

set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp "%CLASSPATH%" %*

對比我們發現就是將classpath加上雙引號。搞了半天就是系統變數路徑沒有找到的原因。不過這個問題值得引起我們的注意。我們的kafka寄去你的搭建實在Java 的jdk基礎是搭建的。所以前提我們得將jdk等這些配置到環境變數中去。這裡的配置網上搜去吧很多。

服務開啟

到這一步我們離kafka的成功又不遠了。我們新開cmd視窗cd到kafka的bin目錄中。

但是在執行開啟之前我們需要先執行

Set JMX_PORT=19091(每個服務數字不能一樣)

然後在執行

kafka-server-start.bat ..\config\server.properties

建立Topic批處理

官網上是沒有提供windows版本的topic處理程式的。我們需要自己新建一個bat檔案。這個bat檔案的內容填寫如下

kafka-run-class.bat  kafka.admin.TopicCommand  %*

這裡寫圖片描述

訊息處理

有了這個批處理我們就可以通過它實現topic的建立。生產者傳送訊息和消費者的接收訊息

建立Topic

replication-factor:表示該topic需要在不同的broker中儲存
partitions : 對該top的分割槽數量
topic : 該top的名稱。建議指定。否則採用預設
kafka-topics.bat --create --zookeeper 192.168.1.130:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic

檢視Topic

kafka-topics.bat –describe –zookeeper 192.168.1.130:2181 –topic my-replicated-topic

生產topic訊息

kafka-console-producer.bat --broker-list 192.168.1.130:9093 --topic my-replicated-topic

消費topic訊息

kafka-console-consumer.bat --zookeeper 192.168.1.130:2181 --from-beginning --topic my-replicated-topic

最後在接收發送訊息是我們需要重新建立新的cmd視窗。下面看看效果圖。最終實現實時接收訊息

 

更多參考:

https://kafka.apache.org/quickstart