Kafka Cluster Deployment and Configuration Manual

This document describes a complete set of procedures for deploying, configuring, debugging, and load-testing a Kafka cluster.

The deployment is carried out on three host nodes:

server1: 192.168.10.1
server2: 192.168.10.2
server3: 192.168.10.3

1. Installation and configuration of JDK 7u80

rpm -ivh jdk-7u80-linux-x64.rpm

Configure the environment variables:
more /etc/profile
JAVA_HOME=/usr/java/jdk1.7.0_80
PATH=$JAVA_HOME/bin:$PATH:.
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME
export PATH
export CLASSPATH
Note: older JDK versions have known bugs when running Kafka.
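Given that note, it is worth confirming which JDK the shell actually picks up after sourcing the profile; a quick check:

source /etc/profile
java -version
# The output should report java version "1.7.0_80"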
2. Open the following ports to the local subnet in the system iptables firewall

-A INPUT -s 192.168.10.0/24 -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT
-A INPUT -s 192.168.10.0/24 -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT
-A INPUT -s 192.168.10.0/24 -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
-A INPUT -s 192.168.10.0/24 -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT
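After adding the rules, reload the firewall and confirm the ports are open. The exact commands depend on the distribution; a sketch for a CentOS 6-style system, where the rules live in /etc/sysconfig/iptables:

service iptables restart
# List the INPUT chain and check for the four ports
iptables -L INPUT -n | grep -E '2888|3888|9092|2181'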
3. Hostname resolution between the cluster nodes
more /etc/hosts
192.168.10.1    server1
192.168.10.2    server2
192.168.10.3    server3
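A quick sanity check that each node resolves its peers (run from server1; adjust per node):

ping -c 1 server2
ping -c 1 server3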
4. Deploy Kafka 2.11 on the three host nodes as follows
cd /data
unzip kafka_2.11-0.10.0.0.zip
mv kafka_2.11-0.10.0.0/ kafka
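If the archive was downloaded to only one node, one way to distribute it to the others (assuming root SSH access between the nodes) is:

for h in server2 server3; do
    scp /data/kafka_2.11-0.10.0.0.zip $h:/data/
done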
5. Configuring the ZooKeeper cluster

Note: unless a step says which node to configure, every change below must be made on all three host nodes.

Because this ZooKeeper installation serves Kafka exclusively, its data directory is placed under /data/kafka/zookeeper, which keeps later management simple.

mkdir -p /data/kafka/zookeeper
Edit the ZooKeeper configuration file:
cd /data/kafka
vi config/zookeeper.properties

tickTime=2000
dataDir=/data/kafka/zookeeper
clientPort=2181
maxClientCnxns=0
initLimit=15
syncLimit=5
server.1=192.168.10.1:2888:3888
server.2=192.168.10.2:2888:3888
server.3=192.168.10.3:2888:3888
Create the server ID file on each node:

On server1: echo "1" > /data/kafka/zookeeper/myid
On server2: echo "2" > /data/kafka/zookeeper/myid
On server3: echo "3" > /data/kafka/zookeeper/myid

Note: the myid value set here must match the "server.N" index for that host in zookeeper.properties.

Make the management scripts executable:

chmod +x bin/zookeeper-server-start.sh bin/zookeeper-server-stop.sh bin/kafka-run-class.sh

Modify the ZooKeeper startup script as follows, to make it easier to manage:
$ more zookeeper-server-start.sh
#!/bin/bash
#if [ $# -lt 1 ];
#then
#    echo "USAGE: $0 [-daemon] zookeeper.properties"
#    exit 1
#fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
fi
EXTRA_ARGS="-name zookeeper -loggc"
#COMMAND=$1
COMMAND="-daemon"
case $COMMAND in
  -daemon)
     EXTRA_ARGS="-daemon "$EXTRA_ARGS
     shift
     ;;
 *)
     ;;
esac
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "../config/zookeeper.properties"
To start and stop ZooKeeper:
cd /data/kafka/bin
./zookeeper-server-start.sh
./zookeeper-server-stop.sh
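Once ZooKeeper is running on all three nodes, quorum state can be checked with its built-in four-letter commands; a minimal check, assuming nc is installed:

# "imok" means the server is alive; srvr reports whether it is leader or follower
echo ruok | nc 192.168.10.1 2181
echo srvr | nc 192.168.10.1 2181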
6. Configuring the Kafka cluster

Configure /data/kafka/config/server.properties as follows. Across the three host nodes, only the values of the first few parameters differ (see the per-node excerpt after the listing).
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.10.1:9092
port=9092
host.name=192.168.10.1
# The number of threads handling network requests
num.network.threads=8
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# The number of queued requests allowed before blocking the network threads
queued.max.requests=100
# The purge interval (in number of requests) of the fetch request purgatory
fetch.purgatory.purge.interval.requests=200
# The purge interval (in number of requests) of the producer request purgatory
producer.purgatory.purge.interval.requests=200

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/kafka-logs
# The default number of log partitions per topic. 
num.partitions=24
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
num.recovery.threads.per.data.dir=2
# The maximum size of message that the server can receive
message.max.bytes=1000000
# Enable auto creation of topic on the server
auto.create.topics.enable=true
# The interval with which we add an entry to the offset index
log.index.interval.bytes=4096
# The maximum size in bytes of the offset index
log.index.size.max.bytes=10485760
# Allow to delete topics
delete.topic.enable=true
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=20000
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=10000
# The frequency in ms that the log flusher checks whether any log needs to be flushed to disk
log.flush.scheduler.interval.ms=2000
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. 
log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# The maximum time before a new log segment is rolled out (in hours)
log.roll.hours=168
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
zookeeper.connect=192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
# How far a ZK follower can be behind a ZK leader
zookeeper.sync.time.ms=2000

############################# Replication configurations ################
# default replication factors for automatically created topics
default.replication.factor=3
# Number of fetcher threads used to replicate messages from a source broker.
num.replica.fetchers=4
# The number of bytes of messages to attempt to fetch for each partition.
replica.fetch.max.bytes=1048576
# max wait time for each fetcher request issued by follower replicas. 
replica.fetch.wait.max.ms=500
# The frequency with which the high watermark is saved out to disk
replica.high.watermark.checkpoint.interval.ms=5000
# The socket timeout for network requests.
replica.socket.timeout.ms=30000
# The socket receive buffer for network requests
replica.socket.receive.buffer.bytes=65536
# If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr
replica.lag.time.max.ms=10000
# The socket timeout for controller-to-broker channels
controller.socket.timeout.ms=30000
controller.message.queue.size=10
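For reference, since only the node-specific lines differ, server.properties on server2 would begin as follows (server3 is analogous, with broker.id=3 and 192.168.10.3):

broker.id=2
listeners=PLAINTEXT://192.168.10.2:9092
port=9092
host.name=192.168.10.2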

7. Configuring the Kafka producer and consumer

Edit producer.properties under /data/kafka/config:

bootstrap.servers=192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092
producer.type=async
compression.type=snappy

Edit consumer.properties under /data/kafka/config:

zookeeper.connect=192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181
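To confirm the two files are picked up, the console tools can be pointed at them explicitly; a quick check (the topic name is assumed to exist, e.g. the monitoring topic created in section 9 below):

cd /data/kafka/bin
./kafka-console-producer.sh --broker-list 192.168.10.1:9092 --producer.config ../config/producer.properties --topic test-for-sys-monitor
./kafka-console-consumer.sh --zookeeper 192.168.10.1:2181 --consumer.config ../config/consumer.properties --topic test-for-sys-monitor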
8. Configuring start/stop management for the Kafka cluster service

chmod +x bin/kafka-server-start.sh bin/kafka-server-stop.sh

Modify kafka-server-start.sh as follows:
more kafka-server-start.sh
#!/bin/bash
#if [ $# -lt 1 ];
#then
#    echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
#    exit 1
#fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx5G -Xms5G"
fi

EXTRA_ARGS="-name kafkaServer -loggc"

#COMMAND=$1
COMMAND="-daemon"
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "../config/server.properties"
To start and stop the Kafka service:

cd /data/kafka/bin
./kafka-server-start.sh
./kafka-server-stop.sh

Note: watch and check the log files under /data/kafka/logs to confirm that there are no errors and that each service is logging correctly.

9. Common administration commands for the Kafka cluster

It is recommended to set aside a separate standby machine as a long-term monitoring and test host for the Kafka cluster, with the programs deployed under /data/kafka.

Create the following topic for service and performance monitoring:

./kafka-topics.sh --create --zookeeper 192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181 --replication-factor 3 --partitions 6 --topic test-for-sys-monitor

List the topics that have been created:

./kafka-topics.sh --list --zookeeper 192.168.10.1:2181

Show the details of a given topic:

./kafka-topics.sh --describe --zookeeper 192.168.10.1:2181 --topic test-for-sys-monitor

Test with the console producer:

./kafka-console-producer.sh --broker-list 192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092 --topic test-for-sys-monitor

Test with the console consumer:

./kafka-console-consumer.sh --zookeeper 192.168.10.1:2181 --topic test-for-sys-monitor

Note: in this test, data typed on the producer side is printed on the consumer side.

Delete a topic:

./kafka-topics.sh --delete --zookeeper 192.168.10.1:2181 --topic test-for-sys-monitor

Note: topic deletion has been enabled in this cluster (delete.topic.enable=true); use it with care.

Write load test, 5 million records of 1 KB each (the record count below matches the tool output that follows):

./kafka-producer-perf-test.sh --topic test-perf-20161220 --num-records 5000000 --record-size 1000 --throughput 100000 --producer-props bootstrap.servers=192.168.10.1:9092,192.168.10.2:9092,192.168.10.3:9092

5000000 records sent, 53149.648149 records/sec (50.69 MB/sec), 569.30 ms avg latency, 2096.00 ms max latency, 8 ms 50th, 1759 ms 95th, 1874 ms 99th, 2044 ms 99.9th.

Read load test, 6 threads x 1 million records:

./kafka-consumer-perf-test.sh --zookeeper 192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181 --messages 1000000 --topic test-perf-20161220 --threads 6

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
2016-12-20 16:08:50:102, 2016-12-20 16:09:48:525, 5722.0459, 97.9417, 6000000, 102699.2794

Note: delete the load-test topic promptly after testing, because the Kafka log data it generates is substantial (a cleanup sketch follows the ZooKeeper session below).

Log into the ZooKeeper shell to inspect the metadata Kafka has created:
./zookeeper-shell.sh 192.168.10.3:2181
Connecting to 192.168.10.3:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /
     [consumers, config, controller, isr_change_notification, brokers, admin, zookeeper, controller_epoch]
get /brokers/ids/1
{"jmx_port":-1,"timestamp":"1482223274389","endpoints":["PLAINTEXT://192.168.10.1:9092"],"host":"192.168.10.1","version":3,"port":9092}
cZxid = 0x900000009
ctime = Tue Dec 20 16:41:14 CST 2016
mZxid = 0x900000009
mtime = Tue Dec 20 16:41:14 CST 2016
pZxid = 0x900000009
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1591b61880f0000
dataLength = 137
numChildren = 0
get /brokers/ids/2
{"jmx_port":-1,"timestamp":"1482223307625","endpoints":["PLAINTEXT://192.168.10.2:9092"],"host":"192.168.10.2","version":3,"port":9092}
cZxid = 0x900000013
ctime = Tue Dec 20 16:41:47 CST 2016
mZxid = 0x900000013
mtime = Tue Dec 20 16:41:47 CST 2016
pZxid = 0x900000013
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1591b61880f0001
dataLength = 137
numChildren = 0
get /brokers/ids/3
{"jmx_port":-1,"timestamp":"1482223315746","endpoints":["PLAINTEXT://192.168.10.3:9092"],"host":"192.168.10.3","version":3,"port":9092}
cZxid = 0x900000020
ctime = Tue Dec 20 16:41:55 CST 2016
mZxid = 0x900000020
mtime = Tue Dec 20 16:41:55 CST 2016
pZxid = 0x900000020
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3591b618ffe0001
dataLength = 137
numChildren = 0
quit
     Quitting...
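As noted above, load-test topics should be removed promptly once testing is done; a minimal cleanup sketch for the topic used in the tests above:

cd /data/kafka/bin
./kafka-topics.sh --delete --zookeeper 192.168.10.1:2181 --topic test-perf-20161220
# Confirm the topic is gone (or marked for deletion)
./kafka-topics.sh --list --zookeeper 192.168.10.1:2181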
Change the number of partitions of a topic:

./kafka-topics.sh --alter --zookeeper 192.168.10.1:2181 --topic test_topic --partitions 4

Add, change, or delete a topic's configuration parameters:

./kafka-topics.sh --alter --zookeeper 192.168.10.1:2181 --topic test_topic --config key=value
./kafka-topics.sh --alter --zookeeper 192.168.10.1:2181 --topic test_topic --deleteConfig key

Check a topic's partition and replica state; pay attention to whether the load (i.e. the Leader role) is evenly distributed across the partitions:
./kafka-topics.sh --describe --zookeeper 192.168.10.1:2181 --topic test-for-sys-monitor
Topic:test-for-sys-monitor    PartitionCount:6    ReplicationFactor:3    Configs:
    Topic: test-for-sys-monitor    Partition: 0    Leader: 1    Replicas: 1,3,2    Isr: 1,3,2
    Topic: test-for-sys-monitor    Partition: 1    Leader: 2    Replicas: 2,1,3    Isr: 2,1,3
    Topic: test-for-sys-monitor    Partition: 2    Leader: 3    Replicas: 3,2,1    Isr: 3,2,1
    Topic: test-for-sys-monitor    Partition: 3    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: test-for-sys-monitor    Partition: 4    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: test-for-sys-monitor    Partition: 5    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
Trigger a rebalance of the leader distribution:

./kafka-preferred-replica-election.sh --zookeeper 192.168.10.1:2181

Note: Kafka does rebalance leader distribution on its own, but not immediately after a problem is detected; expect a delay of about half an hour.

Note: many configuration parameters need not be set unless you clearly understand what they do. Kafka provides default values for all of them, and those defaults are already a fairly good choice.
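To spot partitions whose followers have fallen out of sync, kafka-topics.sh can filter for problem partitions directly; a quick check:

# Lists only partitions whose ISR is smaller than the replica set
./kafka-topics.sh --describe --zookeeper 192.168.10.1:2181 --under-replicated-partitions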
