[Kafka] Apache Kafka 簡介、叢集搭建及配置詳解
阿新 • • 發佈:2018-12-18
前言
kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。
Kafka中相關名詞
- producer:生產者。
- consumer:消費者。
- topic: 訊息以topic為類別記錄,Kafka將訊息種子(Feed)分門別類,每一類的訊息稱之為一個主題(Topic)。
- broker:以叢集的方式執行,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic),並從Broker拉資料,從而消費這些已釋出的訊息。每個訊息(也叫作record記錄,也被稱為訊息)是由一個key,一個value和時間戳構成。
Kafka的應用場景
- 構建實時的流資料管道,可靠地獲取系統和應用程式之間的資料。
- 構建實時流的應用程式,對資料流進行轉換或反應。
Kafka叢集部署
mkdir -p /usr/kafka (多臺機器同步建立)
tar -xzvf kafka_2.11-0.11.0.0.tgz -C /usr/kafka/
建立日誌存放目錄
mkdir -p /usr/data/kafka/logs (多臺機器同步建立)
修改配置檔案 (server.properties)
# 進入配置檔案目錄
cd /usr/kafka/kafka_2.11-0.11.0.0/config
# 檢視配置資訊 (自己可以在這配置基礎上進行配置修改,複製出來修改即可)
cat server.properties | grep -v "#"
# 將配置檔案移除並備份
mv server.properties server.properties.bak
# 配置新的配置資訊
vim server.properties
填入以下內容
# 需要修改叢集之間的brokerId broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # 修改日誌目錄 log.dirs=/usr/data/kafka/logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 # 修改ZK的連線地址 zookeeper.connect=node1:2181,node2:2181,node3:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 # 需要修改 hostname 每個機器上不一樣 delete.topic.enable=true host.name=node1
分發
scp -r /usr/kafka/kafka_2.11-0.11.0.0/ [email protected]:/usr/kafka/
scp -r /usr/kafka/kafka_2.11-0.11.0.0/ [email protected]:/usr/kafka/
修改node2與node3的配置檔案 (server.properties)
# node2
broker.id=1
host.name=node2
# node3
broker.id=2
host.name=node3
啟動方式一 :
cd /usr/kafka/kafka_2.11-0.11.0.0/bin
./kafka-server-start.sh /usr/kafka/kafka_2.11-0.11.0.0/config/server.properties
啟動方式二 : 後臺啟動
# 注 : 這裡是一行命令
nohup /usr/kafka/kafka_2.11-0.11.0.0/bin/kafka-server-start.sh
/usr/kafka/kafka_2.11-0.11.0.0/config/server.properties >/dev/null 2>&1 &
Kafka 命令列的使用
建立topic
./kafka-topics.sh --create --zookeeper node1:2181 --topic test --partitions 1 --replication-factor 3
- topic 主題的名字
- zookeeper zookeeper地址
- partitions 副本,副本的作用是備份
- replication-factor 分片,分片的作用是分治
顯示所有topic
./kafka-topics.sh --list --zookeeper node01:2181
檢視topic
./kafka-topics.sh --describe --topic test --zookeeper node1:2181
刪除topic
./kafka-topics.sh --delete --topic test --zookeeper node1:2181
檢視offset的最大值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 -topic test --time -1
檢視offset的最小值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 -topic test --time -2
生產資料
# 單機啟動
./kafka-console-producer.sh --topic order --broker-list node1:9092
# 叢集啟動
./kafka-console-producer.sh --topic order --broker-list node1:9092,node2:9092,node3:9092
消費資料
# 單機啟動
./kafka-console-consumer.sh --topic order --bootstrap-server node1:9092
# 叢集啟動
./kafka-console-consumer.sh --topic order --bootstrap-server node1:9092,node2:9092,node3:9092
# 從最原始偏移量消費訊息
./kafka-console-consumer.sh --topic order --bootstrap-server node1:9092 --from-beginning
Java 操作
引入pom依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
生產者
ProducerPro.java 配置檔案
public class ProducerPro {
public static Properties props = new Properties();
static {
//指定kafka的broker地址
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
//訊息確認機制 有4個值(0,1,-1,all)
props.put("acks", "all");
//重試次數
props.put("retries", 0);
//預設的批量處理訊息位元組數
props.put("batch.size", 16384);
//訊息位元組數比batch.size要小的多,我們需要linger特定的時間以獲取更多的訊息
//這個設定預設為0,即沒有延遲。設定linger.ms=1
//例如,將會減少請求數目,但是同時會增加1ms的延遲。
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
/**
* 序列化的作用:為了資料的儲存和傳輸。
* product:name=zhangsan,age=10,sex=0
* 序列化:將物件儲存到本地,product---> name=zhangsan,age=10,sex=0
* 發序列化:name=zhangsan,age=10,sex=0 -->product
* k:name v:zhangsan
* k:age v:10
* k:sex v:0
* Class.forname()
*/
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
}
MyProducer.java
public class MyProducer {
public static void main(String[] args) {
//主題
String topic = "test";
//推送的資料
String value = "msg";
KafkaProducer<String, String> kafkaProducer =
new KafkaProducer<String, String>(ProducerPro.props);
ProducerRecord record = new ProducerRecord(topic,value);
kafkaProducer.send(record);
kafkaProducer.flush();
kafkaProducer.close();
}
}
消費者
ConsumerPro.java 配置檔案
public class ConsumerPro {
public static Properties props = new Properties();
static {
//指定kafka叢集地址
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
//指定消費者組id
props.put("group.id", "group1");
//是否自動提交offset
// 0.8以前,儲存在zookeeper,0.8+以後的版本,儲存到topic下,這個topic叫做 __consumer_offsets
props.put("enable.auto.commit", "true");
//自動確認offset的時間間隔
props.put("auto.commit.interval.ms", "1000");
// key的序列化類 持久化(網路、磁碟),反序列化就是講磁碟或者網路中的資料還原物件。
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
// value的序列化類
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
}
}
MyConsumer.java
public class MyConsumer {
public static void main(String[] args) {
//訂閱主題 (這裡可以訂閱多個主題,可以同時接收多個主題訊息)
List<String> topics = new ArrayList();
topics.add("test");
topics.add("test1");
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer(ConsumerPro.props);
kafkaConsumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> poll = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record : poll) {
String value = record.value();
System.out.println(value);
}
}
}
}
自定義Partition
MyPartitioner.java
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key,
byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int numPartition = cluster.availablePartitionsForTopic(topic).size();
if (keyBytes==null){
String uuid = UUID.randomUUID().toString();
int index = Math.abs(uuid.hashCode() % numPartition);
return index;
}else{
int index = Math.abs(keyBytes.hashCode() % numPartition);
return index;
}
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> map) {}
}
修改配置檔案
props.put("partition.class","com.kafka.test.demo.MyPartitioner");
配置檔案
Server.properties
#broker的全域性唯一編號,不能重複
broker.id=0
#用來監聽連結的埠,producer或consumer將在此埠建立連線
port=9092
#處理網路請求的執行緒數量
num.network.threads=3
#用來處理磁碟IO的執行緒數量
num.io.threads=8
#傳送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接受套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩衝區大小
socket.request.max.bytes=104857600
#kafka執行日誌存放的路徑
log.dirs=/export/servers/kafka/kafka-logs
#topic在當前broker上的分片個數
num.partitions=2
#用來恢復和清理data下資料的執行緒數量
num.recovery.threads.per.data.dir=1
#segment檔案保留的最長時間,超時將被刪除(預設保留168小時=7天)
log.retention.hours=168
#一個topic的資料量大小達到一定閥值時,會刪除topic的資料,預設等於-1,表示沒有限制
log.retention.bytes=-1
#滾動生成新的segment檔案的最大時間
log.roll.hours=1
#日誌檔案中每個segment的大小,預設為1G
log.segment.bytes=1073741824
#週期性檢查檔案大小的時間
log.retention.check.interval.ms=300000
#日誌清理是否開啟
log.cleaner.enable=true
#broker需要使用zookeeper儲存meta資料
zookeeper.connect=node1:2181,node2:2181,node3:2181
#zookeeper連結超時時間
zookeeper.connection.timeout.ms=6000
#partion buffer中,訊息的條數達到閾值,將觸發flush到磁碟
log.flush.interval.messages=10000
#訊息buffer的時間,達到閾值,將觸發flush到磁碟
log.flush.interval.ms=3000
#刪除topic需要server.properties中設定delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true
#此處的host.name為本機IP(重要),如果不改,則客戶端會丟擲:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=node1
#廣播地址,主要用於外網連線kafka叢集,一般用不到
advertised.host.name=192.168.200.100
Producer.properties
#指定kafka節點列表,用於獲取metadata,不必全部指定(老版本這樣指定)
metadata.broker.list=node1:9092,node2:9092,node3:9092
# 指定分割槽處理類。預設kafka.producer.DefaultPartitioner,表通過key雜湊到對應分割槽
partitioner.class=kafka.producer.DefaultPartitioner
# 是否壓縮,預設0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後訊息中會有頭來指明訊息壓縮型別,故在消費者端訊息解壓是透明的無需指定。
gzip snappy
compression.codec=none
# 指定序列化處理類
serializer.class=kafka.serializer.DefaultEncoder
# 如果要壓縮訊息,這裡指定哪些topic要壓縮訊息,預設empty,表示不壓縮。
#compressed.topics=
# 設定傳送資料是否需要服務端的反饋,有三個值0,1,-1
# 0: producer不會等待broker傳送ack
# 1: 當leader接收到訊息之後傳送ack
# -1: 當所有的follower都同步訊息成功後傳送ack.
request.required.acks=0
# 在向producer傳送ack之前,broker允許等待的最大時間 ,如果超時,broker將會向producer傳送一個error ACK.意味著上一次訊息因為某種原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000
# 同步還是非同步傳送訊息,預設“sync”表同步,"async"表非同步。非同步可以提高發送吞吐量,
也意味著訊息將會在本地buffer中,並適時批量傳送,但是也可能導致丟失未傳送過去的訊息
producer.type=sync
# 在async模式下,當message被快取的時間超過此值後,將會批量傳送給broker,預設為5000ms
# 此值和batch.num.messages協同工作.
queue.buffering.max.ms = 5000
# 在async模式下,producer端允許buffer的最大訊息量
# 無論如何,producer都無法儘快的將訊息傳送給broker,從而導致訊息在producer端大量沉積
# 此時,如果訊息的條數達到閥值,將會導致producer端阻塞或者訊息被拋棄,預設為10000
queue.buffering.max.messages=20000
# 如果是非同步,指定每次批量傳送資料量,預設為200
batch.num.messages=500
# 當訊息在producer端沉積的條數達到"queue.buffering.max.meesages"後
# 阻塞一定時間後,佇列仍然沒有enqueue(producer仍然沒有傳送出任何訊息)
# 此時producer可以繼續阻塞或者將訊息拋棄,此timeout值用於控制"阻塞"的時間
# -1: 無阻塞超時限制,訊息不會被拋棄
# 0:立即清空佇列,訊息被拋棄
queue.enqueue.timeout.ms=-1
# 當producer接收到error ACK,或者沒有接收到ACK時,允許訊息重發的次數,因為broker並沒有完整的機制來避免訊息重複,所以當網路異常時(比如ACK丟失)
# 有可能導致broker接收到重複的訊息,預設值為3.
message.send.max.retries=3
# producer重新整理topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況,因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即重新整理 。 (比如topic失效,partition丟失,leader失效等),此外也可以通過此引數來配置額外的重新整理機制,預設值600000
topic.metadata.refresh.interval.ms=60000
Consumer.properties
# zookeeper連線伺服器地址
zookeeper.connect=node1:2181,node2:2181,node3:2181
# zookeeper的session過期時間,預設5000ms,用於檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000
#當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡
zookeeper.connection.timeout.ms=10000
#ZooKeeper叢集中leader和follower之間的同步時間
zookeeper.sync.time.ms=2000
#指定 消費者組id
group.id=itcast
# 當consumer消費一定量的訊息之後,將會自動向zookeeper提交offset資訊
# 注意offset資訊並不是每消費一次訊息就向zk提交一次,而是現在本地儲存(記憶體),並定期提交,預設為true
auto.commit.enable=true
# 自動更新時間。預設60 * 1000
auto.commit.interval.ms=1000
# 當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤訊息消費情況,便於觀察
conusmer.id=xxx
# 消費者客戶端編號,用於區分不同客戶端,預設客戶端程式自動產生
client.id=xxxx
# 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新的consumer上,如果一個consumer獲得了某個partition的消費許可權,那麼它將會向zk註冊 "Partition Owner registry"節點資訊,但是有可能此時舊的consumer尚沒有釋放此節點, 此值用於控制,註冊節點的重試次數.
rebalance.max.retries=5
# 傳送到消費端的最小資料,若是不滿足這個數值則會等待直到滿足指定大小。預設為1表示立即接收
fetch.min.bytes=1
# 當訊息的尺寸不足時,server阻塞的時間,如果超時,訊息將立即傳送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。
# earliest:當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest當各分割槽下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割槽下的資料
# none:topic各分割槽都存在已提交的offset時,從offset後開始消費;只要有一個分割槽不存在已提交的offset,則丟擲異常
auto.offset.reset=earliest
# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder