訊息佇列kafka(一)--基本使用
一、kafka簡介
kafka是一種高吞吐量的訊息佇列。
二、kafka特點
1、輕量級,比如activeMQ等訊息佇列更輕量級。
2、訊息在kafka中,無論訊息是否被消費,都不會被刪除,會保留所有訊息。
3、訊息刪除的策略,基於時間。在config/server.properties中配置。即
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
4、採用scala語言編寫
三、kafka構成
1、topic
相當於佇列queue
2、broker
kafka叢集中的每一臺機器,即稱為broker。一個topic可以有多個broker。即是同個queue下的訊息可以分佈在多臺機器上。
3、partition
一個topic可以有多個partition,每一個partition對應一個檔案,每個檔案內包含資料及相應索引。
4、producer
訊息的生產者
5、consumer
訊息的消費者
6、consumer group
消費者組,對同一個topic,同一消費組內的消費者,只有一個能收到訊息。
而不同消費組的消費者,則都可以收到訊息,相當於廣播。
四、安裝及使用
1、下載kafka安裝包kafka_2.11-0.8.2.1.tgz
2、解壓到某個資料夾下: tar zxvf kafka_2.11-0.8.2.1.tgz
3、安裝zookeeper並啟動
4、進入解壓目錄下,開啟kafka服務: bin/kafka-server-start.sh config/server.properties
5、建立topic : bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my_topic_1
6、檢視建立的所有topic資訊:
命令: bin/kafka-topics.sh --describe --zookeeper localhost:2181
或者指定topic資訊: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my_topic_1
也可使用命令 孫悅/kafka-topics.sh --list --zookeeper localhost:2181
7、訊息生產者,傳送訊息到自定義的topic
命令:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic_1
接下來便可傳送訊息
8、訊息消費者,
命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my_topic_1 --from-beginning
五、使用java產生訊息
1、(特別注意)若將訊息傳送給遠端伺服器上,則需修改kafka安裝時的配置檔案kafka/config/server.properties
(在版本kafka_2.11-0.8.2.1中)將 #host.name=localhost 去掉註釋並改為本機的ip地址(不能使用localhost), 如 host.name=192.168.0.107
(在版本kafka_2.12-1.0.0中)將#listeners=PLAINTEXT://:9092 去掉註釋並改為本機的ip地址(不能使用localhost), 如 listeners = PLAINTEXT://192.168.0.107:9092
將#zookeeper.connect=localhost:2181 去掉註釋並改為本機的ip地址(不能使用localhost), 如 zookeeper.connect=192.168.0.107:2181
在,則需新增
2、新增maven依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.2.1</version> </dependency>
3、訊息產生的程式碼
public class KafkaSimpleProducerMain { public static void main(String[] args) throws Exception { Properties properties = new Properties(); properties.put("metadata.broker.list", "192.168.0.107:9092"); //指定kafka服務的地址和埠號 properties.put("key.serializer.class", "kafka.serializer.StringEncoder"); //指定key的序列化方式 properties.put("serializer.class", "kafka.serializer.StringEncoder"); //指定value的序列化方式 ProducerConfig producerConfig = new ProducerConfig(properties); Producer<String, String> producer = new Producer<>(producerConfig); String topic = "my_topic"; //指定topic while (true) { String msg = "this is a test msg" + new Date(); KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(topic, msg); producer.send(keyedMessage); //傳送訊息 System.out.println("send msg: " + msg); TimeUnit.SECONDS.sleep(1); } } }
輸出:
send msg: this is a test msgTue Oct 31 23:11:21 CST 2017
send msg: this is a test msgTue Oct 31 23:11:23 CST 2017
send msg: this is a test msgTue Oct 31 23:11:24 CST 2017
send msg: this is a test msgTue Oct 31 23:11:25 CST 2017
send msg: this is a test msgTue Oct 31 23:11:26 CST 2017
send msg: this is a test msgTue Oct 31 23:11:27 CST 2017
六、使用java消費訊息
1、新增mavan依賴,同上
2、訊息消費的程式碼
public class KafkaSimpleConsumerMain { public static void main(String[] args) { Properties properties = new Properties(); properties.put("zookeeper.connect","192.168.0.107:2181"); //指定zookeeper properties.put("zookeeper.session.timeout.ms", "300000"); //設定超時時間 properties.put("serializer.class", "kafka.serializer.StringEncoder"); //指定value的序列化方式 properties.put("group.id","my_group_1"); ConsumerConfig consumerConfig = new ConsumerConfig(properties); String topic = "my_topic"; ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); Map<String,Integer> topicConutMap = new HashMap<>(); topicConutMap.put(topic,1); //指定每次取數個數 Map<String, List<KafkaStream<byte[],byte[]>>> messageStreams = consumer.createMessageStreams(topicConutMap); KafkaStream<byte[],byte[]> stream = messageStreams.get(topic).get(0); ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); while(iterator.hasNext()){ String msg = new String(iterator.next().message()); System.out.println("receive msg : "+msg); } } }輸出:
receive msg : this is a test msgTue Oct 31 23:11:21 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:23 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:24 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:25 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:26 CST 2017
receive msg : this is a test msgTue Oct 31 23:11:27 CST 2017