
Kafka: Introduction and Usage

What is Kafka?

A distributed streaming platform, similar to a message queue.

A streaming platform can:

1. Publish and subscribe to streams of records

2. Store streams of records in a fault-tolerant way

3. Process streams of records

Kafka is typically used to:

1. Build real-time streaming data pipelines that move data between applications or systems (real-time, low latency; also suitable for temporarily holding important data)

2. Do real-time stream processing and react to streams of data

Kafka is often used as a buffer for high-speed output, thanks to its exceptional performance. It can also hold important data temporarily, because records are not deleted as soon as they are consumed (read), and the data can be configured with replicas.

 

Kafka's APIs

Producer: produces data to Kafka; a single producer can write to multiple topics.

Consumer: consumes data from Kafka; a single consumer can subscribe to multiple topics.

Streams: does simple stream processing; it can both consume and produce, across multiple topics (a sketch follows this list).

Connector: used to build and run reusable producers and consumers that connect Kafka topics to external applications or data systems.

Multi-language: Java and other mainstream languages have corresponding client APIs.
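The Streams API is the only one not demonstrated later in this post, so here is a minimal hedged sketch of a Streams topology that consumes one topic, upper-cases each value, and produces to another. The topic names, application id, and server address are assumptions for illustration.

package com.cloudwiz.kafkatest.example;

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StreamsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-demo");    // assumed application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed server address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // consume from "test", transform each value, produce to "test-upper" (assumed topic names)
        KStream<String, String> source = builder.stream("test");
        source.mapValues(value -> value.toUpperCase()).to("test-upper");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}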

Topic: a category of records; every record is published to some topic.

Record: {key, value, timestamp}

Kafka can be thought of as an immutable queue (of records) that retains data for a configured period of history, e.g. two days.

The only metadata kept per consumer is an offset, which the consumer controls freely; that is, the offset does not have to advance in simple increments. If needed, a consumer can rewind and re-read data from two days ago, or jump straight to the latest records (a sketch follows below).
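A minimal hedged sketch of this offset control with the Java consumer: the topic name, partition number, and target offset are assumptions; seek, seekToBeginning, and seekToEnd are the actual KafkaConsumer methods.

package com.cloudwiz.kafkatest.example;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetControlDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("test", 0); // assumed topic/partition
        consumer.assign(Collections.singletonList(tp));    // manual assignment, no group rebalancing

        consumer.seekToBeginning(Collections.singletonList(tp)); // re-read from the oldest retained record
        // consumer.seekToEnd(Collections.singletonList(tp));    // or skip straight to the newest data
        // consumer.seek(tp, 42L);                               // or jump to an absolute offset

        System.out.println("next offset to read: " + consumer.position(tp));
        consumer.close();
    }
}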

 

Partition: a topic is divided into multiple partitions, which can live on different machines. Records are distributed across the partitions (a custom partition function based on the key can be written), which reduces the load on any single node.
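As a hedged sketch of such a key-based partition function, here is a custom implementation of the producer's Partitioner interface; the class name is made up for illustration.

package com.cloudwiz.kafkatest.example;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class KeyModPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
            Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (key == null) return 0;                             // route keyless records to partition 0
        return (key.hashCode() & 0x7fffffff) % numPartitions;  // non-negative hash modulo partition count
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be enabled in the producer configuration with props.put("partitioner.class", KeyModPartitioner.class.getName()).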

Replica: each partition has multiple replicas, possibly on different machines, made up of one leader and zero or more followers. Only the leader accepts reads and writes; followers only replicate the leader's writes. If the leader dies, a new leader is elected from the followers.

Guarantee: within each partition, records are stored in the order they were sent.

Other concepts

Consumer group: the partitions of a topic are dynamically and evenly distributed among all consumers in a consumer group. A topic is effectively broadcast to every consumer group (each group acts as one logical subscriber), while within a group the topic's partitions are split across the different consumers. If a group has more consumers than the topic has partitions, the extra consumers receive no messages; for example, with 4 partitions and 5 consumers in one group, one consumer sits idle. (The Consumer demo below, started with more than one consumer, illustrates this.)

Broker: a machine/server in a Kafka cluster is called a broker; this is a physical concept.

ISR: in-sync replicas, the replicas that are alive and caught up. If a topic partition has three replicas stored on brokers 0, 1, and 2, and all are in sync, its ISR is {0, 1, 2}.

Shrink: if a broker dies, the ISR shrinks; once the broker comes back up (and catches up), the ISR expands again.
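A partition's leader, replicas, and current ISR can also be inspected programmatically; below is a hedged sketch using the Java AdminClient (the topic name and server address are assumptions).

package com.cloudwiz.kafkatest.example;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class DescribeIsrDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        AdminClient admin = AdminClient.create(props);
        try {
            // fetch the description of the assumed topic "test"
            TopicDescription desc =
                    admin.describeTopics(Collections.singletonList("test")).all().get().get("test");
            for (TopicPartitionInfo p : desc.partitions()) {
                System.out.println("partition " + p.partition()
                        + " leader=" + p.leader().id()
                        + " replicas=" + p.replicas()
                        + " isr=" + p.isr());
            }
        } finally {
            admin.close();
        }
    }
}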

HW (high watermark): the smallest LEO (log end offset) among the replicas in a partition's ISR; consumers can only read up to the HW. For example, if the ISR replicas' LEOs are 7, 5, and 6, the HW is 5.

Commit: the consumer updating its current offset for a partition after reading; there are several ways to handle commits.
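For instance, the Consumer demo further below relies on automatic commits (enable.auto.commit=true), while a hedged sketch of manual commits looks like this; the topic and group names are assumptions.

package com.cloudwiz.kafkatest.example;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-demo");      // assumed group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // take over commit responsibility
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test")); // assumed topic
        try {
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(1000);
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("processed offset " + record.offset());
                }
                consumer.commitSync(); // commit only after the whole batch is processed
            }
        } finally {
            consumer.close();
        }
    }
}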

Kafka performance:

Producer throughput (single producer thread, no replication): 821,557 records/sec (78.3 MB/sec)

Consumer throughput (single consumer): 940,521 records/sec (89.7 MB/sec)

And that is with a single producer and a single consumer, each single-threaded; with multiple threads, or multiple producers or consumers, the numbers are even more staggering.

For details, see:

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

Installation, and a simple command-line producer and consumer

(from http://kafka.apache.org/quickstart)

Download and extract:

$tar -xzf kafka_2.11-1.1.0.tgz

$cd kafka_2.11-1.1.0

Start the single-node ZooKeeper bundled with Kafka:

$bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka:

$bin/kafka-server-start.sh config/server.properties

Create a topic:

$bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Describe the topic:

$bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Start a producer and send data (command line):

$bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

>This is a message

>This is another message

>

Start a consumer and receive data (command line):

$bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

Read historical data:

Add --from-beginning to the previous command.

Check a partition's latest offset (the producer-side offset):

$bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test

List the group ids of existing consumer groups:

$bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

List the existing topics:

$bin/kafka-topics.sh --list --zookeeper localhost:2181

Kafka demo implementation (Java)

producer

package com.cloudwiz.kafkatest.example;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer extends Thread{
	private final int producerNo;
	private final String topic;
	private final KafkaProducer<Integer,String> producer;
	
	public Producer(String topic,int producerNo) {
		this.topic = topic;
		this.producerNo=producerNo;
		
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put("client.id", "Producer_"+producerNo);
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);

	}
	
	public void run() {
		int messageNo = 0;
		while(true) {
			String messagestr = "Message_"+messageNo+"_from_producer_"+producerNo ;
			try {
				// synchronous send: block until the broker acknowledges the record
				RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic , messageNo , messagestr)).get();
				System.out.println("Sent message:"+messagestr+", partition_"+recordMetadata.partition() + ", offset"+recordMetadata.offset());
				// asynchronous send: return immediately and handle the result in a callback
//				producer.send(new ProducerRecord<>(topic , messageNo , messagestr),new ProducerCallback());
//				System.out.println("Sent message:"+messagestr);
				// fire and forget: send without checking the result
//				producer.send(new ProducerRecord<>(topic , messageNo , messagestr));
			} catch (InterruptedException | ExecutionException e) {
				e.printStackTrace();
			}
			++messageNo;
			try {
				Thread.sleep(300);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	/**
	 * @param args [0]: topic  [1]: number of producers to spawn
	 */
	public static void main(String[] args) {
		int numProducer = Integer.parseInt(args[1]);
		for(int i = 0;i<numProducer;i++) {
			Producer producer = new Producer(args[0] , i);
			producer.start();
		}
	}
}

class ProducerCallback implements Callback{

	@Override
	public void onCompletion(RecordMetadata metadata, Exception e) {
		if(e!=null)e.printStackTrace();
	}
	
}

consumer

package com.cloudwiz.kafkatest.example;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer extends Thread{
	private final String topic;
	private final KafkaConsumer<Integer,String> consumer;
	private int consumerNo;
	private String groupId;
	
	Consumer(String topic,String groupId,int consumerNo){
		this.topic=topic;
		
		this.groupId = groupId;
		Properties props = new Properties();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
		consumer = new KafkaConsumer<>(props);
		
		this.consumerNo = consumerNo;
	}
	
	@Override
	public void run() {
		// subscribe once, before the poll loop; without a subscription poll() throws IllegalStateException
		consumer.subscribe(Collections.singletonList(topic));
		while(true) {
			ConsumerRecords<Integer,String> records = consumer.poll(1000);
			for(ConsumerRecord<Integer,String> record:records) {
				System.out.println("Consumer No." + this.consumerNo + " in group " + groupId + " received a record, "
						+ "key = "+record.key()+", value = "+record.value()+", partition = "+record.partition()+", offset = "+record.offset());
			}
		}
	}
	
	
	/**
	 * @param args [0]: topic  [1]: number of consumers to spawn  [2]: consumer group id
	 */
	public static void main(String[] args) {
		int numConsumer = Integer.parseInt(args[1]);
		for(int i = 0;i<numConsumer;i++) {
			Thread consumer = new Consumer(args[0],args[2],i);
			consumer.start();
		}
	}
}

KafkaProperties

package com.cloudwiz.kafkatest.example;

public class KafkaProperties {
	public static final String KAFKA_SERVER_URL = "192.168.235.138";
	public static final String KAFKA_SERVER_PORT = "9092";
}
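Assuming the three classes above are compiled onto the classpath together with the Kafka client jars, a hypothetical run would be: java com.cloudwiz.kafkatest.example.Producer test 2 starts two producers on topic test, and java com.cloudwiz.kafkatest.example.Consumer test 2 group1 starts two consumers in group group1, which split the topic's partitions between them.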

Creating new topics with the AdminClient API

The number of partitions and the number of replicas can be specified.

    // Imports needed by this snippet: java.util.Collections, java.util.Properties,
    // java.util.concurrent.ExecutionException, and org.apache.kafka.clients.admin
    // (AdminClient, AdminClientConfig, CreateTopicsResult, NewTopic).

    // if the topic already exists, do not overwrite it
    private void createNewTopicInKafka(String token) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ServerProperties.KAFKA_SERVER_IP +":"+ServerProperties.KAFKA_SERVER_PORT);
        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin");
        AdminClient admin = AdminClient.create(props);

        try {
            // skip creation if the topic name already exists in the cluster
            if(admin.listTopics().names().get().contains(token))return;

            // create the topic with the configured numbers of partitions and replicas
            CreateTopicsResult res = admin.createTopics(Collections.singletonList(new NewTopic(token,ServerProperties.KAFKA_NUM_PARTITIONS,ServerProperties.KAFKA_NUM_RELICAS)));
            res.all().get(); // block until the creation completes (or fails)
        }catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }finally{
            admin.close();
        }
    }
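One caveat worth noting: the list-then-create check above is not atomic, so another client could create the same topic between the two calls. createTopics would then fail, and res.all().get() would throw an ExecutionException wrapping a TopicExistsException, which the existing catch block absorbs; treating that specific exception as success is a common alternative to the upfront check.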