1. 程式人生 > >CentOS 7搭建Zookeeper和Kafka叢集

CentOS 7搭建Zookeeper和Kafka叢集

# 環境 * CentOS 7.4 * Zookeeper-3.6.1 * Kafka_2.13-2.4.1 * Kafka-manager-2.0.0.2 本次安裝的軟體全部在 `/home/javateam` 目錄下。 # Zookeeper 叢集搭建 1. 新增三臺機器的 `hosts`,使用 `vim /etc/hosts` 命令新增以下內容: ```shell 192.168.30.78 node-78 192.168.30.79 node-79 192.168.30.80 node-80 ``` 2. 首先解壓縮: ```shell tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz ``` 修改資料夾名稱: ```shell mv apache-zookeeper-3.6.1-bin.tar.gz zookeeper ``` 4. 向 `/etc/profile` 配置檔案新增以下內容,並執行`source /etc/profile`命令使配置生效: ```shell export ZOOKEEPER_HOME=/home/javateam/zookeeper export PATH=$PATH:$ZOOKEEPER_HOME/bin ``` 5. 在上面配置檔案中 `dataDir` 的目錄下建立一個 `myid` 檔案,並寫入一個數值,比如0。`myid` 檔案裡存放的是伺服器的編號。 6. 修改zookeeper配置檔案。首先進入 `$ZOOKEEPER_HOME/conf` 目錄,複製一份 `zoo_sample.cfg` 並將名稱修改為 `zoo.cfg`: ```shell # zookeeper伺服器心跳時間,單位為ms tickTime=2000 # 投票選舉新leader的初始化時間 initLimit=10 # leader與follower心跳檢測最大容忍時間,響應超過 syncLimit * tickTime,leader認為follower死掉,從伺服器列表刪除follower syncLimit=5 # 資料目錄 dataDir=/home/javateam/zookeeper/data/ # 日誌目錄 dataLogDir=/home/javateam/zookeeper/logs/ # 對外服務的埠 clientPort=2181 # 叢集ip配置 server.78=node-78:2888:3888 server.79=node-79:2888:3888 server.80=node-80:2888:3888 ``` > 注意: 上面配置檔案中的資料目錄和日誌目錄需自行去建立對應的資料夾。這裡server後的數字,與myid檔案中的id是一致的。 7. zookeeper啟動會佔用三個埠,分別的作用是: ```shell 2181:對cline端提供服務 3888:選舉leader使用 2888:叢集內機器通訊使用(Leader監聽此埠) ``` 記得使用以下命令開啟防火牆埠,並重啟防火牆: ```shell firewall-cmd --zone=public --add-port=2181/tcp --permanent firewall-cmd --zone=public --add-port=3888/tcp --permanent firewall-cmd --zone=public --add-port=2888/tcp --permanent firewall-cmd --reload ``` 8. 然後用 `zkServer.sh start` 分別啟動三臺機器上的zookeeper,啟動後用 `zkServer.sh status` 檢視狀態,如下圖所以有一個leader兩個follower即代表成功: ![WeChatfc81462212a55ba049b1afa06c9fabef.png](http://ww1.sinaimg.cn/large/006Vpl27ly1geqwzuvdfsj30n006odhf.jpg) ![WeChat4d789c67638c5fc635fb2cc149d218c2.png](http://ww1.sinaimg.cn/large/006Vpl27ly1geqx0hkdzrj30my06g75v.jpg) ![WeChatbfe38654fcccfcb45aba87c0fd2a4d58.png](http://ww1.sinaimg.cn/large/006Vpl27ly1geqx11miguj30mw06o0uc.jpg) # Kafka 叢集搭建 1. 首先解壓縮: ```shell tar -zxvf kafka_2.13-2.4.1.tgz ``` 2. 改資料夾名稱: ```shell mv kafka_2.13-2.4.1.tgz kafka ``` 3. 向 `/etc/profile` 配置檔案新增以下內容,並執行`source /etc/profile`命令使配置生效: ```shell export KAFKA_HOME=/home/javateam/kafka export PATH=$PATH:$KAFKA_HOME/bin ``` 4. JVM級別引數調優,修改 `kafka/bin/kafka-server-start.sh`,新增以下內容: ```shell # 調整堆大小,預設1G太小了 export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G" # 選用G1垃圾收集器 export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true" # 指定JMX暴露埠 export JMX_PORT="8999" ``` 新增後,檔案內容如下圖所示: ![WeChat9d9de8b4d35d8483d8920db2bd98f524.png](http://ww1.sinaimg.cn/large/006Vpl27ly1geqx9qzyzdj31bi0wihdt.jpg) 5. 作業系統級別引數調優,增加檔案描述符的限制,使用 `vim /etc/security/limits.conf` 新增以下內容: ```shell * soft nofile 100000 * hard nofile 100000 * soft nproc 65535 * hard nproc 65535 ``` 6. 修改kafka的配置檔案 `$KAFKA_HOME/conf/server.properties`,如下: ```shell ############################# Server Basics ############################# # 每一個broker在叢集中的唯一標示,要求是正數。在改變IP地址,不改變broker.id的話不會影響consumers broker.id=78 ############################# Socket Server Settings ############################# # 提供給客戶端響應的地址和埠 listeners=PLAINTEXT://node-78:9092 # broker 處理訊息的最大執行緒數 num.network.threads=3 # broker處理磁碟IO的執行緒數 ,數值應該大於你的硬碟數 num.io.threads=8 # socket的傳送緩衝區大小 socket.send.buffer.bytes=102400 # socket的接收緩衝區,socket的調優引數SO_SNDBUFF socket.receive.buffer.bytes=102400 # socket請求的最大數值,防止serverOOM,message.max.bytes必然要小於socket.request.max.bytes,會被topic建立時的指定引數覆蓋 socket.request.max.bytes=104857600 ############################# Log Basics ############################# # kafka資料的存放地址,多個地址的話用逗號分割 log.dirs=/home/javateam/kafka/logs # 每個topic的分割槽個數,若是在topic建立時候沒有指定的話會被topic建立時的指定引數覆蓋 num.partitions=3 # 每個分割槽的副本數 replication.factor=2 # 我們知道segment檔案預設會被保留7天的時間,超時的話就會被清理,那麼清理這件事情就需要有一些執行緒來做。這裡就是用來設定恢復和清理data下資料的執行緒數量 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # 控制一條訊息資料被儲存多長時間,預設是7天 log.retention.hours=168 # 指定Broker為訊息儲存的總磁碟容量大小,-1代表不限制 log.retention.bytes=-1 # Broker能處理的最大訊息大小,預設976KB(1000012),此處改為100MB message.max.bytes=104857600 # 日誌檔案中每個segment的大小,預設為1G log.segment.bytes=1073741824 #上面的引數設定了每一個segment檔案的大小是1G,那麼就需要有一個東西去定期檢查segment檔案有沒有達到1G,多長時間去檢查一次,就需要設定一個週期性檢查檔案大小的時間(單位是毫秒)。 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # 消費者叢集通過連線Zookeeper來找到broker。zookeeper連線伺服器地址 zookeeper.connect=node-78:2181,node-79:2181,node-80:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 ############################# Broker Settings ############################# # 不讓落後太多的副本競選Leader unclean.leader.election.enable=false # 關閉kafka定期對一些topic分割槽進行Leader重選舉 auto.leader.rebalance.enable=false ``` 7. 編寫kafka啟動指令碼,`vim startup.sh` 內容如下所示: ```shell # 程序守護模式啟動kafka kafka-server-start.sh -daemon /home/javateam/kafka/config/server.properties ``` 8. 編寫kafka停止指令碼,`vim shutdown.sh` 內容如下所示: ```shell # 停止kafka服務 kafka-server-stop.sh ``` 9. 用如下命令,分別啟動kafka服務: ```shell sh /home/javateam/kafka/startup.sh ``` > 注意:後面的路徑換成你自己指令碼所在的路徑。 10. 啟動成功後,連線zookeeper檢視節點 `ids` 資訊: ```shell zkCli.sh -server 127.0.0.1:2181 ls /brokers/ids ``` 如下圖所示,代表叢集搭建成功: ![WeChatc5943d73fa0fd92c1a66962147af7afd.png](http://ww1.sinaimg.cn/large/006Vpl27ly1geqy2tkbixj30ju020wev.jpg) # Kafka-manager 搭建 1. 首先解壓縮: ```shell unzip kafka-manager-2.0.0.2.zip ``` 2. 改資料夾名稱 ```shell mv kafka-manager-2.0.0.2.zip kafka-manager ``` 3. 修改配置檔案 `kafka-manager/conf/application.conf`,把裡面的 `kafka-manager.zkhosts` 換成你自己的zookeeper 叢集地址就好了,例如:`kafka-manager.zkhosts="node-78:2181,node-79:2181,node-80:2181"` 4. 編寫 kafka-manager 啟動指令碼,`vim startup.sh` 內容如下: ```shell nohup /home/javateam/kafka-manager/bin/kafka-manager -Dhttp.port=9000 > /home/javateam/kafka-manager/nohup.out 2>&1 & ``` 5. 使用 `sh /home/javateam/kafka-manager/startup.sh` 啟動 kafka-manager,然後訪問9000埠,如下圖所示代表成功: ![WeChatcf5d42bbb563a7b23864ed6755ce9c0d.png](http://ww1.sinaimg.cn/large/006Vpl27ly1geqyb0c4c5j31ng0jg775.jpg) 不知道怎麼使用的話就去 google,這裡不再