1. 程式人生 > >Kafka 0.10 安裝及使用

Kafka 0.10 安裝及使用

1    總體說明

筆記本:i5第六代,16G記憶體,256G固態硬碟

使用VirtualBox 5.0.22建立3臺虛擬機器。

2    主機規化

主機名

IP

用途

master

192.168.56.101

slave1

192.168.56.102

slave2

192.168.56.103

3    目錄規化

元件

目錄

說明

JDK

/usr/java/jdk1.8.0_92

ln -s /usr/java/jdk1.8.0_92 /usr/java/default

zookeeper

/opt/zookeeper

kafka

/opt/kafka

各輸出目錄的根

/var/kafka/

/var/zookeeper/

4    埠規化

說明

9092

2888

ZooKeeper,如果是Leader,用來監聽Follower的連線

3888

ZooKeeper,用於Leader選舉

2181

ZooKeeper,用來監聽客戶端的連線

5    作業系統配置

5.1   OS安裝

l  使用CentOS 6.5版

l  磁碟劃分:

/boot  500MB  ext4 boot partition # 迫使主分割槽

swap   2GB swap # 與實體記憶體一樣大

/      剩餘空間 ext4 # 迫使主分割槽

/data1 sda所有空間 ext4

/data2 sdb所有空間 ext4

l  安裝軟體:sysstat , httpd, tftp-server, ntp

啟動sysstat :/etc/init.d/sysstat start

設定sysstat自啟動:checkfig sysstat on

安裝pssh

tar zxf pssh-2.3.1.tar.gz

cd pssh-2.3.1

python setup.py install

pssh   多主機並行執行命令

pscp   傳輸檔案到多個hosts,他的特性和scp差不多

pslurp   從多臺遠端機器拷貝檔案

pnuke   kill

遠端機器的程序

pslurp   從遠端主機考本檔案到本地

prsync   使用rsync協議從本地計算機同步到遠端主機

5.2   配置SSH免密登陸

原理:就是我把我的公鑰放到你的authorized_keys裡面,然後我就可以ssh無密碼登入你了

5.2.1 配置規則

u  NameNode 能免密碼登入所有的 DataNode

u  SecondaryNameNode 能免密碼登入所有的 DataNode

u  NameNode 能免密碼登入自己

u  SecondaryNameNode 能免密碼登入自己

u  NameNode 能免密碼登入 SecondaryNameNode

u  SecondaryNameNode 能免密碼登入 NameNode

u  DataNode 能免密碼登入自己

u  DataNode 不需要配置免密碼登入 NameNode、SecondaryNameNode和其它 DataNode。

5.2.2 配置步驟

l  在master(NameNode)上執行:

cd ~

ssh-keygen -t rsa # 一路回車。

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

ssh localhost # 驗證本機無密碼登陸

ssh master # 驗證本機無密碼登陸

for ip in `seq 1 2`; do scp ~/.ssh/[email protected]$ip:~/keys.master; done

l  在slave1(SecondaryNameNode, DataNode)上執行:

cd ~

ssh-keygen -t rsa

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

ssh localhost # 驗證本機無密碼登陸

ssh slave1 # 驗證本機無密碼登陸

cat ~/keys.master >> ~/.ssh/authorized_keys # 實現master免密碼登陸slave1

master主機上執行:cat ~/keys.slave1 >> ~/.ssh/authorized_keys

l  在slave2(DataNode)上執行:

cd ~

ssh-keygen -t rsa

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

ssh localhost # 驗證本機無密碼登陸

ssh slave2 # 驗證本機無密碼登陸

cat ~/keys.master >> ~/.ssh/authorized_keys

cat ~/keys.slave1 >> ~/.ssh/authorized_keys

l  免密碼登陸驗證

master(NN)登入所有節點

slave1(SNN)登入所有的 DataNode(slave2)

slave1(SNN)登入NameNode(master)

5.3   系統配置

每臺主機執行以下操作:

1.  修改hosts,增加各個主機

echo "# cluster hosts" >> /etc/hosts

echo "192.168.56.101 master" >>/etc/hosts

echo "192.168.56.102 slave1" >>/etc/hosts

echo "192.168.56.103 slave2" >>/etc/hosts

cat /etc/hosts

for ip in `seq 1 2`; do scp /etc/[email protected]$ip:/etc; done

2.  定義其他節點主機列表檔案

cd ~

vi hdp-other-hosts

slave1

slave2

該檔案是為了使用pssh進行批量操作用。檔案內容為每行一個主機名字。

    之後,可使用類似如下命令從master主機上批量在其他主機上執行命令:

    例如:pssh -h hdp-other-hosts "mkdir -p /hadoop /test"

3.  修改OS引數,如關閉防火牆,增加開啟檔案數等

l  修改主機名

cat /etc/sysconfig/network #看看主機名字是否正確,不對則修改

service network restart

l  # 關閉iptables.

service iptables stop

service iptables status

chkconfig iptables off

chkconfig --list | grep iptables

pssh -h hdp-other-hosts "service iptables stop;chkconfig iptables off"

# 兩個命令中間用分號;以實現連續執行,即使有錯也會繼續。

# 如果每個命令被 && 號分隔,那麼這些命令會一直執行下去,如果中間有錯誤的命令存在,則不再執行後面的命令。

l  # 關閉selinux.

vi /etc/selinux/config

  SELINUX=disabled

l  # 關閉透明大頁

vi /etc/rc.local

if test -f /sys/kernel/mm/transparent_hugepage/enabled;then

    echo never> /sys/kernel/mm/transparent_hugepage/enabled

fi

if test -f /sys/kernel/mm/transparent_hugepage/defrag;then

    echo never> /sys/kernel/mm/transparent_hugepage/defrag

fi

l  # 讓機器儘量使用實體記憶體。如果設定成0,則意味著只使用實體記憶體。

echo "vm.swappiness=10" >>/etc/sysctl.conf

l  # 最大開啟檔案資料和最大程序數 limits.conf 每臺機器都修改

vi /etc/security/limits.conf # 新增下面的內容

* - nofile 65535

* - nproc 65535

每臺機器重啟:reboot

l  檢查修改效果:

/usr/sbin/sestatus -v # 如果SELinux status引數為enabled即為開啟狀態

或者使用:getenforce,也可檢查SELinux的狀態

# 如果輸出結果為[always]表示透明大頁啟用了。[never]表示禁用、[madvise]表示

cat /sys/kernel/mm/transparent_hugepage/enabled, defrag

cat /proc/sys/vm/swappiness #應該是10

ulimit -a #應該是65535

5.4   NTP配置

l  NTP伺服器(master)配置

vi /etc/ntp.conf

server 127.127.1.0

fudge 127.127.1.0 stratum 10

restrict 192.168.56.0 255.255.255.0 nomodifynotrap

chkconfig ntpd on

service ntpd start

ntpstat # show : synchronised to local net at stratum 11

l  其他主機配置

vi /etc/ntp.conf

server master

service ntpd start

chkconfig ntpd on

crontab -e

*/10 * * * * /usr/sbin/ntpdate -u master&>/var/log/ntpdate-cron.log

# 每10分鐘同步一次,'&'開始是為了除錯用

10分鐘後,各主機上執行:ntpstat,看是否同步了

如果機器重啟了,slave上執行ntpstat,顯示未同步,執行一次:service ntpd restart,再執行ntpstat,就顯示同步了。不知道這個怎麼搞好

6    JAVA安裝

l  在master上安裝JAVA

rpm -ivh jdk8.rpm

echo "export JAVA_HOME=/usr/java/default">> /etc/profile

echo "exportCLASSPATH=.:$JAVA_HOME/lib/tools.jar" >> /etc/profile

echo "export PATH=$JAVA_HOME/bin:$PATH">> /etc/profile

source /etc/profile

gtar cf java.gz /usr/java/

for ip in `seq 1 2`; do scp java.gz [email protected]$ip:/usr;done

for ip in `seq 1 2`; do scp /etc/[email protected]$ip:/etc; done

rm java.gz

java -version # 檢驗

l  在其他各主機上安裝JAVA

cd /usr

gtar xf java.gz

rm java.gz

java -version # 檢驗

7    Kafka安裝

7.1   安裝zookeeper

1.  tar zxvfzookeeper-3.4.8.tar.gz

2.  cpzookeeper/conf/zoo_sample.cfg zookeeper/conf/zoo.cfg

3.  編輯zoo.cfg

ü  修改:dataDir=/var/zookeeper/datadir

ü  新增:dataLogDir=/var/zookeeper/logsdir

ü  新增:

server.1=master:2888:3888

server.2=slave1:2888:3888

server.3=slave2:2888:3888

4.  分發到其他節點

tar cf zookeeper-done.tar zookeeper-3.4.8

for ip in `seq 1 2`; do scp /hadoop/[email protected]$ip:/hadoop; done

分別解壓縮即可。

5.  在各節點上建立ZK的目錄:資料檔案和日誌存放目錄

1)  mkdir -p/var/zookeeper/datadir /var/zookeeper/logsdir

2)  pssh -h~/hdp-other-hosts "mkdir -p /var/zookeeper/datadir /var/zookeeper/logsdir"

6.  編輯各節點的myid值

echo 1 > /var/zookeeper/datadir/myid

pssh -H [email protected] "echo 2 > /var/zookeeper/datadir/myid"

pssh -H [email protected] "echo 3 > /var/zookeeper/datadir/myid"

或者使用下面迴圈代替上面的兩句:

for ip in `seq 1 2`; do pssh -H [email protected]$ip "echo$[$ip + 1] > /var/zookeeper/datadir/myid"; done

7.  啟動

zkServer.sh start :這個命令使得zk服務程序在後臺進行
zkServer.sh start-foreground :在前臺中執行
zkServer.sh print-cmd  :可以檢視zookeeper啟動的各個引數,包括java路徑等,也可以便於查詢問題。
執行日誌在zookeeper安裝目錄下的zookeeper.out。另外要注意的是,zookeeper重啟會自動清除zookeeper.out日誌,所以如果出錯要注意先備份這個檔案。
看了下zkServer.sh的程式碼,這個zookeeper.out實際上是nohup的輸出。
研究了下bin/zkServer.sh和conf/log4j.properties,發現zookeeper其實是有日誌相關的輸出的配置,只要定義相關的變數就可以了。
主要是ZOO_LOG_DIR和ZOO_LOG4J_PROP這兩個環境變數:

如果是連線同一臺主機上的zk程序,那麼直接執行bin/目錄下的zkCli.cmd(Windows環境下)或者zkCli.sh(Linux環境下),即可連線上zk。

直接執行zkCli.cmd或者zkCli.sh命令預設以主機號 127.0.0.1,埠號 2181 來連線zk,如果要連線不同機器上的zk,可以使用 -server 引數,例如:

zkCli.sh -server 192.168.229.160:2181,192.168.229.161:2181,192.168.229.162:2181

7.2   安裝Kafka

1.  解壓tar

2.  修改配置檔案:config/server.properties

ü  broker.id=1 (另外幾臺機器依次設定為2,3,)

ü  log.dirs=/var/kafka/kfklogs (日誌地址,kafka的topic以及資料檔案存放位置)

ü  zookeeper.connect=master:2181,slave1:2181,slave2:2181

ü  listeners = PLAINTEXT://ip:9092 :監聽列表(以逗號分隔)。hostname如果設定為0.0.0.0則繫結所有的網絡卡地址;如果hostname為空則繫結預設的網絡卡。如果沒有配置則預設為java.net.InetAddress.getCanonicalHostName()

ü  auto.create.topics.enable=false 免得程式中寫錯了topic名字時被自動建立了

ü  delete.topic.enable 可以物理上刪除一個topic

注意:broker級的配置引數(也就是server.properties),可以由topic級別的覆寫。

server.properties引數說明

############################# Server Basics#############################

# 唯一標識一個broker.

broker.id=1

############################# Socket Server Settings#############################

#繫結服務監聽的地址和埠,要填寫hostname -i 出來的地址,否則可能會繫結到127.0.0.1,producer可能會發不出訊息

listeners=PLAINTEXT://172.23.8.144:9092

#broker對producers和consumers服務的地址和埠,如果沒有配置,使用listeners的配置,本文沒有配置該項

#advertised.listeners=PLAINTEXT://your.host.name:9092

# 處理網路請求的執行緒數

num.network.threads=3

# 處理磁碟I/O的執行緒數

num.io.threads=8

# socket server的傳送buffer大小 (SO_SNDBUF)

socket.send.buffer.bytes=102400

# socket server的接收buffer大小 (SO_RCVBUF)

socket.receive.buffer.bytes=102400

#一個請求的最大size,用來保護防止oom

socket.request.max.bytes=104857600

############################# Log Basics#############################

#存放日誌和訊息的目錄,可以是用逗號分開的目錄,同樣不推薦使用/tmp

log.dirs=/usr/local/services/kafka/kafka-logs

#每個topic預設partitions的數量,數量較大表示消費者可以有更大的並行度。

num.partitions=2

# The number of threads per data directory to be usedfor log recovery at startup and flushing at shutdown.

# This value is recommended to be increased forinstallations with data dirs located in RAID array.

num.recovery.threads.per.data.dir=1

#日誌的過期時間,超過後被刪除,單位小時

log.retention.hours=168

#一個日誌檔案最大大小,超過會新建一個檔案

log.segment.bytes=1073741824

#根據過期策略檢查過期檔案的時間間隔,單位毫秒

log.retention.check.interval.ms=300000

############################# Zookeeper#############################

#Zookeeper的連線配置,用逗號隔開,也可以用172.23.8.59:2181/kakfa這樣的方式指定kafka資料在zk中的根目錄

zookeeper.connect=172.23.8.144:2181,172.23.8.179:2181,172.23.8.59:2181

# 連線zk的超時時間

zookeeper.connection.timeout.ms=6000

7.3   啟動kafka

先啟動各臺機器上的zookeeper:bin/zkServer.sh start

日誌就在zookeeper的根目錄下的zoo.out

然後在每臺機器上都啟動Kafka:

nohup bin/kafka-server-start.sh config/server.properties> kfk.out &

tail -f -n500 kfk.out

停止kafka,在每臺機器上執行:bin/kafka-server-stop.sh

7.4   單機偽叢集

因為server.properties就是一個broker的配置,所以,複製幾份不同名字的server.properties,並修改broker.id,listeners,port,log.dirs,即可實現單機偽叢集。

注意:listeners中的埠號必須與port的值一致。

然後,在單機上,用“bin/kafka-server-start.sh每個server.properties”,依次啟動即可。

7.5   使用Kafka

l  建立topic

bin/kafka-topics.sh--create --zookeeper localhost:2181 --replication-factor 2 --partitions 6--topic test01

u  partitions:表示建立了一個有6個分割槽的topic

如果有三臺機器(Broker)的話,每臺機器上會隨機兩個目錄:如test01-1,test01-3。

u  replication-factor:表示該topic需要在不同的broker中儲存幾份

以上設定為2,因此,某兩個broker上會有相同的兩個目錄:test01-1(其中一個是備份)。

    所以,以上命令,由於是6個分割槽、2個備份、3臺機器,所以,每臺機器上有4個目錄。

u  建立topic引數可以設定一個或多個--config "Property(屬性)"

bin/kafka-topics.sh –create ... --configmax.message.bytes=64000 --config flush.messages=1

l  修改topic

使用—alter替換—create,即可修改。

使用--alter--topic my-topic  --deleteConfigmax.message.bytes,即可刪除某個引數。

l  檢視topic

看列表:bin/kafka-topics.sh--list --zookeeper localhost:2181

看屬性:bin/kafka-topics.sh--describe --zookeeper localhost:2181 --topic test01

n  Leader: 如果有多個broker儲存同一個topic,那麼同時只能有一個Broker負責該topic的讀寫,其它的Broker作為實時備份。負責讀寫的Broker稱為Leader.

每個Replication集合中的Partition都會選出一個唯一的Leader,所有的讀寫請求都由Leader處理。其他Replicas從Leader處把資料更新同步到本地。

n  Replicas : 表示該topic的0分割槽在1號和2號broker中儲存

n  Isr : 表示當前有效的broker, Isr是Replicas的子集

現在,殺掉一個broker(模擬此點的崩潰):kill -9 PID即可。再看屬性:

會發現Leader已經進行了切換,而且,當前有效的broker中,3已經不存在了。

仔細看前後兩張圖,可以知道,因為3沒了,所以,原來Leader為3的分割槽,就使用了Replicas中的備選,所以,分割槽1的Leader由3變成了2,而分割槽4的Leader由3變成了1。

l  傳送一個訊息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test01

每輸入一行,就是一條訊息

l  消費訊息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 --topic test01 --from-beginning
任意一臺機器上執行以上命令,即可看到訊息。

l  批量構造大量訊息

bin/kafka-verifiable-producer.sh --topic test01 --max-messages 20 --broker-list localhost:9092

l  刪除Topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test01
如果kafaka啟動時載入的配置檔案中server.properties沒有配置delete.topic.enable=true,那麼此時的刪除並不是真正的刪除,而是把topic標記為:marked for deletion。
刪除kafka儲存目錄(server.properties檔案log.dirs配置)相關topic目錄。

7.6   Kafka注意事項

listeners一定要配置成為IP地址;如果配置為localhost或伺服器的hostname,在使用Java傳送資料時就會丟擲異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因為在沒有配置advertised.host.name 的情況下,Kafka並沒有像官方文件宣稱的那樣改為廣播我們配置的host.name,而是廣播了主機配置的hostname。遠端的客戶端並沒有配置 hosts,所以自然是連線不上這個hostname的。

消費者執行緒數必須是小等於topic的partition分割槽數;可以通過命令:

./kafka-topics.sh --describe --zookeeper"172.16.49.173:2181" --topic "producer_test"命令來檢視分割槽的情況。

kafka會根據partition.assignment.strategy指定的分配策略來指定執行緒消費那些分割槽的訊息;沒有單獨配置該項即是採用的預設值range策略(按照階段平均分配)。比如分割槽有10個、執行緒數有3個,則執行緒 1消費0,1,2,3,執行緒2消費4,5,6,執行緒3消費7,8,9。另外一種是roundrobin(迴圈分配策略),官方文件中寫有使用該策略有兩個前提條件的,所以一般不要去設定。

props.put(“auto.offset.reset”, “smallest”) 是指定從最小沒有被消費offset開始;如果沒有指定該項則是預設的為largest,這樣的話該consumer就得不到生產者先產生的訊息。

使用New Consumer API

Properties props = new Properties();

//brokerServer(kafka)ip地址,不需要把所有叢集中的地址都寫上,可是一個或一部分

props.put("bootstrap.servers", "172.16.49.173:9092");

//設定consumer group name,必須設定

props.put("group.id", a_groupId);

//設定自動提交偏移量(offset),由auto.commit.interval.ms控制提交頻率

props.put("enable.auto.commit", "true");

//偏移量(offset)提交頻率

props.put("auto.commit.interval.ms", "1000");

//設定使用最開始的offset偏移量為該group.id的最早。如果不設定,則會是latest即該topic最新一個訊息的offset

//如果採用latest,消費者只能得道其啟動後,生產者生產的訊息

props.put("auto.offset.reset", "earliest");

//設定心跳時間

props.put("session.timeout.ms", "30000");

//設定key以及value的解析(反序列)類

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

//訂閱topic

consumer.subscribe(Arrays.asList("topic_test"));

while (true) {

    //每次取100條資訊

    ConsumerRecords<String, String> records = consumer.poll(100);

    for (ConsumerRecord<String, String> record : records)

    System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());

 }

group.id :必須設定

auto.offset.reset:如果想獲得消費者啟動前生產者生產的訊息,則必須設定為earliest;如果只需要獲得消費者啟動後生產者生產的訊息,則不需要設定該項

enable.auto.commit(預設值為true):如果使用手動commit offset則需要設定為false,並再適當的地方呼叫consumer.commitSync(),否則每次啟動消費折後都會從頭開始消費資訊(在auto.offset.reset=earliest的情況下);

官方對於consumer與partition的建議

1.  如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許併發的,所以consumer數不要大於partition數

2.  如果consumer比partition少,一個consumer會對應於多個partitions,這裡主要合理分配consumer數和partition數,否則會導致partition裡面的資料被取的不均勻。最好partiton數目是consumer數目的整數倍,所以partition數目很重要,比如取24,就很容易設定consumer數目

3.  如果consumer從多個partition讀到資料,不保證資料間的順序性,kafka只保證在一個partition上資料是有序的,但多個partition,根據你讀的順序會有不同

4.  增減consumer,broker,partition會導致rebalance,所以rebalance後consumer對應的partition會發生變化

5.  High-level介面中獲取不到資料的時候是會block的

replication-factor副本:replication factor 控制訊息儲存在幾個broker(伺服器)上,一般情況下等於broker的個數。

檢視topic屬性:bin/kafka-topics.sh --zookeeper zk1:2181 --describe --topictopicname

8    Kafka開發

8.1   Producer

KafkaProducer是一個傳送record到Kafka Cluster的客戶端API。這個類執行緒安全的。在應用程式中,通常的作法是:所有發往一個KafkaCluster的執行緒使用同一個Producer物件.。如果需要給多個Cluster傳送訊息,則需要使用多個Producer。

樣例程式碼:

Properties properties = new Properties();

properties.put("bootstrap.servers", "192.168.56.101:9092");

properties.put("acks", "all");

properties.put("retries", 0);

properties.put("batch.size", 16384);

properties.put("linger.ms", 1);

properties.put("buffer.memory", 33554432);

properties.put("key.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

properties.put("value.serializer",

"org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String, String> pducerRecord = new ProducerRecord<String, String>(TOPIC, message);

producer.send(pducerRecord, new CallBack(){ ... ... });

8.1.1 ProducerRecord

Producer要傳送的訊息記錄類是ProducerRecord。看它的原始碼可知;

一條ProducerRecord通常包括5個欄位:

l  topic:指定該record發往哪個topic下。[Required]

l  partition:指定該record發到哪個partition中。[Optional]

l  key:一個key。[Optional]

l  value:記錄人內容。[Required]

l  timestamp:時間戳。[Optional]

如果使用者指定了partition,那麼就發往使用者指定的partition。如果使用者沒有指定partition,那麼就會根據key來決定放到哪個partition,如果key也沒有指定,則由producer隨機選取一個partition。

在Producer端,如果使用者指定了timestamp,則record使用使用者指定的時間,如果使用者沒有指定,則會使用producer端的當前時間。在broker端,如果配置了時間戳採用createtime方式,則使用producer傳給Broker的record中的timestramp時間,如果指定為logappendtime,則在broker寫入到Log檔案時會重寫該時間。

8.1.2 send

新版本(0.10)裡面的send函式是一個非同步函式,使用者執行緒呼叫send方法是將record放到BufferPool中快取,並根據batch.size和linger.ms等引數來批量提交。執行流程是:

1.  由interceptorchain對ProducerRecord做傳送前的處理

攔截器介面是:ProducerInterceport,使用者可以自定義自己的攔截器實現。

該攔截器鏈,在Producer物件初始化時初始化,之後不會再變了。所以呢,攔截器鏈中的攔截器都是公用的,如果要自定義攔截器的話,這個是需要注意的。

n  ProducerInterceptor有兩個方法:

1)  onSend:KafkaProducer#send 呼叫時就會執行此方法。

2)  onAcknowledgement:傳送失敗,或者傳送成功(broker 通知producer代表傳送成功)時都會呼叫該方法。

2.  阻塞方式獲取到broker cluster 上broker cluster的資訊

採用RPC方式獲取到的broker資訊,由一個MetaData類封裝。它包括了broker cluster的必要資訊,譬如有:所有的broker資訊(id\host\port等)、所有的topic名稱、每一個topic對於的partition情況(id、leader node、replica nodes、ISR nodes等)。

雖然該過程是阻塞的,但並不是每傳送一個record都會通過RPC方式來獲取的。Metadata會在Producer端快取,只有在record中指定的topic不存在時、或者MetaData輪詢週期到時才會執行。

3.  對record中key、value進行序列化

內建了基於String、Integer、Long、Double、Bytes、ByteBuffer、ByteArray的序列化工具。

4.  為record設定partition屬性

前面說過,建立ProducerRecord時,partition是Optional的。所以如果使用者建立record時,沒有指定partition屬性,則由partition計算工具(Partitioner 介面)來計算出partition。這個計算方式可以自定義。Kafka Producer 提供了內建的實現:

ü  如果提供了Key值,會根據key序列化後的位元組陣列的hashcode進行取模運算。

ü  如果沒有提供key,則採用迭代方式(其實取到的值並非完美的迭代,而是類似於隨機數)。

5.  校驗record的長度是否超出閾值

MAX_REQUEST_SIZE_CONFIG=”max.request.size”

BUFFER_MEMORY_CONFIG=”buffer.memory”

超出任何一項就會丟擲異常。

6.  為record設定timestamp

如果使用者建立ProducerRecord時沒有指定timestamp,設定為producer的當前時間。

其實在java client中,設計了一個Time介面,專門用於設定這個時間的。內建了一個實現SystemTime,這裡將record timestamp設定為當前時間,就是由SystemTime來完成的。所以如果希望在kafka producer java client中使用其它的時間,可以自定義Time的實現。

這個時間戳有什麼要注意的,比如producer和kafkaserver在兩臺機器上,時間不同步,會對後續有什麼影響嗎?

7.  將該record壓縮後放到BufferPool中

這一步是由RecordAccumulator來完成的。RecordAccumulator中為每一個topic維護了一個雙端佇列Deque<RecordBatch>,佇列中的元素是RecordBatch(RecordBatch則由多個record壓縮而成)。RecordAccumulator要做的就是將record壓縮後放到與之topic關聯的那個Deque的最後面。

在將record放到Deque中最後一個RecordBatch中的邏輯為:如果最後一個recordbatch可以放的下就放,放不下就新建一個RecordBatch。

RecordBatch實際上是儲存於BufferPool中,所以這個過程實際上是把record放在BufferPool中。在建立BufferPool之初,會指定BufferPool的總大小,BufferPool中每一個RecordBatch的大小等等配置。

8.  喚醒傳送模組

執行到上一步時,KafkaProducer#sender的處理基本算是完畢。這個一步的目的就是喚醒NIO Selector。

此外,在上述步驟2~8,不論哪一步出現問題,都會丟擲異常。而丟擲異常時,就會被KafkaProducer捕獲到,然後交由Sensor(感測器)進行處理。而Sensor通常會呼叫第1步中提到的interceptorchain 執行onAcknowledgement告知使用者。

8.1.3 send函式的傳送排程處理

KafkaProducer#sender只是將record放到BufferPool中,並沒有將record發出去,而傳送排程,則是由另外一個執行緒(Sender)來完成的。

Sender的執行過程如下:

1.  取出就緒的record

這一步是檢查要傳送的record是否就緒:根據KafkaProducer維護的Metadata檢查要每一個record要發往的Leader node是否存在。如果有不存在的,就設定為需要更新,並且這樣的record認為還未就緒。以保證可以發到相關partition的leader node。

2.  取出RecordBatch,並過濾掉過期的RecordBatch

對於過期的RecordBatch,會通過Sensor通知Interceptor傳送失敗。

3.  為要傳送的RecordBatch建立請求

一個RecordBatch一個ClientRequest。

4.  保留請求併發送

把請求物件保留到一個inFlightRequest 集合中。這個集合中存放的是正在傳送的請求,是一個topic到Deque的Map。當傳送成功,或者失敗都會移除。

5.  處理髮送結果

如果傳送失敗,會嘗試retry。並由Sensor排程Interceptor。

如果傳送成功,會由Sensor排程Interceptor。

8.1.4 Producer實現總結

從上述處理流程中,可以看到在javaclient中的一些設計:

1.  InterceptorChain:可以做為用於自定義外掛的介面。

2.  MetaData:producer 不按需以及定期的傳送請求獲取最新的Cluster狀態資訊。Producer根據這個資訊可以直接將record batch傳送到相關partition的Leader中。也就是在客戶端完成Load balance。

3.  Partitioner:分割槽選擇工具,選擇傳送到哪些分割槽,結合Metadata,完成Load balance。

4.  RecordBatch:在客戶端對record壓縮排RecordBatch,然後一個RecordBatch發一次。這樣可以減少IO操作的次數,提高效能。

5.  非同步方式傳送:提高使用者應用效能。

8.1.5 Producer 配置說明

l  bootstrap.servers

用於配置cluster中borker的host/port對。可以配置一項或者多項,不需要將cluster中所有例項都配置上。因為它後自動發現所有的broker。

如果要配置多項,格式是:host1:port1,host2:port2,host3:port3….

l  key.serializer、value.serializer

配置序列化類名。指定 的這些類都要實現Serializer介面。

l  acks

為了確保message record被broker成功接收。Kafka Producer會要求Borker確認請求(傳送RecordBatch的請求)完成情況。

對於message接收情況的確認,Kafka Broker支援了三種情形:1、不需要確認;2)leader接收到就確認;3)等所有可用的follower複製完畢進行確認。可以看出,這三種情況代表不同的確認粒度。在JavaProducer Client中,對三種情形都做了支援,上述三種情形分別對應了三個配置項:0、1、-1。其實還有一個值是all,它其實就是-1。

Kafka Producer Java Client 是如何支援這三種確認:

1)  在為RecordBatch建立請求時,acks的值會被封裝為請求頭的一部分。

2)  傳送請求後(接收到Broker響應前),立即判斷是否需要確認該請求是否完成(即該RecordBatch是否被Broker成功接收),判斷依據是acks的值是否是0。如果是0,即不需要進行確認。那麼就認定該請求成功完成。既然認定是成功,那麼就不會進行retry了。

如果值不是0,就要等待Broker的響應了。根據響應情況,來判斷請求是否成功完成。

該配置項預設值是1,即leader接收後就響應。

l  buffer.memory

BufferPool Size,也就是等待發送的Record的空間大小。預設值是:33554432,即32MB。

配置項的單位是byte,範圍是:[0,….]

l  compression.type

Kafka提供了多種壓縮型別,可選值有4個: none, gzip, snappy, lz4。預設值是none。

l  retries

當一個RecordBatch傳送失敗時,就會重新改善以確保資料完成交付。該配置設定了重試次數,值範圍[0, Integer.Max]。如果是0,即便失敗,也不會進行重發。

如果允許重試(即retries>0),但max.in.flight.requests.per.connection 沒有設定成1。這種情況下,就可能會出現records的順序改變的現象。例如:一個prodcuder client的sender執行緒在一次輪詢中,如果有兩個recordbatch都要傳送到同步一個partition中,此時它們肯定是發往同一個broker的,並且是用的同一個TCP connection。如果出現RecordBatch1先發,但是傳送失敗,RecordBatch2緊接著RecordBatch1傳送,它是傳送成功的。然後RecordBatch1會進行重發。這樣一來,就出現了broker接收到的順序是RecordBatch2先於RecordBatch1的情況。

l  batch.size

RecordBatch的最大容量。預設值是16384(16KB)。

l  ssl.key.password

Keystore 檔案中私鑰的密碼。可選的。

l  ssl.keystore.location

Keystore檔案的位置。可選的。

l  ssl.keystore.password

Keystore 檔案的密碼。可選的。

l  ssl.truststore.location

Trust store 檔案的位置。可選的。

l  ssl.truststore.password

Trust store檔案的密碼。可選的。

l  client.id

邏輯名,client給broker發請求是會用到。預設值是:””。

l  connections.max.idle.ms

Connection的最大空閒時間。預設值是540000 (9 min)

l  linger.ms

Socket :solinger。延遲。預設值:0,即不延遲。

l  max.block.ms

當需要的metadata未到達之前(例如要傳送的record的topic,在Client中還沒有相關記錄時),執行KafkaProducer#send時,內部處理會等待MetaData的到達。這是個阻塞的操作。為了防止無限等待,設定這個阻塞時間是必要的。範圍:[0, Long.MAX]

l  max.request.size

最大請求長度,在將record壓縮到RecordBatch之前會進行校驗。超過這個大小會丟擲異常。

l  partitioner.class

用於自定義partitioner演算法。預設值是:

org.apache.kafka.clients.producer.internals.DefaultPartitioner

l  receive.buffer.byte

TCP receiver buffer的大小。取值範圍:[-1, …]。這個配置項的預設值是32768(即 32KB)。

如果設定為-1,則會採用作業系統的預設值。

l  request.timeout.ms

最大請求時長。因為發起請求後,會等待broker的響應,如果超過這個時間就認為請求失敗。

l  timeout.ms

這個時間配置的是follower到leader的ack超時時間。這個時間和producer傳送的請求的網路無關。

l  block.on.buffer.full

當bufferPool用完後,如果client還在使用KafkaProducer傳送record,要麼是BufferPool拒絕接收,要麼是丟擲異常。

這個配置是預設值是flase,也就是當bufferpool滿時,不會丟擲BufferExhaustException,而是根據max.block.ms進行阻塞,如果超時丟擲TimeoutExcpetion。

如果這個屬性值是true,則會把max.block.ms值設定為Long.MAX。另外該配置為true時,metadata.fetch.time.ms將不會生效了。

l  interceptor.class

自定義攔截器類。預設情況下沒有指定任何的interceptor。

l  max.in.flight.requests.per.connection

每個連線中處於傳送狀態的請求數的最大值。預設值是5。範圍是[1, Integer.MAX]

l  metric.reporters

MetricReporter的實現類。預設情況下,會自動的註冊JmxReporter。

l  metrics.num.samples

計算metric時的取樣數。預設值是2。範圍:[1,Integer.MAX]

l  metrics.sample.window.ms

取樣的時間視窗。預設值是30000(30s)。範圍:[0, Long.MAX]

如果我的kafka叢集已經開始運行了2個小時,有一個消費程式在持續的處理著訊息。

8.2       Consumer

相關推薦

Kafka 0.10 安裝使用

1    總體說明 筆記本:i5第六代,16G記憶體,256G固態硬碟 使用VirtualBox 5.0.22建立3臺虛擬機器。 2    主機規化 主機名 IP 用途 master 192.168.56.101 slave1 192.168.5

centos下安裝單機版kafka-0.10.0.1

1.環境說明 主機資訊如下: 1[[email protected] soft]# hostname2test13[[email protected] soft]# cat /etc/hosts4127.0.0.1   localh

Kafka-0.10.1叢集的安裝和配置

準備 1.kafka_2.10-0.10.1.1.tgz 2.安裝配置好的Zookeeper-3.4.10分散式叢集 mini1:192.168.213.133 mini2:192.168.213.134 mini3:192.168.213.135

win7+opencv3.0.0+vs2010 安裝配置

64位 結果 環境變量 space 防止 控制 屬性頁 屬性 鏈接器 最近看《學習opencv》,想要跑人臉識別的例子,於是先配環境吧。 1、 opencv下載: 具體下載地址,http://opencv.org/,官網太慢,百度網盤的資源鏈接:http://pan.ba

kafka集群安裝管理(一)

ini nohup class 技術分享 -o -h timeout lba 能夠 一、環境配置1.系統環境[root@date ~]# cat /etc/centos-release CentOS Linux release 7.4.1708 (Core)2..安裝JAV

kafka集群安裝管理(二)

進程 node 規則 nfa 情況 tor back per art 一、broker的遷移1.查看zookeeper和kafka啟動情況[root@slave1 ~]# pssh -h hostlist -i 'jps' [1] 22:08:11 [SUC

關於CDH5.11.0自帶kafka 0.10 bootstrap-server 無法消費

出現 指定 pre hit tst apache 10.2.2 來看 min 近日需要在項目用到kafka,然後本地使用cdh集成的kafka 進行安裝調試,以及些樣例代碼,sparkstreaming 相關調用kafka 的代碼使用的原始的api 而沒有走zook

新手小白Linux(Centos6.5)部署java web項目(mongodb4.0.2安裝相關操作)

read har space 創建 縮進 路徑 .org font url 紅帽企業或CentOS的Linux上安裝MongoDB的社區版: https://docs.mongodb.com/manual/tutorial/install-mongodb-on-red-ha

scala spark-streaming整合kafka (spark 2.3 kafka 0.10

obj required word 錯誤 prope apache rop sta move Maven組件如下: <dependency> <groupId>org.apache.spark</groupId> <

java8下spark-streaming結合kafka程式設計(spark 2.3 kafka 0.10

前面有說道spark-streaming的簡單demo,也有說到kafka成功跑通的例子,這裡就結合二者,也是常用的使用之一。 1.相關元件版本 首先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0,kafk

大資料學習之路97-kafka直連方式(spark streaming 整合kafka 0.10版本)

我們之前SparkStreaming整合Kafka的時候用的是傻瓜式的方式-----createStream,但是這種方式的效率很低。而且在kafka 0.10版本之後就不再提供了。 接下來我們使用Kafka直連的方式,這種方式其實是呼叫Kafka底層的消費資料的API,我們知道,越底層的東

Kafka 0.10.1.1 以時間戳查詢訊息和暫停某些分割槽消費和消費速度控制

轉自:https://www.jianshu.com/p/a4c1d281b66a 1. 以時間戳查詢訊息 (1) Kafka 新版消費者基於時間戳索引消費訊息 kafka 在 0.10.1.1 版本增加了時間索引檔案,因此我們可以根據時間戳來訪問訊息。 如以下需求:從半個小時之前的of

Kafka 0.10 常用運維命令

引言 Kafka是由LinkedIn開發的一個分散式的訊息系統,它以可水平擴充套件和高吞吐率而被廣泛使用,現在已經是Apache的專案。 Kafka系統自帶了豐富的運維管理工具,都是基於命令列的,本文主要介紹一些常用的命令。 讀者需要對Kafka已經有入門級的瞭解。 常用命令 以下命令都是在Kafk

【JMeter4.0安裝執行(windows環境)

安裝 JDK安裝及配置 安裝對應版本的java環境,配置好環境變數。 版本對應關係參考下表: JMeter版本 JDK版本 4.0 1.8 or 1.9 3.2/3.3 1.8+ 3.0/3.1 1.7+ JDK環境變數配置: “我的

Apache2.4.34 + php 7.28 + MySQL8.0.12 安裝配置

服務端的學習 Apache2.4.34 的安裝及配置 1.基本安裝 最新的 Apache 已經不提供 Windows 的安裝版本,所以我們這裡使用的是解壓版 安裝方式如下 1.注意:需要使用管理員身份執行命令列!!! 2. 切換到 Apache 解壓

kafka-0.10.0.1版本在java客戶端傳送資料接收不到

錯誤日誌: org.apache.kafka.common.errors.TimeoutException: Batch containing 100 record kafka版本:0.10.0.1 zookeeper版本: 3.4.5 產生的現象: 編寫好java

Apache Kafka 0.10.0.0&0.11.0.0新特性 更新日誌

一、About 0.10.0.0 Apache Kafka 0.10.0.0於美國時間2016年5月24日正式釋出。Apache Kafka 0.10.0.0是Apache Kafka的主要版本。以下是新特性: 1、Kafka Streams Kafk

Centos7 下 MongoDB4.0.0安裝複製集配置

#mongod.conf #for documentation of all options, see: #http://docs.mongodb.org/manual/reference/configuration-options/ systemLog:    destination: file    lo

kafka 0.10傳送、接收大訊息解決辦法

配置三個地方: Broker: message.max.bytes and replica.fetch.max.bytes Producer: max.request.sizeConsumer: max.partition.fetch.bytes注意: message.ma

java8下spark-streaming結合kafka程式設計(spark 2.0 & kafka 0.10

1.相關元件版本 首先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0,kafka 0.10。 2.引入maven包 網上找了一些結合的例子,但是跟我當前版本不一樣,所以根本就