Kafka: Introduction and Usage
What is Kafka?
Kafka is a distributed streaming-data platform, similar to a message queue.
A streaming platform can:
1. Publish and subscribe to streams of records
2. Store streams of records in a fault-tolerant way
3. Process streams of records
Kafka is typically used to:
1. Serve as a real-time streaming-data pipeline that moves data between applications or systems (real-time, low latency; it can also buffer important data)
2. Perform real-time stream processing and reaction
Kafka is often used as a buffer in front of high-throughput output, thanks to its remarkable performance. It can also safely buffer important data, because records are not deleted immediately after being consumed (read), and the data can be replicated.
Kafka APIs
Producer: publishes data to Kafka; a single producer can write to multiple topics
Consumer: consumes data from Kafka; a single consumer can subscribe to multiple topics
Streams: for simple stream processing; it can both produce and consume, across multiple topics
Connector: for building and running reusable producers and consumers (e.g. connecting Kafka to external systems)
Multi-language: mainstream languages such as Java all have corresponding client APIs
Topic: a category of records; every record is published to a specific topic
Record: {key, value, timestamp}
Kafka can be thought of as an immutable queue (of records) that retains data for a configurable period of history, e.g. two days.
The only per-consumer metadata stored is a single offset, which is entirely under the consumer's control; in other words, the offset does not have to advance monotonically. If needed, a consumer can rewind and re-read data from two days ago, or jump straight to the latest records.
Partition: a topic is split into multiple partitions, which can live on different machines. Records are distributed across partitions (a custom partitioning function on the key can be supplied), reducing the load on any single node.
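As a rough sketch of how keyed records map to partitions: Kafka's real default partitioner hashes the key with murmur2, but the hypothetical helper below shows the same idea with a plain Java hash, namely that the same key always lands in the same partition.

```java
import java.util.Objects;

public class PartitionSketch {
    // Illustrative partitioner (NOT Kafka's actual murmur2-based one):
    // hash the key, mask off the sign bit, and take it modulo the
    // partition count, so equal keys always map to the same partition.
    static int partitionFor(Object key, int numPartitions) {
        return (Objects.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("sensor-42", 3);
        int p2 = partitionFor("sensor-42", 3);
        System.out.println(p1 == p2);          // same key -> same partition
        System.out.println(p1 >= 0 && p1 < 3); // always a valid partition index
    }
}
```

Because records with the same key go to the same partition, per-key ordering is preserved by the per-partition ordering guarantee below.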
Replica: each partition has multiple replicas, which can live on different machines, consisting of one leader and zero or more followers. Only the leader serves reads and writes; followers only replicate the writes. If the leader dies, a new leader is elected from among the followers.
Guarantee: within each partition, records are stored in the order they were produced.
Other concepts:
Consumer group: the partitions of a topic are dynamically and evenly distributed across the consumers of a consumer group. A topic is broadcast to every subscribing consumer group, while within a group each partition is delivered to exactly one consumer. If a group contains more consumers than the topic has partitions, the surplus consumers receive no messages.
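To make the distribution rule concrete, here is a hypothetical sketch of how partitions might be spread over a group's consumers (Kafka's actual assignors, such as range and round-robin, are configurable; this is only the even-spread idea). With more consumers than partitions, the surplus consumers end up with empty assignments.

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentSketch {
    // Distribute partition indices [0, numPartitions) over the group's
    // consumers as evenly as possible, round-robin style; any consumer
    // beyond the partition count receives an empty list.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) result.add(new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) result.get(p % numConsumers).add(p);
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, 4 consumers: the 4th consumer gets no partitions
        System.out.println(assign(3, 4));
    }
}
```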
Broker: a machine/server in a Kafka cluster is called a broker; it is a physical concept.
ISR: in-sync replicas, the replicas that are alive and caught up. If a topic's partition has three replicas, on brokers 0, 1 and 2, then its ISR is {0, 1, 2}.
Shrink: if a broker goes down, the ISR shrinks; when it comes back up, the ISR expands.
HW (high watermark): the smallest value among the LEOs (LogEndOffsets) of the logs of the replicas in a partition's ISR.
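In code, the high watermark is simply the minimum LEO over the ISR; the offsets below are made-up values for illustration:

```java
import java.util.Arrays;

public class HighWatermarkSketch {
    public static void main(String[] args) {
        // Hypothetical log-end offsets of the three in-sync replicas
        long[] isrLeos = {120L, 118L, 121L};
        // HW = the smallest LEO in the ISR; consumers can read up to,
        // but not past, this offset
        long hw = Arrays.stream(isrLeos).min().getAsLong();
        System.out.println(hw); // prints 118
    }
}
```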
Commit: the consumer updates the current partition offset after reading; several commit strategies are available (e.g. automatic vs. manual commit).
Kafka performance:
Producer Throughput
Single producer thread, no replication
821,557 records/sec
(78.3 MB/sec)
Consumer Throughput
Single Consumer
940,521 records/sec
(89.7 MB/sec)
These figures are for a single producer and a single consumer, each running a single thread; with multiple threads, or multiple producers or consumers, the numbers become even more impressive.
For details, see
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
Installation and a simple command-line producer/consumer
(from http://kafka.apache.org/quickstart)
Download and extract:
$tar -xzf kafka_2.11-1.1.0.tgz
$cd kafka_2.11-1.1.0
Start the single-node ZooKeeper instance bundled with Kafka
$bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka
$bin/kafka-server-start.sh config/server.properties
Create a topic
$bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Describe the topic
$bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Start a command-line producer and send data
$bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>This is a message
>This is another message
>
Start a command-line consumer and receive data
$bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
View historical data
Add --from-beginning to the previous command
Check the latest offset of each partition (the producer's offset)
$bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test
List the group ids of existing consumer groups
$bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
List the existing topics
$bin/kafka-topics.sh --list --zookeeper localhost:2181
Kafka demo implementation (Java)
producer
package com.cloudwiz.kafkatest.example;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer extends Thread {
    private final int producerNo;
    private final String topic;
    private final KafkaProducer<Integer, String> producer;

    public Producer(String topic, int producerNo) {
        this.topic = topic;
        this.producerNo = producerNo;
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put("client.id", "Producer_" + producerNo);
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    public void run() {
        int messageNo = 0;
        while (true) {
            String messagestr = "Message_" + messageNo + "_from_producer_" + producerNo;
            try {
                // Synchronous send: block until the broker acknowledges the record
                RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, messageNo, messagestr)).get();
                System.out.println("Sent message:" + messagestr + ", partition_" + recordMetadata.partition() + ", offset" + recordMetadata.offset());
                // Asynchronous send: return immediately, handle the result in a callback
                // producer.send(new ProducerRecord<>(topic, messageNo, messagestr), new ProducerCallback());
                // System.out.println("Sent message:" + messagestr);
                // Fire and forget: send without checking the result
                // producer.send(new ProducerRecord<>(topic, messageNo, messagestr));
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            ++messageNo;
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @param args [0]: topic  [1]: number of producers to spawn
     */
    public static void main(String[] args) {
        int numProducer = Integer.parseInt(args[1]);
        for (int i = 0; i < numProducer; i++) {
            Producer producer = new Producer(args[0], i);
            producer.start();
        }
    }
}

class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) e.printStackTrace();
    }
}
consumer
package com.cloudwiz.kafkatest.example;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer extends Thread {
    private final String topic;
    private final KafkaConsumer<Integer, String> consumer;
    private final int consumerNo;
    private final String groupId;

    Consumer(String topic, String groupId, int consumerNo) {
        this.topic = topic;
        this.groupId = groupId;
        this.consumerNo = consumerNo;
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        // Subscribe once, before entering the poll loop
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(1000);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Consumer No." + this.consumerNo + " in group " + groupId
                        + " received a record, key = " + record.key() + ", value = " + record.value()
                        + ", partition = " + record.partition() + ", offset = " + record.offset());
            }
        }
    }

    /**
     * @param args [0]: topic  [1]: number of consumers to spawn  [2]: consumer group id
     */
    public static void main(String[] args) {
        int numConsumer = Integer.parseInt(args[1]);
        for (int i = 0; i < numConsumer; i++) {
            Thread consumer = new Consumer(args[0], args[2], i);
            consumer.start();
        }
    }
}
KafkaProperties
package com.cloudwiz.kafkatest.example;
public class KafkaProperties {
    public static final String KAFKA_SERVER_URL = "192.168.235.138";
    public static final String KAFKA_SERVER_PORT = "9092";
}
Creating new topics with the AdminClient API
The number of partitions and the number of replicas can be specified.
// If the topic already exists, it will not be overridden
private void createNewTopicInKafka(String token) {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ServerProperties.KAFKA_SERVER_IP + ":" + ServerProperties.KAFKA_SERVER_PORT);
    props.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin");
    AdminClient admin = AdminClient.create(props);
    try {
        if (admin.listTopics().names().get().contains(token)) return;
        CreateTopicsResult res = admin.createTopics(Collections.singletonList(
                new NewTopic(token, ServerProperties.KAFKA_NUM_PARTITIONS, ServerProperties.KAFKA_NUM_RELICAS)));
        res.all().get(); // block until topic creation completes
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        admin.close();
    }
}