Kafka 0.10 安裝及使用
1 總體說明
筆記本:i5第六代,16G記憶體,256G固態硬碟
使用VirtualBox 5.0.22建立3臺虛擬機器。
2 主機規化
主機名 |
IP |
用途 |
master |
192.168.56.101 |
|
slave1 |
192.168.56.102 |
|
slave2 |
192.168.56.103 |
3 目錄規化
元件 |
目錄 |
說明 |
JDK |
/usr/java/jdk1.8.0_92 |
ln -s /usr/java/jdk1.8.0_92 /usr/java/default |
zookeeper |
/opt/zookeeper |
|
kafka |
/opt/kafka |
|
各輸出目錄的根 |
/var/kafka/ /var/zookeeper/ |
4 埠規化
埠 |
說明 |
9092 |
|
2888 |
ZooKeeper,如果是Leader,用來監聽Follower的連線 |
3888 |
ZooKeeper,用於Leader選舉 |
2181 |
ZooKeeper,用來監聽客戶端的連線 |
5 作業系統配置
5.1 OS安裝
l 使用CentOS 6.5版
l 磁碟劃分:
/boot 500MB ext4 boot partition # 迫使主分割槽
swap 2GB swap # 與實體記憶體一樣大
/ 剩餘空間 ext4 # 迫使主分割槽
/data1 sda所有空間 ext4
/data2 sdb所有空間 ext4
l 安裝軟體:sysstat , httpd, tftp-server, ntp
啟動sysstat :/etc/init.d/sysstat start
設定sysstat自啟動:checkfig sysstat on
l 安裝pssh:
tar zxf pssh-2.3.1.tar.gz
cd pssh-2.3.1
python setup.py install
pssh 多主機並行執行命令
pscp 傳輸檔案到多個hosts,他的特性和scp差不多
pslurp 從多臺遠端機器拷貝檔案
pnuke kill
pslurp 從遠端主機考本檔案到本地
prsync 使用rsync協議從本地計算機同步到遠端主機
5.2 配置SSH免密登陸
原理:就是我把我的公鑰放到你的authorized_keys裡面,然後我就可以ssh無密碼登入你了
5.2.1 配置規則
u NameNode 能免密碼登入所有的 DataNode
u SecondaryNameNode 能免密碼登入所有的 DataNode
u NameNode 能免密碼登入自己
u SecondaryNameNode 能免密碼登入自己
u NameNode 能免密碼登入 SecondaryNameNode
u SecondaryNameNode 能免密碼登入 NameNode
u DataNode 能免密碼登入自己
u DataNode 不需要配置免密碼登入 NameNode、SecondaryNameNode和其它 DataNode。
5.2.2 配置步驟
l 在master(NameNode)上執行:
cd ~
ssh-keygen -t rsa # 一路回車。
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh localhost # 驗證本機無密碼登陸
ssh master # 驗證本機無密碼登陸
for ip in `seq 1 2`; do scp ~/.ssh/[email protected]$ip:~/keys.master; done
l 在slave1(SecondaryNameNode, DataNode)上執行:
cd ~
ssh-keygen -t rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh localhost # 驗證本機無密碼登陸
ssh slave1 # 驗證本機無密碼登陸
cat ~/keys.master >> ~/.ssh/authorized_keys # 實現master免密碼登陸slave1
到master主機上執行:cat ~/keys.slave1 >> ~/.ssh/authorized_keys
l 在slave2(DataNode)上執行:
cd ~
ssh-keygen -t rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
ssh localhost # 驗證本機無密碼登陸
ssh slave2 # 驗證本機無密碼登陸
cat ~/keys.master >> ~/.ssh/authorized_keys
cat ~/keys.slave1 >> ~/.ssh/authorized_keys
l 免密碼登陸驗證
master(NN)登入所有節點
slave1(SNN)登入所有的 DataNode(slave2)
slave1(SNN)登入NameNode(master)
5.3 系統配置
每臺主機執行以下操作:
1. 修改hosts,增加各個主機
echo "# cluster hosts" >> /etc/hosts
echo "192.168.56.101 master" >>/etc/hosts
echo "192.168.56.102 slave1" >>/etc/hosts
echo "192.168.56.103 slave2" >>/etc/hosts
cat /etc/hosts
for ip in `seq 1 2`; do scp /etc/[email protected]$ip:/etc; done
2. 定義其他節點主機列表檔案
cd ~
vi hdp-other-hosts
slave1
slave2
該檔案是為了使用pssh進行批量操作用。檔案內容為每行一個主機名字。
之後,可使用類似如下命令從master主機上批量在其他主機上執行命令:
例如:pssh -h hdp-other-hosts "mkdir -p /hadoop /test"
3. 修改OS引數,如關閉防火牆,增加開啟檔案數等
l 修改主機名
cat /etc/sysconfig/network #看看主機名字是否正確,不對則修改
service network restart
l # 關閉iptables.
service iptables stop
service iptables status
chkconfig iptables off
chkconfig --list | grep iptables
pssh -h hdp-other-hosts "service iptables stop;chkconfig iptables off"
# 兩個命令中間用分號;以實現連續執行,即使有錯也會繼續。
# 如果每個命令被 && 號分隔,那麼這些命令會一直執行下去,如果中間有錯誤的命令存在,則不再執行後面的命令。
l # 關閉selinux.
vi /etc/selinux/config
SELINUX=disabled
l # 關閉透明大頁
vi /etc/rc.local
if test -f /sys/kernel/mm/transparent_hugepage/enabled;then
echo never> /sys/kernel/mm/transparent_hugepage/enabled
fi
if test -f /sys/kernel/mm/transparent_hugepage/defrag;then
echo never> /sys/kernel/mm/transparent_hugepage/defrag
fi
l # 讓機器儘量使用實體記憶體。如果設定成0,則意味著只使用實體記憶體。
echo "vm.swappiness=10" >>/etc/sysctl.conf
l # 最大開啟檔案資料和最大程序數 limits.conf 每臺機器都修改
vi /etc/security/limits.conf # 新增下面的內容
* - nofile 65535
* - nproc 65535
l 每臺機器重啟:reboot
l 檢查修改效果:
/usr/sbin/sestatus -v # 如果SELinux status引數為enabled即為開啟狀態
或者使用:getenforce,也可檢查SELinux的狀態
# 如果輸出結果為[always]表示透明大頁啟用了。[never]表示禁用、[madvise]表示
cat /sys/kernel/mm/transparent_hugepage/enabled, defrag
cat /proc/sys/vm/swappiness #應該是10
ulimit -a #應該是65535
5.4 NTP配置
l NTP伺服器(master)配置
vi /etc/ntp.conf
server 127.127.1.0
fudge 127.127.1.0 stratum 10
restrict 192.168.56.0 255.255.255.0 nomodifynotrap
chkconfig ntpd on
service ntpd start
ntpstat # show : synchronised to local net at stratum 11
l 其他主機配置
vi /etc/ntp.conf
server master
service ntpd start
chkconfig ntpd on
crontab -e
*/10 * * * * /usr/sbin/ntpdate -u master&>/var/log/ntpdate-cron.log
# 每10分鐘同步一次,'&'開始是為了除錯用
10分鐘後,各主機上執行:ntpstat,看是否同步了
如果機器重啟了,slave上執行ntpstat,顯示未同步,執行一次:service ntpd restart,再執行ntpstat,就顯示同步了。不知道這個怎麼搞好
6 JAVA安裝
l 在master上安裝JAVA
rpm -ivh jdk8.rpm
echo "export JAVA_HOME=/usr/java/default">> /etc/profile
echo "exportCLASSPATH=.:$JAVA_HOME/lib/tools.jar" >> /etc/profile
echo "export PATH=$JAVA_HOME/bin:$PATH">> /etc/profile
source /etc/profile
gtar cf java.gz /usr/java/
for ip in `seq 1 2`; do scp java.gz [email protected]$ip:/usr;done
for ip in `seq 1 2`; do scp /etc/[email protected]$ip:/etc; done
rm java.gz
java -version # 檢驗
l 在其他各主機上安裝JAVA
cd /usr
gtar xf java.gz
rm java.gz
java -version # 檢驗
7 Kafka安裝
7.1 安裝zookeeper
1. tar zxvfzookeeper-3.4.8.tar.gz
2. cpzookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg
3. 編輯zoo.cfg
ü 修改:dataDir=/var/zookeeper/datadir
ü 新增:dataLogDir=/var/zookeeper/logsdir
ü 新增:
server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888
4. 分發到其他節點
tar cf zookeeper-done.tar zookeeper-3.4.8
for ip in `seq 1 2`; do scp /hadoop/[email protected]$ip:/hadoop; done
分別解壓縮即可。
5. 在各節點上建立ZK的目錄:資料檔案和日誌存放目錄
1) mkdir -p/var/zookeeper/datadir /var/zookeeper/logsdir
2) pssh -h~/hdp-other-hosts "mkdir -p /var/zookeeper/datadir /var/zookeeper/logsdir"
6. 編輯各節點的myid值
echo 1 > /var/zookeeper/datadir/myid
pssh -H [email protected] "echo 2 > /var/zookeeper/datadir/myid"
pssh -H [email protected] "echo 3 > /var/zookeeper/datadir/myid"
或者使用下面迴圈代替上面的兩句:
for ip in `seq 1 2`; do pssh -H [email protected]$ip "echo$[$ip + 1] > /var/zookeeper/datadir/myid"; done
7. 啟動
zkServer.sh
start :這個命令使得zk服務程序在後臺進行
zkServer.sh start-foreground :在前臺中執行
zkServer.sh print-cmd :可以檢視zookeeper啟動的各個引數,包括java路徑等,也可以便於查詢問題。
執行日誌在zookeeper安裝目錄下的zookeeper.out。另外要注意的是,zookeeper重啟會自動清除zookeeper.out日誌,所以如果出錯要注意先備份這個檔案。
看了下zkServer.sh的程式碼,這個zookeeper.out實際上是nohup的輸出。
研究了下bin/zkServer.sh和conf/log4j.properties,發現zookeeper其實是有日誌相關的輸出的配置,只要定義相關的變數就可以了。
主要是ZOO_LOG_DIR和ZOO_LOG4J_PROP這兩個環境變數:
如果是連線同一臺主機上的zk程序,那麼直接執行bin/目錄下的zkCli.cmd(Windows環境下)或者zkCli.sh(Linux環境下),即可連線上zk。
直接執行zkCli.cmd或者zkCli.sh命令預設以主機號 127.0.0.1,埠號 2181 來連線zk,如果要連線不同機器上的zk,可以使用 -server 引數,例如:
zkCli.sh -server 192.168.229.160:2181,192.168.229.161:2181,192.168.229.162:2181
7.2 安裝Kafka
1. 解壓tar
2. 修改配置檔案:config/server.properties
ü broker.id=1 (另外幾臺機器依次設定為2,3,)
ü log.dirs=/var/kafka/kfklogs (日誌地址,kafka的topic以及資料檔案存放位置)
ü zookeeper.connect=master:2181,slave1:2181,slave2:2181
ü listeners = PLAINTEXT://ip:9092 :監聽列表(以逗號分隔)。hostname如果設定為0.0.0.0則繫結所有的網絡卡地址;如果hostname為空則繫結預設的網絡卡。如果沒有配置則預設為java.net.InetAddress.getCanonicalHostName()
ü auto.create.topics.enable=false 免得程式中寫錯了topic名字時被自動建立了
ü delete.topic.enable 可以物理上刪除一個topic
注意:broker級的配置引數(也就是server.properties),可以由topic級別的覆寫。
n server.properties引數說明
############################# Server Basics#############################
# 唯一標識一個broker.
broker.id=1
############################# Socket Server Settings#############################
#繫結服務監聽的地址和埠,要填寫hostname -i 出來的地址,否則可能會繫結到127.0.0.1,producer可能會發不出訊息
listeners=PLAINTEXT://172.23.8.144:9092
#broker對producers和consumers服務的地址和埠,如果沒有配置,使用listeners的配置,本文沒有配置該項
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 處理網路請求的執行緒數
num.network.threads=3
# 處理磁碟I/O的執行緒數
num.io.threads=8
# socket server的傳送buffer大小 (SO_SNDBUF)
socket.send.buffer.bytes=102400
# socket server的接收buffer大小 (SO_RCVBUF)
socket.receive.buffer.bytes=102400
#一個請求的最大size,用來保護防止oom
socket.request.max.bytes=104857600
############################# Log Basics#############################
#存放日誌和訊息的目錄,可以是用逗號分開的目錄,同樣不推薦使用/tmp
log.dirs=/usr/local/services/kafka/kafka-logs
#每個topic預設partitions的數量,數量較大表示消費者可以有更大的並行度。
num.partitions=2
# The number of threads per data directory to be usedfor log recovery at startup and flushing at shutdown.
# This value is recommended to be increased forinstallations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
#日誌的過期時間,超過後被刪除,單位小時
log.retention.hours=168
#一個日誌檔案最大大小,超過會新建一個檔案
log.segment.bytes=1073741824
#根據過期策略檢查過期檔案的時間間隔,單位毫秒
log.retention.check.interval.ms=300000
############################# Zookeeper#############################
#Zookeeper的連線配置,用逗號隔開,也可以用172.23.8.59:2181/kakfa這樣的方式指定kafka資料在zk中的根目錄
zookeeper.connect=172.23.8.144:2181,172.23.8.179:2181,172.23.8.59:2181
# 連線zk的超時時間
zookeeper.connection.timeout.ms=6000
7.3 啟動kafka
先啟動各臺機器上的zookeeper:bin/zkServer.sh start
日誌就在zookeeper的根目錄下的zoo.out
然後在每臺機器上都啟動Kafka:
nohup bin/kafka-server-start.sh config/server.properties> kfk.out &
tail -f -n500 kfk.out
停止kafka,在每臺機器上執行:bin/kafka-server-stop.sh
7.4 單機偽叢集
因為server.properties就是一個broker的配置,所以,複製幾份不同名字的server.properties,並修改broker.id,listeners,port,log.dirs,即可實現單機偽叢集。
注意:listeners中的埠號必須與port的值一致。
然後,在單機上,用“bin/kafka-server-start.sh每個server.properties”,依次啟動即可。
7.5 使用Kafka
l 建立topic
bin/kafka-topics.sh--create --zookeeper localhost:2181 --replication-factor 2 --partitions 6--topic test01
u partitions:表示建立了一個有6個分割槽的topic
如果有三臺機器(Broker)的話,每臺機器上會隨機兩個目錄:如test01-1,test01-3。
u replication-factor:表示該topic需要在不同的broker中儲存幾份
以上設定為2,因此,某兩個broker上會有相同的兩個目錄:test01-1(其中一個是備份)。
所以,以上命令,由於是6個分割槽、2個備份、3臺機器,所以,每臺機器上有4個目錄。
u 建立topic引數可以設定一個或多個--config "Property(屬性)"
bin/kafka-topics.sh –create ... --configmax.message.bytes=64000 --config flush.messages=1
l 修改topic
使用—alter替換—create,即可修改。
使用--alter--topic my-topic --deleteConfigmax.message.bytes,即可刪除某個引數。
l 檢視topic
看列表:bin/kafka-topics.sh--list --zookeeper localhost:2181
看屬性:bin/kafka-topics.sh--describe --zookeeper localhost:2181 --topic test01
n Leader: 如果有多個broker儲存同一個topic,那麼同時只能有一個Broker負責該topic的讀寫,其它的Broker作為實時備份。負責讀寫的Broker稱為Leader.
每個Replication集合中的Partition都會選出一個唯一的Leader,所有的讀寫請求都由Leader處理。其他Replicas從Leader處把資料更新同步到本地。
n Replicas : 表示該topic的0分割槽在1號和2號broker中儲存
n Isr : 表示當前有效的broker, Isr是Replicas的子集
現在,殺掉一個broker(模擬此點的崩潰):kill -9 PID即可。再看屬性:
會發現Leader已經進行了切換,而且,當前有效的broker中,3已經不存在了。
仔細看前後兩張圖,可以知道,因為3沒了,所以,原來Leader為3的分割槽,就使用了Replicas中的備選,所以,分割槽1的Leader由3變成了2,而分割槽4的Leader由3變成了1。
l 傳送一個訊息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test01
每輸入一行,就是一條訊息
l 消費訊息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 --topic test01 --from-beginning
任意一臺機器上執行以上命令,即可看到訊息。
l 批量構造大量訊息
bin/kafka-verifiable-producer.sh --topic test01 --max-messages 20 --broker-list localhost:9092
l 刪除Topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test01
如果kafaka啟動時載入的配置檔案中server.properties沒有配置delete.topic.enable=true,那麼此時的刪除並不是真正的刪除,而是把topic標記為:marked for deletion。
刪除kafka儲存目錄(server.properties檔案log.dirs配置)相關topic目錄。
7.6 Kafka注意事項
listeners一定要配置成為IP地址;如果配置為localhost或伺服器的hostname,在使用Java傳送資料時就會丟擲異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因為在沒有配置advertised.host.name 的情況下,Kafka並沒有像官方文件宣稱的那樣改為廣播我們配置的host.name,而是廣播了主機配置的hostname。遠端的客戶端並沒有配置 hosts,所以自然是連線不上這個hostname的。
消費者執行緒數必須是小等於topic的partition分割槽數;可以通過命令:
./kafka-topics.sh --describe --zookeeper"172.16.49.173:2181" --topic "producer_test"命令來檢視分割槽的情況。
kafka會根據partition.assignment.strategy指定的分配策略來指定執行緒消費那些分割槽的訊息;沒有單獨配置該項即是採用的預設值range策略(按照階段平均分配)。比如分割槽有10個、執行緒數有3個,則執行緒 1消費0,1,2,3,執行緒2消費4,5,6,執行緒3消費7,8,9。另外一種是roundrobin(迴圈分配策略),官方文件中寫有使用該策略有兩個前提條件的,所以一般不要去設定。
props.put(“auto.offset.reset”, “smallest”) 是指定從最小沒有被消費offset開始;如果沒有指定該項則是預設的為largest,這樣的話該consumer就得不到生產者先產生的訊息。
使用New Consumer API
Properties props = new Properties(); //brokerServer(kafka)ip地址,不需要把所有叢集中的地址都寫上,可是一個或一部分 props.put("bootstrap.servers", "172.16.49.173:9092"); //設定consumer group name,必須設定 props.put("group.id", a_groupId); //設定自動提交偏移量(offset),由auto.commit.interval.ms控制提交頻率 props.put("enable.auto.commit", "true"); //偏移量(offset)提交頻率 props.put("auto.commit.interval.ms", "1000"); //設定使用最開始的offset偏移量為該group.id的最早。如果不設定,則會是latest即該topic最新一個訊息的offset //如果採用latest,消費者只能得道其啟動後,生產者生產的訊息 props.put("auto.offset.reset", "earliest"); //設定心跳時間 props.put("session.timeout.ms", "30000"); //設定key以及value的解析(反序列)類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //訂閱topic consumer.subscribe(Arrays.asList("topic_test")); while (true) { //每次取100條資訊 ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } |
group.id :必須設定
auto.offset.reset:如果想獲得消費者啟動前生產者生產的訊息,則必須設定為earliest;如果只需要獲得消費者啟動後生產者生產的訊息,則不需要設定該項
enable.auto.commit(預設值為true):如果使用手動commit offset則需要設定為false,並再適當的地方呼叫consumer.commitSync(),否則每次啟動消費折後都會從頭開始消費資訊(在auto.offset.reset=earliest的情況下);
官方對於consumer與partition的建議
1. 如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許併發的,所以consumer數不要大於partition數
2. 如果consumer比partition少,一個consumer會對應於多個partitions,這裡主要合理分配consumer數和partition數,否則會導致partition裡面的資料被取的不均勻。最好partiton數目是consumer數目的整數倍,所以partition數目很重要,比如取24,就很容易設定consumer數目
3. 如果consumer從多個partition讀到資料,不保證資料間的順序性,kafka只保證在一個partition上資料是有序的,但多個partition,根據你讀的順序會有不同
4. 增減consumer,broker,partition會導致rebalance,所以rebalance後consumer對應的partition會發生變化
5. High-level介面中獲取不到資料的時候是會block的
replication-factor副本:replication factor 控制訊息儲存在幾個broker(伺服器)上,一般情況下等於broker的個數。
檢視topic屬性:bin/kafka-topics.sh --zookeeper zk1:2181 --describe --topictopicname
8 Kafka開發
8.1 Producer
KafkaProducer是一個傳送record到Kafka Cluster的客戶端API。這個類執行緒安全的。在應用程式中,通常的作法是:所有發往一個KafkaCluster的執行緒使用同一個Producer物件.。如果需要給多個Cluster傳送訊息,則需要使用多個Producer。
樣例程式碼:
Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.56.101:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); ProducerRecord<String, String> pducerRecord = new ProducerRecord<String, String>(TOPIC, message); producer.send(pducerRecord, new CallBack(){ ... ... }); |
8.1.1 ProducerRecord
Producer要傳送的訊息記錄類是ProducerRecord。看它的原始碼可知;
一條ProducerRecord通常包括5個欄位:
l topic:指定該record發往哪個topic下。[Required]
l partition:指定該record發到哪個partition中。[Optional]
l key:一個key。[Optional]
l value:記錄人內容。[Required]
l timestamp:時間戳。[Optional]
如果使用者指定了partition,那麼就發往使用者指定的partition。如果使用者沒有指定partition,那麼就會根據key來決定放到哪個partition,如果key也沒有指定,則由producer隨機選取一個partition。
在Producer端,如果使用者指定了timestamp,則record使用使用者指定的時間,如果使用者沒有指定,則會使用producer端的當前時間。在broker端,如果配置了時間戳採用createtime方式,則使用producer傳給Broker的record中的timestramp時間,如果指定為logappendtime,則在broker寫入到Log檔案時會重寫該時間。
8.1.2 send
新版本(0.10)裡面的send函式是一個非同步函式,使用者執行緒呼叫send方法是將record放到BufferPool中快取,並根據batch.size和linger.ms等引數來批量提交。執行流程是:
1. 由interceptorchain對ProducerRecord做傳送前的處理
攔截器介面是:ProducerInterceport,使用者可以自定義自己的攔截器實現。
該攔截器鏈,在Producer物件初始化時初始化,之後不會再變了。所以呢,攔截器鏈中的攔截器都是公用的,如果要自定義攔截器的話,這個是需要注意的。
n ProducerInterceptor有兩個方法:
1) onSend:KafkaProducer#send 呼叫時就會執行此方法。
2) onAcknowledgement:傳送失敗,或者傳送成功(broker 通知producer代表傳送成功)時都會呼叫該方法。
2. 阻塞方式獲取到broker cluster 上broker cluster的資訊
採用RPC方式獲取到的broker資訊,由一個MetaData類封裝。它包括了broker cluster的必要資訊,譬如有:所有的broker資訊(id\host\port等)、所有的topic名稱、每一個topic對於的partition情況(id、leader node、replica nodes、ISR nodes等)。
雖然該過程是阻塞的,但並不是每傳送一個record都會通過RPC方式來獲取的。Metadata會在Producer端快取,只有在record中指定的topic不存在時、或者MetaData輪詢週期到時才會執行。
3. 對record中key、value進行序列化
內建了基於String、Integer、Long、Double、Bytes、ByteBuffer、ByteArray的序列化工具。
4. 為record設定partition屬性
前面說過,建立ProducerRecord時,partition是Optional的。所以如果使用者建立record時,沒有指定partition屬性,則由partition計算工具(Partitioner 介面)來計算出partition。這個計算方式可以自定義。Kafka Producer 提供了內建的實現:
ü 如果提供了Key值,會根據key序列化後的位元組陣列的hashcode進行取模運算。
ü 如果沒有提供key,則採用迭代方式(其實取到的值並非完美的迭代,而是類似於隨機數)。
5. 校驗record的長度是否超出閾值
MAX_REQUEST_SIZE_CONFIG=”max.request.size”
BUFFER_MEMORY_CONFIG=”buffer.memory”
超出任何一項就會丟擲異常。
6. 為record設定timestamp
如果使用者建立ProducerRecord時沒有指定timestamp,設定為producer的當前時間。
其實在java client中,設計了一個Time介面,專門用於設定這個時間的。內建了一個實現SystemTime,這裡將record timestamp設定為當前時間,就是由SystemTime來完成的。所以如果希望在kafka producer java client中使用其它的時間,可以自定義Time的實現。
這個時間戳有什麼要注意的,比如producer和kafkaserver在兩臺機器上,時間不同步,會對後續有什麼影響嗎?
7. 將該record壓縮後放到BufferPool中
這一步是由RecordAccumulator來完成的。RecordAccumulator中為每一個topic維護了一個雙端佇列Deque<RecordBatch>,佇列中的元素是RecordBatch(RecordBatch則由多個record壓縮而成)。RecordAccumulator要做的就是將record壓縮後放到與之topic關聯的那個Deque的最後面。
在將record放到Deque中最後一個RecordBatch中的邏輯為:如果最後一個recordbatch可以放的下就放,放不下就新建一個RecordBatch。
RecordBatch實際上是儲存於BufferPool中,所以這個過程實際上是把record放在BufferPool中。在建立BufferPool之初,會指定BufferPool的總大小,BufferPool中每一個RecordBatch的大小等等配置。
8. 喚醒傳送模組
執行到上一步時,KafkaProducer#sender的處理基本算是完畢。這個一步的目的就是喚醒NIO Selector。
此外,在上述步驟2~8,不論哪一步出現問題,都會丟擲異常。而丟擲異常時,就會被KafkaProducer捕獲到,然後交由Sensor(感測器)進行處理。而Sensor通常會呼叫第1步中提到的interceptorchain 執行onAcknowledgement告知使用者。
8.1.3 send函式的傳送排程處理
KafkaProducer#sender只是將record放到BufferPool中,並沒有將record發出去,而傳送排程,則是由另外一個執行緒(Sender)來完成的。
Sender的執行過程如下:
1. 取出就緒的record
這一步是檢查要傳送的record是否就緒:根據KafkaProducer維護的Metadata檢查要每一個record要發往的Leader node是否存在。如果有不存在的,就設定為需要更新,並且這樣的record認為還未就緒。以保證可以發到相關partition的leader node。
2. 取出RecordBatch,並過濾掉過期的RecordBatch
對於過期的RecordBatch,會通過Sensor通知Interceptor傳送失敗。
3. 為要傳送的RecordBatch建立請求
一個RecordBatch一個ClientRequest。
4. 保留請求併發送
把請求物件保留到一個inFlightRequest 集合中。這個集合中存放的是正在傳送的請求,是一個topic到Deque的Map。當傳送成功,或者失敗都會移除。
5. 處理髮送結果
如果傳送失敗,會嘗試retry。並由Sensor排程Interceptor。
如果傳送成功,會由Sensor排程Interceptor。
8.1.4 Producer實現總結
從上述處理流程中,可以看到在javaclient中的一些設計:
1. InterceptorChain:可以做為用於自定義外掛的介面。
2. MetaData:producer 不按需以及定期的傳送請求獲取最新的Cluster狀態資訊。Producer根據這個資訊可以直接將record batch傳送到相關partition的Leader中。也就是在客戶端完成Load balance。
3. Partitioner:分割槽選擇工具,選擇傳送到哪些分割槽,結合Metadata,完成Load balance。
4. RecordBatch:在客戶端對record壓縮排RecordBatch,然後一個RecordBatch發一次。這樣可以減少IO操作的次數,提高效能。
5. 非同步方式傳送:提高使用者應用效能。
8.1.5 Producer 配置說明
l bootstrap.servers
用於配置cluster中borker的host/port對。可以配置一項或者多項,不需要將cluster中所有例項都配置上。因為它後自動發現所有的broker。
如果要配置多項,格式是:host1:port1,host2:port2,host3:port3….
l key.serializer、value.serializer
配置序列化類名。指定 的這些類都要實現Serializer介面。
l acks
為了確保message record被broker成功接收。Kafka Producer會要求Borker確認請求(傳送RecordBatch的請求)完成情況。
對於message接收情況的確認,Kafka Broker支援了三種情形:1、不需要確認;2)leader接收到就確認;3)等所有可用的follower複製完畢進行確認。可以看出,這三種情況代表不同的確認粒度。在JavaProducer Client中,對三種情形都做了支援,上述三種情形分別對應了三個配置項:0、1、-1。其實還有一個值是all,它其實就是-1。
Kafka Producer Java Client 是如何支援這三種確認:
1) 在為RecordBatch建立請求時,acks的值會被封裝為請求頭的一部分。
2) 傳送請求後(接收到Broker響應前),立即判斷是否需要確認該請求是否完成(即該RecordBatch是否被Broker成功接收),判斷依據是acks的值是否是0。如果是0,即不需要進行確認。那麼就認定該請求成功完成。既然認定是成功,那麼就不會進行retry了。
如果值不是0,就要等待Broker的響應了。根據響應情況,來判斷請求是否成功完成。
該配置項預設值是1,即leader接收後就響應。
l buffer.memory
BufferPool Size,也就是等待發送的Record的空間大小。預設值是:33554432,即32MB。
配置項的單位是byte,範圍是:[0,….]
l compression.type
Kafka提供了多種壓縮型別,可選值有4個: none, gzip, snappy, lz4。預設值是none。
l retries
當一個RecordBatch傳送失敗時,就會重新改善以確保資料完成交付。該配置設定了重試次數,值範圍[0, Integer.Max]。如果是0,即便失敗,也不會進行重發。
如果允許重試(即retries>0),但max.in.flight.requests.per.connection 沒有設定成1。這種情況下,就可能會出現records的順序改變的現象。例如:一個prodcuder client的sender執行緒在一次輪詢中,如果有兩個recordbatch都要傳送到同步一個partition中,此時它們肯定是發往同一個broker的,並且是用的同一個TCP connection。如果出現RecordBatch1先發,但是傳送失敗,RecordBatch2緊接著RecordBatch1傳送,它是傳送成功的。然後RecordBatch1會進行重發。這樣一來,就出現了broker接收到的順序是RecordBatch2先於RecordBatch1的情況。
l batch.size
RecordBatch的最大容量。預設值是16384(16KB)。
l ssl.key.password
Keystore 檔案中私鑰的密碼。可選的。
l ssl.keystore.location
Keystore檔案的位置。可選的。
l ssl.keystore.password
Keystore 檔案的密碼。可選的。
l ssl.truststore.location
Trust store 檔案的位置。可選的。
l ssl.truststore.password
Trust store檔案的密碼。可選的。
l client.id
邏輯名,client給broker發請求是會用到。預設值是:””。
l connections.max.idle.ms
Connection的最大空閒時間。預設值是540000 (9 min)
l linger.ms
Socket :solinger。延遲。預設值:0,即不延遲。
l max.block.ms
當需要的metadata未到達之前(例如要傳送的record的topic,在Client中還沒有相關記錄時),執行KafkaProducer#send時,內部處理會等待MetaData的到達。這是個阻塞的操作。為了防止無限等待,設定這個阻塞時間是必要的。範圍:[0, Long.MAX]
l max.request.size
最大請求長度,在將record壓縮到RecordBatch之前會進行校驗。超過這個大小會丟擲異常。
l partitioner.class
用於自定義partitioner演算法。預設值是:
org.apache.kafka.clients.producer.internals.DefaultPartitioner
l receive.buffer.byte
TCP receiver buffer的大小。取值範圍:[-1, …]。這個配置項的預設值是32768(即 32KB)。
如果設定為-1,則會採用作業系統的預設值。
l request.timeout.ms
最大請求時長。因為發起請求後,會等待broker的響應,如果超過這個時間就認為請求失敗。
l timeout.ms
這個時間配置的是follower到leader的ack超時時間。這個時間和producer傳送的請求的網路無關。
l block.on.buffer.full
當bufferPool用完後,如果client還在使用KafkaProducer傳送record,要麼是BufferPool拒絕接收,要麼是丟擲異常。
這個配置是預設值是flase,也就是當bufferpool滿時,不會丟擲BufferExhaustException,而是根據max.block.ms進行阻塞,如果超時丟擲TimeoutExcpetion。
如果這個屬性值是true,則會把max.block.ms值設定為Long.MAX。另外該配置為true時,metadata.fetch.time.ms將不會生效了。
l interceptor.class
自定義攔截器類。預設情況下沒有指定任何的interceptor。
l max.in.flight.requests.per.connection
每個連線中處於傳送狀態的請求數的最大值。預設值是5。範圍是[1, Integer.MAX]
l metric.reporters
MetricReporter的實現類。預設情況下,會自動的註冊JmxReporter。
l metrics.num.samples
計算metric時的取樣數。預設值是2。範圍:[1,Integer.MAX]
l metrics.sample.window.ms
取樣的時間視窗。預設值是30000(30s)。範圍:[0, Long.MAX]
如果我的kafka叢集已經開始運行了2個小時,有一個消費程式在持續的處理著訊息。
8.2 Consumer
相關推薦
Kafka 0.10 安裝及使用
1 總體說明 筆記本:i5第六代,16G記憶體,256G固態硬碟 使用VirtualBox 5.0.22建立3臺虛擬機器。 2 主機規化 主機名 IP 用途 master 192.168.56.101 slave1 192.168.5
centos下安裝單機版kafka-0.10.0.1
1.環境說明 主機資訊如下: 1[[email protected] soft]# hostname2test13[[email protected] soft]# cat /etc/hosts4127.0.0.1 localh
Kafka-0.10.1叢集的安裝和配置
準備 1.kafka_2.10-0.10.1.1.tgz 2.安裝配置好的Zookeeper-3.4.10分散式叢集 mini1:192.168.213.133 mini2:192.168.213.134 mini3:192.168.213.135
win7+opencv3.0.0+vs2010 安裝及配置
64位 結果 環境變量 space 防止 控制 屬性頁 屬性 鏈接器 最近看《學習opencv》,想要跑人臉識別的例子,於是先配環境吧。 1、 opencv下載: 具體下載地址,http://opencv.org/,官網太慢,百度網盤的資源鏈接:http://pan.ba
kafka集群安裝及管理(一)
ini nohup class 技術分享 -o -h timeout lba 能夠 一、環境配置1.系統環境[root@date ~]# cat /etc/centos-release CentOS Linux release 7.4.1708 (Core)2..安裝JAV
kafka集群安裝及管理(二)
進程 node 規則 nfa 情況 tor back per art 一、broker的遷移1.查看zookeeper和kafka啟動情況[root@slave1 ~]# pssh -h hostlist -i 'jps' [1] 22:08:11 [SUC
關於CDH5.11.0自帶kafka 0.10 bootstrap-server 無法消費
出現 指定 pre hit tst apache 10.2.2 來看 min 近日需要在項目用到kafka,然後本地使用cdh集成的kafka 進行安裝調試,以及些樣例代碼,sparkstreaming 相關調用kafka 的代碼使用的原始的api 而沒有走zook
新手小白Linux(Centos6.5)部署java web項目(mongodb4.0.2安裝及相關操作)
read har space 創建 縮進 路徑 .org font url 紅帽企業或CentOS的Linux上安裝MongoDB的社區版: https://docs.mongodb.com/manual/tutorial/install-mongodb-on-red-ha
scala spark-streaming整合kafka (spark 2.3 kafka 0.10)
obj required word 錯誤 prope apache rop sta move Maven組件如下: <dependency> <groupId>org.apache.spark</groupId> <
java8下spark-streaming結合kafka程式設計(spark 2.3 kafka 0.10)
前面有說道spark-streaming的簡單demo,也有說到kafka成功跑通的例子,這裡就結合二者,也是常用的使用之一。 1.相關元件版本 首先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0,kafk
大資料學習之路97-kafka直連方式(spark streaming 整合kafka 0.10版本)
我們之前SparkStreaming整合Kafka的時候用的是傻瓜式的方式-----createStream,但是這種方式的效率很低。而且在kafka 0.10版本之後就不再提供了。 接下來我們使用Kafka直連的方式,這種方式其實是呼叫Kafka底層的消費資料的API,我們知道,越底層的東
Kafka 0.10.1.1 以時間戳查詢訊息和暫停某些分割槽消費和消費速度控制
轉自:https://www.jianshu.com/p/a4c1d281b66a 1. 以時間戳查詢訊息 (1) Kafka 新版消費者基於時間戳索引消費訊息 kafka 在 0.10.1.1 版本增加了時間索引檔案,因此我們可以根據時間戳來訪問訊息。 如以下需求:從半個小時之前的of
Kafka 0.10 常用運維命令
引言 Kafka是由LinkedIn開發的一個分散式的訊息系統,它以可水平擴充套件和高吞吐率而被廣泛使用,現在已經是Apache的專案。 Kafka系統自帶了豐富的運維管理工具,都是基於命令列的,本文主要介紹一些常用的命令。 讀者需要對Kafka已經有入門級的瞭解。 常用命令 以下命令都是在Kafk
【JMeter4.0】安裝及執行(windows環境)
安裝 JDK安裝及配置 安裝對應版本的java環境,配置好環境變數。 版本對應關係參考下表: JMeter版本 JDK版本 4.0 1.8 or 1.9 3.2/3.3 1.8+ 3.0/3.1 1.7+ JDK環境變數配置: “我的
Apache2.4.34 + php 7.28 + MySQL8.0.12 安裝及配置
服務端的學習 Apache2.4.34 的安裝及配置 1.基本安裝 最新的 Apache 已經不提供 Windows 的安裝版本,所以我們這裡使用的是解壓版 安裝方式如下 1.注意:需要使用管理員身份執行命令列!!! 2. 切換到 Apache 解壓
kafka-0.10.0.1版本在java客戶端傳送資料接收不到
錯誤日誌: org.apache.kafka.common.errors.TimeoutException: Batch containing 100 record kafka版本:0.10.0.1 zookeeper版本: 3.4.5 產生的現象: 編寫好java
Apache Kafka 0.10.0.0&0.11.0.0新特性 更新日誌
一、About 0.10.0.0 Apache Kafka 0.10.0.0於美國時間2016年5月24日正式釋出。Apache Kafka 0.10.0.0是Apache Kafka的主要版本。以下是新特性: 1、Kafka Streams Kafk
Centos7 下 MongoDB4.0.0 的安裝及複製集配置
#mongod.conf #for documentation of all options, see: #http://docs.mongodb.org/manual/reference/configuration-options/ systemLog: destination: file lo
kafka 0.10傳送、接收大訊息解決辦法
配置三個地方: Broker: message.max.bytes and replica.fetch.max.bytes Producer: max.request.sizeConsumer: max.partition.fetch.bytes注意: message.ma
java8下spark-streaming結合kafka程式設計(spark 2.0 & kafka 0.10)
1.相關元件版本 首先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0,kafka 0.10。 2.引入maven包 網上找了一些結合的例子,但是跟我當前版本不一樣,所以根本就