Setting up a Kafka environment (Windows/Linux)
(1) Installing ZooKeeper (Windows)
Kafka depends on ZooKeeper, so install ZooKeeper first.
1. Download the latest ZooKeeper release from the official site: http://www.apache.org/dyn/closer.cgi/zookeeper/
2. Extract it to a path of your choice; here it is E:\zookeeper\zookeeper-3.4.10
3. Copy zoo_sample.cfg in the conf directory and rename the copy to zoo.cfg. In zoo.cfg, change dataDir to E:/data/zookeeper and add the line dataLogDir=E:/log/zookeeper
4. Edit the system environment variables and append ;E:\zookeeper\zookeeper-3.4.10\bin to Path
5. Open a cmd window and run zkServer. Output like the figure below means ZooKeeper started successfully, which also confirms the installation.
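After step 3, zoo.cfg should look roughly like the following sketch. The dataDir and dataLogDir values come from the steps above; the remaining values are the zoo_sample.cfg defaults:

```properties
# Basic single-node ZooKeeper configuration (sketch)
tickTime=2000
initLimit=10
syncLimit=5
dataDir=E:/data/zookeeper
dataLogDir=E:/log/zookeeper
clientPort=2181
```

Keeping the transaction log (dataLogDir) on a separate path from the snapshots (dataDir) is a common practice because the log is write-heavy.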
Installing ZooKeeper (Linux)
1. Connect to the Linux server with Xshell or a similar tool, change to any directory, and download the latest stable ZooKeeper release from http://mirrors.hust.edu.cn/apache/zookeeper/stable/:
cd /usr/soft
wget http://mirrors.hust.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz
2. Extract the archive:
tar -xzvf zookeeper-3.4.10.tar.gz
3. Change to the conf directory and copy zoo_sample.cfg to zoo.cfg; adjust the configuration as needed.
4. Change to the bin directory and start ZooKeeper. The line "Starting zookeeper ... STARTED" indicates a successful start:
./zkServer.sh start
(2) Installing Kafka (Windows)
1. Download the latest Kafka release from http://kafka.apache.org/downloads
2. Extract it to a path of your choice; here it is E:\kafka_2.12-0.10.2.0
3. In the E:\kafka_2.12-0.10.2.0\config directory, edit server.properties and set log.dirs to E:/log/kafka
4. Edit the system environment variables and append ;E:\kafka_2.12-0.10.2.0\bin\windows to Path
5. Start Kafka: in a cmd window, cd to the Kafka root directory E:\kafka_2.12-0.10.2.0 and run
.\bin\windows\kafka-server-start.bat .\config\server.properties
Output containing "started (kafka.server.KafkaServer)" indicates a successful start.
If startup fails with an error like "'wmic' is not recognized as an internal or external command...", append the following to the Path system environment variable:
;C:\Windows\System32\wbem
6. Open a cmd window and create a topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
7. Open another cmd window and start a producer:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
8. Open a third cmd window and start a consumer:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
9. Type messages in the producer window to test: each line you enter should appear in the consumer window immediately, confirming that Kafka is installed and working.
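The server.properties edit in step 3 amounts to roughly the following sketch; broker.id and zookeeper.connect are shown with their default values, which match the local ZooKeeper started earlier:

```properties
# Minimal single-broker configuration (sketch)
broker.id=0
log.dirs=E:/log/kafka
zookeeper.connect=localhost:2181
```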
Installing Kafka (Linux)
1. Download the latest Kafka release from https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
2. Extract the archive and rename the directory:
tar -xzvf kafka_2.12-0.10.2.0.tgz
mv kafka_2.12-0.10.2.0 kafka
3. Change to the bin directory under the kafka directory and edit kafka-server-start.sh with vi to reduce the JVM heap size, changing
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
to
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
You can skip this step if your machine has enough memory.
4. Change back to the kafka root directory and start Kafka; a successful start looks like the figure below:
bin/kafka-server-start.sh config/server.properties
5. Create a topic:
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
This creates a topic named test with a single replica and a single partition.
List the topics to verify it was created:
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
6. Start a producer and send messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Once it is running you can type messages; press Ctrl+C to stop.
7. Start a consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Once the consumer is running you will see the producer's messages in the console.
You can open two terminals, one to send messages and one to receive them.
(3) Programming Kafka from Java
1. Create a new Maven project (Eclipse is used here) and add the Kafka dependency to the pom:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.0</version>
</dependency>
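If only the Java producer/consumer API is needed, the lighter kafka-clients artifact is sufficient (the full kafka_2.11 artifact also pulls in the broker and the Scala runtime):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
```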
2. Create a producer test class, TestProducer.java:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // "all" blocks until the record is fully committed -- the slowest but most durable setting.
        props.put("acks", "all");
        // Number of automatic retries on a failed send; with 0 the producer does not retry.
        props.put("retries", 0);
        // The producer keeps a buffer of unsent records per partition and sends them in batches of this size (bytes).
        props.put("batch.size", 16384);
        // By default records are sent immediately; this adds a small delay (ms) so more records can be batched together.
        props.put("linger.ms", 1);
        // Total memory (bytes) available for buffering. When it is exhausted, send() blocks;
        // after max.block.ms a TimeoutException is thrown.
        props.put("buffer.memory", 33554432);
        // key.serializer and value.serializer turn the key and value of a ProducerRecord into bytes.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the Kafka producer.
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Other useful producer methods:
        // close() -- close this producer.
        // close(long timeout, TimeUnit timeUnit) -- wait up to timeout for outstanding sends to complete.
        // flush() -- send all buffered records immediately.
        for (int i = 0; i < 100; i++)
            // Send to partition 0 of topic "test", using the counter as both key and value.
            producer.send(new ProducerRecord<String, String>("test", 0, Integer.toString(i), Integer.toString(i)));
        producer.close();
    }
}
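Since the consumer below repeats much of this configuration, the settings can be factored into a small helper. The following is a minimal sketch using only the standard library; the class and method names (ProducerProps, baseProps) are illustrative and not part of the Kafka API:

```java
import java.util.Properties;

public class ProducerProps {
    // Build the baseline producer configuration used in TestProducer.
    public static Properties baseProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");             // wait for full commit: slowest, most durable
        props.put("retries", 0);              // no automatic retries
        props.put("batch.size", 16384);       // per-partition batch size, bytes
        props.put("linger.ms", 1);            // small delay to allow batching
        props.put("buffer.memory", 33554432); // total send buffer, bytes
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseProps("localhost:9092");
        // getProperty only works for String values; non-String values need get().
        System.out.println(props.getProperty("acks")); // prints: all
        System.out.println(props.get("batch.size"));   // prints: 16384
    }
}
```

Note that Properties.getProperty returns null for values stored as non-String objects (such as the Integer batch.size above), which is why the example uses get() for that key.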
3. Create a consumer test class, TestConsumer.java:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Consumer group id; consumers sharing a group id divide the topic's partitions among themselves.
        props.put("group.id", "GroupA");
        // Commit offsets automatically, once per second.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // If the broker receives no heartbeat within this timeout, it considers the consumer
        // dead and rebalances its partitions to other group members.
        props.put("session.timeout.ms", "30000");
        // Upper bound on the number of records returned by a single poll():
        // props.put("max.poll.records", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to a list of topics.
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                // In real code, hand records off to a thread pool instead of processing them inline here.
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
4. Run (or debug) TestConsumer first, then TestProducer. Output like the figure below in the TestConsumer console means the messages were sent and received successfully.
The received data also appears in the console-consumer command window started earlier.