1. 程式人生 > >訊息佇列中介軟體(三)Kafka 入門指南

訊息佇列中介軟體(三)Kafka 入門指南

Kafka 來源

Kafka的前身是由LinkedIn開源的一款產品,2011年初開始開源,加入了 Apache 基金會,2012年從 Apache Incubator 畢業變成了 Apache 頂級開源專案。同時LinkedIn還有許多著名的開源產品。如:

  • 分散式資料同步系統Databus
  • 高效能運算引擎Cubert
  • Java非同步處理框架ParSeq
  • Kafka流處理平臺

Kafka 介紹

Kafka 用於構建實時資料管道和流應用程式。它具有水平可擴充套件性,容錯性,快速性,並在數千家公司的生產環境中執行。

從官方我們可以知道ApacheKafka一個分散式流媒體平臺

。這到底是什麼意思呢?

流媒體平臺有三個關鍵功能:

  • 釋出和訂閱記錄資料流,類似於訊息佇列或企業訊息傳遞系統。
  • 有容錯能力的可以持久化的儲存資料流。
  • 記錄發生時可以進行流處理。

Kafka 通常用於兩大類應用:

  • 構建可在系統或應用程式之間可靠獲取資料的實時流資料管道
  • 構建轉換或響應資料流的實時流處理

Kafka 基本概念

  • Producer - 訊息和資料的生產者,向 Kafka 的一個 Topic 釋出訊息的程序/程式碼/服務。
  • Consumer - 訊息和資料的消費者,訂閱資料(Topic)並且處理其釋出的訊息的程序/程式碼/服務。
  • Consumer Group
    - 邏輯概念,對於同一個 Topic,會廣播不同的 Group,一個Group中,只有一個consumer 可以消費該訊息。
  • Broker - 物理概念,Kafka 叢集中的每個 Kafka 節點。
  • Topic - 邏輯概念,Kafka訊息的類別,對資料進行區分,隔離。
  • Partition - 物理概念,分片,Kafka 下資料儲存的基本單元,一個 Topic 資料,會被分散儲存到多個Partition,每一個Partition是有序的。
  • Replication - 副本,同一個 Partition 可能會有多個 Replica ,多個 Replica 之間資料是一樣的。
  • Replication Leader
    - 一個 Partition 的多個 Replica 上,需要一個 Leade r負責該 Partition 上與 Produce 和 Consumer 互動
  • ReplicaManager - 負責管理當前的 broker 所有分割槽和副本的資訊,處理 KafkaController 發起的一些請求,副本狀態的切換,新增/讀取訊息等。

概念的延伸

Partition

  • 每一個Topic被切分為多個Partitions
  • 消費者資料要小於等於Partition的數量
  • Broker Group中的每一個Broker儲存Topic的一個或多個Partitions
  • Consumer Group中的僅有一個Consumer讀取Topic的一個或多個Partions,並且是唯一的Consumer。

Replication

  • 當叢集中有Broker掛掉的時候,系統可以主動的使用Replicas提供服務。
  • 系統預設設定每一個Topic的Replication的係數為1,可以在建立Topic的時候單獨設定。

Replication特點

  • Replication的基本單位是Topic的Partition。
  • 所有的讀和寫都從Leader進,Followers只是作為備份。
  • Follower必須能夠及時的複製Leader的資料
  • 增加容錯性與可擴充套件性。

Kafka 訊息結構

在 Kafka2.0 中的訊息結構如下(整理自官網)。

baseOffset: int64 - 用於記錄Kafka這個訊息所處的偏移位置
batchLength: int32 - 用於記錄整個訊息的長度
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2) - 一個固定值,用於快速判斷是否是Kafka訊息
crc: int32 - 用於校驗資訊的完整性
attributes: int16 - 當前訊息的一些屬性

bit 0~2:

0: no compression
1: gzip
2: snappy
3: lz4

bit 3: timestampType
​ bit 4: isTransactional (0 means not transactional)
​ bit 5: isControlBatch (0 means not a control batch)
​ bit 6~15: unused

lastOffsetDelta: int32
firstTimestamp : int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records:

length: varint
attributes: int8

bit 0~7: unused

timestampDelta: varint
offsetDelta: varint
keyLength: varint
key: byte[]
valueLen: varint
value: byte[]
Headers => [Header]

headerKeyLength: varint
headerKey: String
headerValueLength: varint
Value: byte[]

關於訊息結構的一些釋義。

  • Offset -用於記錄Kafka這個訊息所處的偏移位置
  • Length - 用於記錄整個訊息的長度
  • CRC32 - 用於校驗資訊的完整性
  • Magic - 一個固定值,用於快速判斷是否是Kafka訊息
  • Attributes - 當前訊息的一些屬性
  • Timestamp - 訊息的時間戳
  • Key Length - key的長度
  • Key - Key的具體值
  • Value Length - 值的長度
  • Value - 具體的訊息值

Kafka 優點

  1. 分散式 - Kafka是分散式的,多分割槽,多副本的和多訂閱者的,基於Zookeeper排程。
  2. 永續性和擴充套件性 - Kafka使用分散式提交日誌,這意味著訊息會盡可能快地保留在磁碟上,因此它是持久的。同時具有一定的容錯性,Kafka支援線上的水平擴充套件,訊息的自平衡。
  3. 高效能 - Kafka對於釋出和訂閱訊息都具有高吞吐量。 即使儲存了許多TB的訊息,它也保持穩定的效能。且延遲低,適用高併發。時間複雜的為o(1)。

Kafka 應用

  1. 用於聚合分散式應用程式中的訊息。進行操作監控。
  2. 用於跨組織的從多個服務收集日誌,然後提供給多個伺服器,解決日誌聚合問題。
  3. 用於流處理,如Storm和Spark Streaming,從kafka中讀取資料,然後處理在寫入kafka供應用使用。

Kafka 安裝

安裝 Jdk

具體步驟此處不說。

安裝 Kafka

直接官方網站下載對應系統的版本解壓即可。
由於Kafka對於windows和Unix平臺的控制指令碼是不同的,因此如果是windows平臺,要使用bin\windows\而不是bin/,並將指令碼副檔名更改為.bat。以下命令是基於Unix平臺的使用。

# 解壓
tar -xzf kafka_2.11-2.0.0.tgz
# 啟動Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 啟動Kafka
bin/kafka-server-start.sh config/server.properties
# 或者後臺啟動
bin/kafka-server-start.sh config/server.properties &

讓我們建立一個名為“test”的主題,它只包含一個分割槽,只有一個副本:

`> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

如果我們執行list topic命令,我們現在可以看到該主題:

`> bin/kafka-topics.sh --list --zookeeper localhost:2181 test

或者,您也可以將代理配置為在釋出不存在的主題時自動建立主題,而不是手動建立主題。

檢視Topic的資訊

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic Hello-Kafka

執行生產者,然後在控制檯中鍵入一些訊息以傳送到伺服器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message`

執行消費者,檢視收到的訊息

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
> This is a message
> This is another message

Kafka 工程例項

POM 依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>

生產者

編寫生產者 Java 程式碼。關於 Properties 中的值的意思描述可以在官方文件中找到 http://kafka.apache.org/ 。下面的生產者向 Kafka 推送了10條訊息。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * <p>
 * Kafka生產者,傳送10個數據
 *
 * @Author niujinpeng
 * @Date 2018/11/16 15:45
 */
public class MyProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.110.132:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }

}

消費者

編寫消費者 Java 程式碼。


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * <p>
 * Kafka消費者
 *
 * @Author niujinpeng
 * @Date 2018/11/19 15:01
 */
public class MyConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.110.132:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

}

可以在控制檯看到成功執行後的輸出,由 offset 可以看到已經消費了10條訊息。

 INFO | Kafka version : 2.0.0
 INFO | Kafka commitId : 3402a8361b734732
 INFO | Cluster ID: 0Xrk5M1CSJet0m1ut3zbiw
 INFO | [Consumer clientId=consumer-1, groupId=test] Discovered group coordinator 192.168.110.132:9092 (id: 2147483647 rack: null)
 INFO | [Consumer clientId=consumer-1, groupId=test] Revoking previously assigned partitions []
 INFO | [Consumer clientId=consumer-1, groupId=test] (Re-)joining group
 INFO | [Consumer clientId=consumer-1, groupId=test] Successfully joined group with generation 4
 INFO | [Consumer clientId=consumer-1, groupId=test] Setting newly assigned partitions [test-0]
offset = 38, key = 0, value = 0
offset = 39, key = 1, value = 1
offset = 40, key = 2, value = 2
offset = 41, key = 3, value = 3
offset = 42, key = 4, value = 4
offset = 43, key = 5, value = 5
offset = 44, key = 6, value = 6
offset = 45, key = 7, value = 7
offset = 46, key = 8, value = 8
offset = 47, key = 9, value = 9

問題

如果java.net.InetAddress.getCanonicalHostName 取到的是主機名。需要修改 Kafka 的配置檔案。

vim server.properties
# x.x.x.x是伺服器IP
advertised.listeners=PLAINTEXT://x.x.x.x:9092

<完>
本文原發於個人部落格:https://www.codingme.net 轉載請註明出處