1. 程式人生 > >訊息佇列kafka(一)--基本使用

訊息佇列kafka(一)--基本使用

一、kafka簡介

kafka是一種高吞吐量的訊息佇列。

二、kafka特點

1、輕量級,比如activeMQ等訊息佇列更輕量級。

2、訊息在kafka中,無論訊息是否被消費,都不會被刪除,會保留所有訊息。

3、訊息刪除的策略,基於時間。在config/server.properties中配置。即

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

4、採用scala語言編寫

三、kafka構成

1、topic

相當於佇列queue

2、broker

kafka叢集中的每一臺機器,即稱為broker。一個topic可以有多個broker。即是同個queue下的訊息可以分佈在多臺機器上。

3、partition

一個topic可以有多個partition,每一個partition對應一個檔案,每個檔案內包含資料及相應索引。

4、producer

訊息的生產者

5、consumer

訊息的消費者

6、consumer group

消費者組,對同一個topic,同一消費組內的消費者,只有一個能收到訊息。

而不同消費組的消費者,則都可以收到訊息,相當於廣播。

四、安裝及使用

1、下載kafka安裝包kafka_2.11-0.8.2.1.tgz

2、解壓到某個資料夾下:   tar zxvf kafka_2.11-0.8.2.1.tgz

3、安裝zookeeper並啟動

4、進入解壓目錄下,開啟kafka服務:   bin/kafka-server-start.sh  config/server.properties

5、建立topic :   bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic_1

6、檢視建立的所有topic資訊:

命令:  bin/kafka-topics.sh --describe --zookeeper localhost:2181

或者指定topic資訊:  bin/kafka-topics.sh --describe --zookeeper localhost:2181  --topic my_topic_1

也可使用命令 孫悅/kafka-topics.sh --list --zookeeper localhost:2181

7、訊息生產者,傳送訊息到自定義的topic  

命令:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_1

接下來便可傳送訊息

8、訊息消費者,

命令:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic_1 --from-beginning

五、使用java產生訊息

1、(特別注意)若將訊息傳送給遠端伺服器上,則需修改kafka安裝時的配置檔案kafka/config/server.properties

(在版本kafka_2.11-0.8.2.1中)將 #host.name=localhost 去掉註釋並改為本機的ip地址(不能使用localhost), 如 host.name=192.168.0.107

(在版本kafka_2.12-1.0.0中)將#listeners=PLAINTEXT://:9092 去掉註釋並改為本機的ip地址(不能使用localhost), 如 listeners = PLAINTEXT://192.168.0.107:9092

將#zookeeper.connect=localhost:2181  去掉註釋並改為本機的ip地址(不能使用localhost), 如 zookeeper.connect=192.168.0.107:2181

在,則需新增

2、新增maven依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.2.1</version>
</dependency>

3、訊息產生的程式碼

public class KafkaSimpleProducerMain {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
properties.put("metadata.broker.list", "192.168.0.107:9092");  //指定kafka服務的地址和埠號
properties.put("key.serializer.class", "kafka.serializer.StringEncoder"); //指定key的序列化方式
properties.put("serializer.class", "kafka.serializer.StringEncoder");  //指定value的序列化方式
ProducerConfig producerConfig = new ProducerConfig(properties);
Producer<String, String> producer = new Producer<>(producerConfig);
String topic = "my_topic"; //指定topic
while (true) {
            String msg = "this is a test msg" + new Date();
KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, msg);
producer.send(keyedMessage);  //傳送訊息
System.out.println("send msg: " + msg);
TimeUnit.SECONDS.sleep(1);
}
    }
}

輸出:

send msg: this is a test msgTue Oct 31 23:11:21 CST 2017
send msg: this is a test msgTue Oct 31 23:11:23 CST 2017
send msg: this is a test msgTue Oct 31 23:11:24 CST 2017
send msg: this is a test msgTue Oct 31 23:11:25 CST 2017
send msg: this is a test msgTue Oct 31 23:11:26 CST 2017
send msg: this is a test msgTue Oct 31 23:11:27 CST 2017

六、使用java消費訊息

1、新增mavan依賴,同上

2、訊息消費的程式碼

public class KafkaSimpleConsumerMain {
    public static void main(String[] args) {
        Properties properties = new Properties();
properties.put("zookeeper.connect","192.168.0.107:2181");  //指定zookeeper
properties.put("zookeeper.session.timeout.ms", "300000");  //設定超時時間
properties.put("serializer.class", "kafka.serializer.StringEncoder");  //指定value的序列化方式
properties.put("group.id","my_group_1");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
String topic = "my_topic";
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String,Integer> topicConutMap = new HashMap<>();
topicConutMap.put(topic,1);  //指定每次取數個數
Map<String, List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicConutMap);
KafkaStream<byte[],byte[]> stream = messageStreams.get(topic).get(0);
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while(iterator.hasNext()){
            String msg = new String(iterator.next().message());
System.out.println("receive msg : "+msg);
}
    }
}
輸出:

receive msg : this is a test msgTue Oct 31 23:11:21 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:23 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:24 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:25 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:26 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:27 CST 2017