
Setting Up a Kafka Environment (Windows/Linux)

(1) Installing ZooKeeper (Windows)

Kafka depends on ZooKeeper, so install ZooKeeper first.

1. Download the latest ZooKeeper release from the official site: http://www.apache.org/dyn/closer.cgi/zookeeper/

2. Extract it to a path of your choice; here it is E:\zookeeper\zookeeper-3.4.10.

3. Copy zoo_sample.cfg in the conf directory and rename the copy to zoo.cfg. In zoo.cfg, change dataDir to E:/data/zookeeper and add the line dataLogDir=E:/log/zookeeper, as shown below.
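For reference, a minimal zoo.cfg after these edits might look like the following (tickTime, initLimit, syncLimit, and clientPort keep the sample defaults):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=E:/data/zookeeper
dataLogDir=E:/log/zookeeper
clientPort=2181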

4. Edit the system environment variables and append ;E:\zookeeper\zookeeper-3.4.10\bin to Path.

5. Open a cmd window, type zkServer, and press Enter. If the server starts without errors (it binds to clientPort 2181 by default), ZooKeeper is installed and running.


Installing ZooKeeper (Linux)

1. Connect to the Linux server with Xshell or a similar tool, switch to a working directory, and download the latest stable ZooKeeper release from http://mirrors.hust.edu.cn/apache/zookeeper/stable/ with the following commands:

cd /usr/soft

wget http://mirrors.hust.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz

2. Extract the archive:

tar -xzvf zookeeper-3.4.10.tar.gz

3. Switch to the conf directory and copy zoo_sample.cfg to zoo.cfg; adjust the configuration as needed (see the example below).
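For example (assuming the archive was extracted under /usr/soft as above):

cd /usr/soft/zookeeper-3.4.10/conf

cp zoo_sample.cfg zoo.cfg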

4. Switch to the bin directory and start ZooKeeper; the message "Starting zookeeper ... STARTED" indicates success:

./zkServer.sh start
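You can also confirm the server is running with the status subcommand, which reports "Mode: standalone" on a single node:

./zkServer.sh status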

(2) Installing Kafka (Windows)

1. Download the latest Kafka release from http://kafka.apache.org/downloads

2. Extract it to a path of your choice; here the extraction path is E:\kafka_2.12-0.10.2.0.

3. In the E:\kafka_2.12-0.10.2.0\config directory, edit server.properties and set log.dirs to E:/log/kafka.
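After the change, the relevant section of server.properties might look like this (broker.id and zookeeper.connect are shown with their defaults; adjust zookeeper.connect if ZooKeeper is not local):

broker.id=0
log.dirs=E:/log/kafka
zookeeper.connect=localhost:2181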

4. Edit the system environment variables and append ;E:\kafka_2.12-0.10.2.0\bin\windows to Path.

5. Start Kafka: open a cmd window, cd to the Kafka root directory E:\kafka_2.12-0.10.2.0, and run the command

 .\bin\windows\kafka-server-start.bat .\config\server.properties

The message "started (kafka.server.KafkaServer)" indicates a successful start.

If startup fails with an error like "'wmic' is not recognized as an internal or external command...", append the following to the system Path variable:
;C:\Windows\System32\wbem
6. Open a cmd window and create a topic:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

7. Open another cmd window and start a console producer:

kafka-console-producer.bat --broker-list localhost:9092 --topic test

8. Open a third cmd window and start a console consumer:

kafka-console-consumer.bat --zookeeper localhost:2181 --topic test

9. Type messages in the producer window to test; each line appears in the consumer window as soon as you press Enter, which shows that Kafka is installed and working.


Installing Kafka (Linux)

1. Download the latest Kafka release from https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz

2. Extract the archive and rename the directory:

tar -xzvf kafka_2.12-0.10.2.0.tgz

mv kafka_2.12-0.10.2.0 kafka

3. Switch to the bin directory under kafka and use vi to reduce the JVM heap size in kafka-server-start.sh: change

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

to

export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

If the machine has plenty of memory, you can skip this change.


4. Switch back to the kafka root directory and start Kafka; the same "started (kafka.server.KafkaServer)" message indicates success:

bin/kafka-server-start.sh config/server.properties
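If you want the broker to keep running after the terminal session ends, kafka-server-start.sh also accepts a -daemon flag that starts it in the background:

bin/kafka-server-start.sh -daemon config/server.properties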


5. Create a topic:
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
This creates a topic named test with one replica and one partition.
List the topics to verify it was created:
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
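You can also inspect the partition and replica assignment of the new topic with --describe:
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test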

6. Start a producer and send messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Once it is running, type a line and press Enter to send it.

Press Ctrl+C to stop sending.

7. Start a consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
The consumer console then shows the messages sent by the producer.
Open two terminals, one to send messages and one to receive them.

(3) Programming Kafka with the Java API

1. Create a Maven project (Eclipse is used here) and add the Kafka dependency to the pom, as follows:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
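If the project only needs the producer and consumer APIs used below, the smaller kafka-clients artifact is an alternative:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>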

2. Create a producer test class, TestProducer.java:
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // acks=all blocks until the record is fully committed: the slowest but most durable setting.
        props.put("acks", "all");
        // If a request fails, the producer can retry automatically, though with retries=0 it will not.
        props.put("retries", 0);
        // The producer keeps a buffer of unsent records for each partition; this is its size in bytes.
        props.put("batch.size", 16384);
        // Records are sent immediately by default; linger.ms adds a small delay so batches can fill up.
        props.put("linger.ms", 1);
        // Total memory available for buffering. When it is exhausted, further send() calls block,
        // and after max.block.ms a TimeoutException is thrown.
        props.put("buffer.memory", 33554432);
        // key.serializer and value.serializer tell the producer how to turn the key and value
        // objects supplied in each ProducerRecord into bytes.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Create the Kafka producer.
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Other useful producer methods:
        //   close()                            -- close this producer
        //   close(long timeout, TimeUnit unit) -- wait up to timeout for outstanding sends to complete
        //   flush()                            -- send all buffered records immediately
        for (int i = 0; i < 100; i++) {
            // Constructor is (topic, partition, key, value): everything goes to partition 0 of "test".
            producer.send(new ProducerRecord<String, String>("test", 0, Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}
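Note that send() is asynchronous and returns a Future<RecordMetadata>. As a minimal sketch (not part of the original example; it also needs an import of org.apache.kafka.clients.producer.RecordMetadata, and get() throws checked exceptions), you could block on that future to send synchronously and read back the assigned offset:

// Hypothetical synchronous send: block on the Future returned by send().
RecordMetadata meta = producer.send(
        new ProducerRecord<String, String>("test", "key-0", "value-0")).get();
System.out.printf("acked: partition=%d, offset=%d%n", meta.partition(), meta.offset());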


3. Create a consumer test class, TestConsumer.java:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // The consumer group id (GroupA or GroupB here).
        props.put("group.id", "GroupA");
        // Commit offsets automatically, every auto.commit.interval.ms milliseconds.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // How long the broker waits without a heartbeat before it considers this consumer dead.
        props.put("session.timeout.ms", "30000");
        // Upper bound on the number of records returned by a single poll().
        //props.put("max.poll.records", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Subscribe to a list of topics.
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                // In production, hand records off to a worker pool instead of processing them inline.
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
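If you need at-least-once delivery, a common variation (a sketch under the same setup, not part of the original example) is to disable auto-commit and commit offsets manually only after the records have been processed:

props.put("enable.auto.commit", "false");
// ... create the consumer and subscribe as above ...
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // Process the record here.
        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
    }
    // Commit the offsets returned by the last poll() only after processing succeeded.
    consumer.commitSync();
}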

4. Run (or debug) TestConsumer first, then TestProducer. Seeing the produced messages in the TestConsumer console confirms that they were sent and received successfully.



The console consumer window started earlier also shows the received data.


