1. 程式人生 > >kafka叢集搭建及原理

kafka叢集搭建及原理

Apache Kafka 企業級訊息佇列

爬蟲課程:原生佇列、多執行緒重複消費的問題、ArrayBlockingQueue阻塞佇列

分散式爬蟲:使用Redis的list資料結構做佇列。

分散式電商:AMQ 訊息佇列、釋出一個商品時傳送一個訊息,有程式消費訊息建立靜態化頁面。

Apache Kafka:訊息佇列、隨著大資料興起,現在非常流行。

1、課程目標

  • 理解 Apache Kafka是什麼
  • 掌握Apache Kafka的基本架構
  • 搭建Kafka叢集
  • 掌握操作叢集的兩種方式
  • 熟悉Apache Kafka原理
    1. Kafka原理-分片與副本機制
    2. Kafka原理-訊息不丟失機制
    3. Kafka原理-訊息儲存及查詢機制
    4. Kafka原理-生產者資料分發策略
    5. Kafka原理-消費者的負載均衡機制
  • 瞭解Apache Kafka 監控及運維

2Apache Kafka是什麼

是什麼?有什麼用?怎麼用?

是什麼?三個定義

  1. Apache Kafka 是一個訊息佇列(生產者消費者模式)
  2. Apache Kafka 目標:構建企業中統一的、高通量、低延時的訊息平臺。
  3. 大多的是訊息佇列(訊息中介軟體)都是基於JMS標準實現的,Apache Kafka 類似於JMS的實現。

有什麼用?(訊息佇列有什麼用?)

  1. 作為緩衝,來異構、解耦系統。
  • 使用者註冊需要完成多個步驟,每個步驟執行都需要很長時間。代表使用者等待時間是所有步驟的累計時間。
  • 為了減少使用者等待的時間,使用並行執行,有多少個步驟,就開啟多少個執行緒來執行。代表使用者等待時間是所有步驟中耗時最長的那個步驟時間。
  • 有了新得問題:開啟多執行緒執行每個步驟,如果以一個步驟執行異常,或者嚴重超時,使用者等待的時間就不可控了。
  • 通過訊息佇列來保證。
    1. 註冊時,立即返回成功。
    2. 傳送註冊成功的訊息到訊息平臺。
    3. 對註冊資訊感興趣的程式,可以訊息訊息。

3、Apache Kafka的基本架構

Kafka Cluster:由多個伺服器組成。每個伺服器單獨的名字broker(掮客)。

Kafka Producer:生產者、負責生產資料。

Kafka consumer:消費者、負責消費資料。

Kafka Topic: 主題,一類訊息的名稱。儲存資料時將一類資料存放在某個topci下,消費資料也是消費一類資料。

訂單系統:建立一個topic,叫做order。

使用者系統:建立一個topic,叫做user。

商品系統:建立一個topic,叫做product。

注意:Kafka的元資料都是存放在zookeeper中。

4、搭建Kafka叢集

4.1、準備3臺虛擬機器

192.168.140.128 kafka01

192.168.140.129 kafka02

192.168.140.130 kafka03

4.2初始化環境

1)安裝jdk、安裝zookeeper

2)安裝目錄

安裝包存放的目錄:/export/software

安裝程式存放的目錄:/export/servers

資料目錄:/export/data

日誌目錄:/export/logs

mkdir -p /export/servers/

mkdir -p /export/software /

mkdir -p /export/data /

mkdir -p /export/logs /

3)安裝使用者

安裝hadoop,會建立一個hadoop使用者

安裝kafka,建立一個kafka使用者

或者 建立bigdata使用者,用來安裝所有的大資料軟體。

本例:使用root使用者

  1. 驗證環境
    1. jdk環境 
    1. zookeeper環境

zkServer.sh status

4.3搭建Kafka叢集

4.3.1、準備安裝包

由於kafka是scala語言編寫的,基於scala的多個版本,kafka釋出了多個版本。

其中2.11是推薦版本。

4.3.2、下載安裝包及解壓

tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/

cd /export/servers/

rm -rf /export/servers/kafka

rm -rf /export/logs/kafka/

rm -rf /export/data/kafka

mv kafka_2.11-1.0.0 kafka

  1. 解壓檔案
  2. 刪除之前的安裝記錄
  3. 重新命名

4.3.3、檢視目錄及修改配置檔案

4.3.3.1檢視目錄

4.3.3.2修改配置檔案

進入配置目錄,檢視server.properties檔案

cat server.properties |grep -v "#"

通過以上命令,檢視到了預設的配置檔案,對預設的檔案進行修改。

修改三個地方

  1. Borker.id
  2. 資料存放的目錄,注意目錄如果不存在需要新建下
  3. zookeeper的地址資訊

# broker.id 標識了kafka叢集中一個唯一broker。

broker.id=0

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

# 存放生產者生產的資料 資料一般以topic的方式存放

# 建立一個數據存放目錄 /export/data/kafka  ---  mkdir -p /export/data/kafka

log.dirs=/export/data/kafka

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

# zk的資訊

zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

4.3.4、分發配置檔案及修改brokerid

將修改好的配置檔案,分發到node02,node03上。

先在node02、node03上刪除以往的安裝記錄

rm -rf /export/servers/kafka

rm -rf /export/logs/kafka/

rm -rf /export/data/kafka

分發安裝包

scp -r /export/servers/kafka/ node02:/export/servers/

scp -r /export/servers/kafka/ node03:/export/servers/

修改node02上的broker.id

vi /export/servers/kafka/config/server.properties

修改node03上的broker.id

vi /export/servers/kafka/config/server.properties

4.3.4、啟動叢集

cd /export/servers/kafka/bin

./kafka-server-start.sh /export/servers/kafka/config/server.properties

4.3.5、檢視Kafka叢集

由於kafka叢集並沒有UI介面可以檢視。

需要藉助外部工具,來檢視卡夫卡的叢集

這個工具是一個java程式,必須要安裝好JDK

5、操作叢集的兩種方式

需求:訂單系統,需要傳送訊息。 後面後3個程式需要接受這個訊息,並做後續的處理。

5.1、使用控制檯執行

  1. 建立一個訂單的topic。

bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic order

  1. 編寫程式碼啟動一個生產者,生產資料

bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic order

  1. 編寫程式碼啟動給一個消費者,消費資料

bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic order

5.2使用Java api執行

1)java工程-maven,依賴。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.1</version>
</dependency>

  1. 編寫程式碼-寫生產者的程式碼

/**
 * 訂單的生產者程式碼
 */
public class OrderProducer {
    public static void main(String[] args) throws InterruptedException {
        /* 1、連線叢集,通過配置檔案的方式
         * 2、傳送資料-topic:order,value
         */
Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 1000; i++) {
            // 傳送資料 ,需要一個producerRecord物件,最少引數 String topic, V value
kafkaProducer.send(new ProducerRecord<String, String>("order", "訂單資訊!"+i));
            Thread.sleep(100);
        }
    }
}

  1. 編寫程式碼-寫消費者的程式碼

/**
 * 消費訂單資料--- javaben.tojson
 */
public class OrderConsumer {
    public static void main(String[] args) {
        // 1\連線叢集
Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
//        2、傳送資料 傳送資料需要,訂閱下要消費的topic。  order
kafkaConsumer.subscribe(Arrays.asList("order"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer插入、poll獲取元素。 blockingqueue put插入原生,take獲取元素
for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.println("消費的資料為:" + record.value());
            }
        }
    }
}

6、Apache Kafka原理

6.1Apache Kafka原理-分片與副本機制

bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic order

分片:solrcloud中有提及到。

當資料量非常大的時候,一個伺服器存放不了,就將資料分成兩個或者多個部分,存放在多臺伺服器上。每個伺服器上的資料,叫做一個分片。

副本:solrcloud中有提及到。

當資料只儲存一份的時候,有丟失的風險。為了更好的容錯和容災,將資料拷貝幾份,儲存到不同的機器上。

6.2Apache Kafka原理-訊息不丟失機制

6.2.1、生產者端訊息不丟失

  1. 訊息生產分為同步模式和非同步模式
  2. 訊息確認分為三個狀態
    1. 0:生產者只負責傳送資料
    2. 1:某個partition的leader收到資料給出響應
    3. -1:某個partition的所有副本都收到資料後給出響應
  3. 在同步模式下
    1. 生產者等待10S,如果broker沒有給出ack響應,就認為失敗。
    2. 生產者重試3次,如果還沒有響應,就報錯。
  4. 在非同步模式下
    1. 先將資料儲存在生產者端的buffer中。Buffer大小是2萬條。
    2. 滿足資料閾值或者數量閾值其中的一個條件就可以傳送資料。
    3. 傳送一批資料的大小是500條。

如果broker遲遲不給ack而buffer又滿了

開發者可以設定是否直接清空buffer中的資料

6.2.2Borker端訊息不丟失

broker端的訊息不丟失,其實就是用partition副本機制來保證。

Producer  ack  -1. 能夠保證所有的副本都同步好了資料。其中一臺機器掛了,並不影像資料的完整性。

6.2.3、消費者端訊息不丟失

只要記錄offset值,消費者端不會存在訊息不丟失的可能。只會重複消費。

6.3Apache Kafka原理-訊息儲存及查詢機制

6.3.1、檔案儲存機制

segment段中有兩個核心的檔案一個是log,一個是index。 當log檔案等於1G時,新的會寫入到下一個segment中。

通過下圖中的資料,可以看到一個segment段差不多會儲存70萬條資料。

6.3.2、檔案查詢機制

6.4Apache Kafka原理-生產者資料分發策略

kafka在資料生產的時候,有一個數據分發策略。預設的情況使用DefaultPartitioner.class類。

這個類中就定義資料分發的策略。

  1. 如果是使用者制定了partition,生產就不會呼叫DefaultPartitioner.partition()方法
  2. 當用戶指定key,使用hash演算法。如果key一直不變,同一個key算出來的hash值是個固定值。如果是固定值,這種hash取模就沒有意義。

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions

  1. 當用既沒有指定partition也沒有key。

資料分發策略的時候,可以指定資料發往哪個partition。

ProducerRecord 的構造引數中有partition的時候就可以傳送到對應partition上

/**
 * Creates a record to be sent to a specified topic and partition
 *
 * @param topic The topic the record will be appended to
 * @param partition The partition to which the record should be sent
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}

如果生產者沒有指定partition,但是傳送訊息中有key,就key的hash值。

/**
 * Create a record to be sent to Kafka
 *
 * @param topic The topic the record will be appended to
 * @param key The key that will be included in the record
 * @param value The record contents
 */
public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value, null);
}

既沒有指定partition,也沒有key的情況下如何傳送資料。

使用輪詢的方式傳送資料。

/**
 * Create a record with no key
 *
 * @param topic The topic this record should be sent to
 * @param value The record contents
 */
public ProducerRecord(String topic, V value) {
    this(topic, null, null, null, value, null);
}

6.5Apache Kafka原理-消費者的負載均衡機制

一個partition只能被一個組中的成員消費。

所以如果消費組中有多於partition數量的消費者,那麼一定會有消費者無法消費資料。

7、瞭解Apache Kafka 監控及運維

7.1、一鍵啟動Kafka

7.2 、UI介面