1. 程式人生 > >Cris 玩轉大資料系列之訊息佇列神器 Kafka

Cris 玩轉大資料系列之訊息佇列神器 Kafka

Cris 玩轉大資料系列之訊息佇列神器 Kafka

Author:Cris

文章目錄

1. Kafka 概述

1.1 訊息佇列

先來看看訊息佇列的實現原理圖(以 Kafka 為例)

① 點對點模式

一對一,消費者主動拉取資料,訊息收到後訊息清除

點對點模型通常是一個基於拉取或者輪詢的訊息傳送模型,這種模型從佇列中請求資訊,而不是將訊息推送到客戶端。這個模型的特點是傳送到佇列的訊息被一個且只有一個接收者接收處理,即使有多個訊息監聽者也是如此

② 釋出/訂閱模式

一對多,資料生產後,推送給所有訂閱者

釋出訂閱模型則是一個基於推送的訊息傳送模型。釋出訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收訊息,而持久訂閱者則監聽主題的所有訊息,即使當前訂閱者不可用,處於離線狀態

1.2 為什麼需要訊息佇列?

  1. 解耦

    允許你獨立的擴充套件或修改兩邊的處理過程,只要確保它們遵守同樣的介面約束

  2. 冗餘

    訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險。許多訊息佇列所採用的"插入-獲取-刪除"正規化中,在把一個訊息從佇列中刪除之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的儲存直到你使用完畢

  3. 擴充套件性

    因為訊息佇列解耦了你的處理過程,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理過程即可

  4. 靈活性 & 峰值處理能力

    在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量並不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用訊息佇列能夠使關鍵元件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰

  5. 可恢復性

    系統的一部分元件失效時,不會影響到整個系統。訊息佇列降低了程序間的耦合度,所以即使一個處理訊息的程序掛掉,加入佇列中的訊息仍然可以在系統恢復後被處理

  6. 順序保證

    在大多使用場景下,資料處理的順序都很重要。大部分訊息佇列本來就是排序的,並且能保證資料會按照特定的順序來處理。(Kafka保證一個Partition內的訊息的有序性)

  7. 緩衝

    有助於控制和優化資料流經過系統的速度,解決生產訊息和消費訊息的處理速度不一致的情況

  8. 非同步通訊

    很多時候,使用者不想也不需要立即處理訊息。訊息佇列提供了非同步處理機制,允許使用者把一個訊息放入佇列,但並不立即處理它。想向佇列中放入多少訊息就放多少,然後在需要的時候再去處理它們

1.3 什麼是 Kafka?

在流式計算中,Kafka一般用來快取資料,Storm通過消費Kafka的資料進行計算

1)Apache Kafka是一個開源訊息系統,由Scala寫成。是由Apache軟體基金會開發的一個開源訊息系統專案

2)Kafka最初是由LinkedIn公司開發,並於2011年初開源。2012年10月從Apache Incubator畢業。該專案的目標是為處理實時資料提供一個統一、高通量、低等待的平臺

3)**Kafka是一個分散式訊息佇列。**Kafka對訊息儲存時根據Topic進行歸類,傳送訊息者稱為Producer,訊息接受者稱為Consumer,此外kafka叢集有多個kafka例項組成,每個例項(server)稱為broker

4)無論是kafka叢集,還是consumer都依賴於zookeeper叢集儲存一些meta資訊,來保證系統可用性

整體架構圖

詳細架構圖

1)Producer :訊息生產者,就是向kafka broker發訊息的客戶端;

2)Consumer :訊息消費者,向kafka broker取訊息的客戶端;

3)Topic :可以簡單理解為一個佇列;

4) Consumer Group (CG):這是kafka用來實現一個topic訊息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的訊息會複製(不是真的複製,是概念上的)到所有的CG,但每個partion只會把訊息發給該CG中的一個consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次傳送訊息到不同的topic;

5)Broker :一臺kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic;

6)Partition:為了實現擴充套件性,一個非常大的topic可以分佈到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的佇列。partition中的每條訊息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將訊息發給consumer,不保證一個topic的整體(多個partition間)的順序;

7)Offset:kafka的儲存檔案都是按照offset.log來命名,用offset做名字的好處是方便查詢。例如你想找位於2049的位置,只要找到2048.log的檔案即可。當然the first offset就是00000000000.log

2. Kafka 叢集部署

瞭解完訊息佇列和 Kafka 的基礎知識,接下來就來搭建一個微型 Kafka 叢集,通過對這個微型 Kafka 叢集的操作來快速上手 Kafka

2.1 Kafka 叢集規劃

hadoop101 hadoop102 hadoop103
zk zk zk
Kafka Kafka Kafka

2.2 jar 包下載

http://kafka.apache.org/downloads.html

2.3 Kafka叢集部署

1)解壓安裝包

[[email protected] software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

2)修改解壓後的檔名稱

[[email protected] module]$ mv kafka_2.11-0.11.0.0/ kafka

3)在/opt/module/kafka目錄下建立logs資料夾

[[email protected] kafka]$ mkdir logs

4)修改配置檔案

[[email protected] kafka]$ cd config/
[[email protected] config]$ vim server.properties

對應下面內容進行修改:

#broker的全域性唯一編號,不能重複
broker.id=1

#刪除topic功能使能
delete.topic.enable=true

#處理網路請求的執行緒數量
num.network.threads=3

#用來處理磁碟IO的現成數量
num.io.threads=8

#傳送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接收套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的緩衝區大小
socket.request.max.bytes=104857600

#kafka執行日誌存放的路徑	
log.dirs=/opt/module/kafka/logs
#topic在當前broker上的分割槽個數
num.partitions=1
#用來恢復和清理data下資料的執行緒數量
num.recovery.threads.per.data.dir=1
#segment檔案保留的最長時間,超時將被刪除
log.retention.hours=168
#配置連線Zookeeper叢集地址
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181

5)配置環境變數

vim /etc/profile

6)分發安裝包

xsync kafka/

注意配置其他機子的環境變數

7)分別在hadoop102和hadoop103上修改配置檔案/opt/module/kafka/config/server.properties中的broker.id=2、broker.id=3

​ 注:broker.id不得重複

8)啟動叢集

這裡 Cris 寫了一個群起指令碼

寫指令碼之前先複習一下 Shell 的基礎知識

1、由單引號括起來的字元都作為普通字元出現。特殊字元用單引號括起來以後,也會失去原有意義,而只作為普通字元解釋

2、由雙引號括起來的字元,除$(美元符號)、\(反斜槓)、’(單引號)、和”(雙引號)這幾個字元仍是特殊字元並保留其特殊功能外,其餘字元仍作為普通字元對待

具體的參考文章

然後指令碼具體內容如下:

cat /home/cris/bin/clusters | while read line
do
{
	echo "開始啟動-->"$line
	ssh $line "source /etc/profile;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/null 2>&1 &" 
}&
wait
done
echo '***Kafka叢集啟動完成***'

其中 cluster 的內容如下:

hadoop101
hadoop102
hadoop103

改指令碼具體的解釋這裡不過多解釋,但是需要注意的是:執行該指令碼需要 hadoop101,hadoop102,hadoop103 實現免密登入,具體實現流程請參考 Cris 之前的 Hadoop 筆記

關於 & 符號:後臺執行 shell 命令

關於 /dev/null :輸出重定向常用的黑洞

關於 wait:等待 shell 命令執行

關於 nohup:終端關閉不會影響已經開啟的程序

具體 Linux 命令參考文章:

nohup 參考

wait 參考

/dev/null 參考

Linux 特殊符號命令集合

其他參考文章一

群起指令碼參考一

群起指令碼參考二

然後是群停指令碼

cat /home/cris/bin/clusters | while read line
do
{
	echo "開始停止-->"$line
	ssh $line "source /etc/profile;nohup ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &" 
}&
wait
done
echo '***Kafka叢集正在從 Zookeeper 上登出停止,請等待一段時間再使用 jpsall 命令***'

其中 jpsall 指令用來檢視所用機子上的 Java 程序

效果如下:

注意:啟動 Kafka 叢集之前請先啟動 Zookeeper 叢集

2.4 Kafka 命令列操作

1)檢視當前伺服器中的所有topic

[[email protected] bin]$ kafka-topics.sh --zookeeper hadoop101:2181 --list

2)建立topic

[[email protected] kafka]$ kafka-topics.sh --zookeeper hadoop101:2181 
--create --topic first --partitions 1 --replication-factor 3

選項說明:

–topic 定義topic名

–replication-factor 定義副本數

–partitions 定義分割槽數

3)刪除topic

kafka-topics.sh --zookeeper hadoop101:2181 
--delete --topic first

需要server.properties中設定delete.topic.enable=true否則只是標記刪除或者直接重啟

演示圖如下:

4)傳送訊息

[[email protected] bin]$  kafka-console-producer.sh --broker-list hadoop101:9092 --topic first

5)消費訊息

[[email protected] ~]$ kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first

–from-beginning:會把first主題中以往所有的資料都讀取出來。根據業務場景選擇是否增加該配置

6)檢視某個Topic的詳情

[[email protected] ~]$ kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic first

3. Kafka 工作流程分析

3.1 寫入方式

producer採用推(push)模式將訊息釋出到broker,每條訊息都被追加(append)到分割槽(patition)中,屬於順序寫磁碟(順序寫磁碟效率比隨機寫記憶體要高,保障kafka吞吐率)

3.2 分割槽(Partition)

訊息傳送時都被髮送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分割槽日誌)組成,其組織結構如下圖所示:

我們可以看到,每個Partition中的訊息都是有序的,生產的訊息被不斷追加到Partition log上,其中的每一個訊息都被賦予了一個唯一的offset值

1)分割槽的原因

(1)方便在叢集中擴充套件,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個叢集就可以適應任意大小的資料了;

(2)可以提高併發,因為可以以Partition為單位讀寫了。

2)分割槽的原則

(1)指定了patition,則直接使用;

(2)未指定patition但指定key,通過對key的value進行hash出一個patition;

(3)patition和key都未指定,使用輪詢選出一個patition。

Kafka 原始碼定義的預設分割槽類參考如下:

DefaultPartitioner類
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

3.3 副本(Replication)

同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的情況下,一旦broker 宕機,其上所有 patition 的資料都不可被消費,同時producer也不能再將資料存於其上的patition。引入replication之後,同一個partition可能會有多個replication,而這時需要在這些replication之間選出一個leader,producer和consumer只與這個leader互動,其它replication作為follower從leader 中複製資料

3.4 寫入流程(待修改)

1)producer先從zookeeper的 "/brokers/…/state"節點找到該partition的leader

2)producer將訊息傳送給該leader

3)leader將訊息寫入本地log

4)followers從leader pull訊息,寫入本地log後向leader傳送ACK

5)leader收到所有ISR中的replication的ACK後,增加HW(high watermark,最後commit 的offset)並向producer傳送ACK

3.5 Broker 儲存訊息

① 儲存方式

物理上把topic分成一個或多個patition(對應 server.properties 中的num.partitions=3配置),每個patition物理上對應一個資料夾(該資料夾儲存該patition的所有訊息和索引檔案),如下:

② 儲存策略

無論訊息是否被消費,kafka都會保留所有訊息。有兩種策略可以刪除舊資料:

1)基於時間:log.retention.hours=168

2)基於大小:log.retention.bytes=1073741824

需要注意的是,因為Kafka讀取特定訊息的時間複雜度為O(1),即與檔案大小無關,所以這裡刪除過期檔案與提高 Kafka 效能無關

3.6 Zookeeper儲存結構

注意:producer不在zk中註冊,消費者在zk中註冊。

3.7 Kafka消費過程分析

kafka提供了兩套consumer API:高階Consumer API和低階Consumer API

① 高階 API

1)高階API優點

高階API 寫起來簡單

不需要自行去管理offset,系統通過zookeeper自行管理。

不需要管理分割槽,副本等情況,.系統自動管理。

消費者斷線會自動根據上一次記錄在zookeeper中的offset去接著獲取資料(預設設定1分鐘更新一下zookeeper中存的offset)

可以使用group來區分對同一個topic 的不同程式訪問分離開來(不同的group記錄不同的offset,這樣不同程式讀取同一個topic才不會因為offset互相影響)

2)高階API缺點

不能自行控制offset(對於某些特殊需求來說)

不能細化控制如分割槽、副本、zk等

② 低階 API

1)低階 API 優點

能夠讓開發者自己控制offset,想從哪裡讀取就從哪裡讀取。

自行控制連線分割槽,對分割槽自定義進行負載均衡

對zookeeper的依賴性降低(如:offset不一定非要靠zk儲存,自行儲存offset即可,比如存在檔案或者記憶體中)

2)低階API缺點

太過複雜,需要自行控制offset,連線哪個分割槽,找到分割槽leader 等。

③ 消費者組

消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分割槽在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費這個partition。在圖中,有一個由三個消費者組成的group,有一個消費者讀取主題中的兩個分割槽,另外兩個分別讀取一個分割槽。某個消費者讀取某個分割槽,也可以叫做某個消費者是某個分割槽的擁有者。

在這種情況下,消費者可以通過水平擴充套件的方式同時讀取大量的訊息。另外,如果一個消費者失敗了,那麼其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分割槽

一句話:分組中的消費者可以讀取 topic 中的多個分割槽,但是 topic 中的一個分割槽不能被多個同組的消費者同時讀取

④ 消費方式

consumer採用pull(拉)模式從broker中讀取資料。

push(推)模式很難適應消費速率不同的消費者,因為訊息傳送速率是由broker決定的。它的目標是儘可能以最快速度傳遞訊息,但是這樣很容易造成consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費訊息

對於Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費訊息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。

pull模式不足之處是,如果kafka沒有資料,消費者可能會陷入迴圈中,一直等待資料到達。為了避免這種情況,我們在我們的拉請求中有引數,允許消費者請求在等待資料到達的“長輪詢”中進行阻塞(並且可選地等待到給定的位元組數,以確保大的傳輸大小)

⑤ 消費者組案例

1)需求:測試同一個消費者組中的消費者,同一時刻只能有一個消費者消費一個 topic 的分割槽資訊

2)案例實操

  1. 在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties配置檔案中的group.id屬性為任意組名

    [[email protected] ~]$ vim /opt/module/kafka/config/consumer.properties 
    

    hadoop103 同理

  2. 先啟動 Kafka 叢集,然後分別啟動 hadoop102 和 hadoop103 的消費者客戶端

    [[email protected] kafka]$ kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
    
    [[email protected] kafka]$ kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first --consumer.config config/consumer.properties
    
    [[email protected] bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
    

    可以發現,hadoop101 生產者傳送的資料只有同一組的 hadoop102 取到了,而沒有被 hadoop103 重複獲取

4. Kafka API實戰

4.1 環境準備

1)啟動zk和kafka叢集,在kafka叢集中開啟一個消費者

這裡 Cris 建立兩個別名,來方便的從 Terminal 開啟 Kafka 消費者和生產者

vim ~/.bashrc

alias kafka_consumer_topic="kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic $1"

alias kafka_producer_topic="kafka-console-producer.sh --broker-list hadoop101:9092 --topic $1"

然後執行 source ~/.bashrc 即可

下次想要啟動消費者或者生產者輸入類似下面命令即可:

kafka_consumer_topic first
kafka_producer_topic first

然後啟動 Zookeeper 叢集

接著啟動 Kafka 叢集

接著啟動一個消費者客戶端

2)匯入 pom 依賴

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>

4.2 Kafka 生產者 API

① 過時的建立生產者 API(瞭解)

public class OldProducer {

	@SuppressWarnings("deprecation")
	public static void main(String[] args) {
		
		Properties properties = new Properties();
		properties.put("metadata.broker.list", "hadoop102:9092");
		properties.put("request.required.acks", "1");
		properties.put("serializer.class", "kafka.serializer.StringEncoder");
		
		Producer<Integer, String> producer = new Producer<Integer,String>(new ProducerConfig(properties));
		
		KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
		producer.send(message );
	}
}

② 新建立生產者 API

/**
 * 新建立 Producer 的 API,儘量使用 Kafka 官方配置類的常量
 *
 * @author cris
 * @version 1.0
 **/
public class MyProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();

        // Kafka服務端的主機名和埠號
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
        // 等待所有副本節點的應答(最嚴格的資料儲存方式,效率也最低,還可以取值 0 或者 1)
        prop.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        // 訊息傳送失敗最大嘗試次數
        prop.put(ProducerConfig.RETRIES_CONFIG, 0);
        // 一批訊息處理大小
        prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 請求延時
        prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 傳送快取區記憶體大小
        prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // key序列化
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(prop);
        producer.send(new ProducerRecord<>("first", "simida"));

        producer.close();
    }
}

執行效果如下

③ 建立生產者帶回調函式(新API)

其實就是在生產者傳送訊息後得到 broker 確定收到訊息的呼叫的函式,注意的是,不帶回調函式和帶回調函式的 send 方法都是非同步方法,只有呼叫 send().get() 才是同步方法,瞭解即可

/**
 * 新建立 Producer 的 API,帶有回撥函式
 *
 * @author cris
 * @version 1.0
 **/
@SuppressWarnings("Duplicates")
public class MyCallbackProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(String.valueOf(MyCallbackProducer.class));

    public static void main(String[] args) {
        Properties prop = new Properties();

        // Kafka服務端的主機名和埠號
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
        // 等待所有副本節點的應答(最嚴格的資料儲存方式,效率也最低,還可以取值 0 或者 1)
        prop.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        // 訊息傳送失敗最大嘗試次數
        prop.put(ProducerConfig.RETRIES_CONFIG, 0);
        // 一批訊息處理大小
        prop.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // 請求延時
        prop.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 傳送快取區記憶體大小
        prop.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // key序列化
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(prop);

        producer.send(new ProducerRecord<>("first", "curry"),((recordMetadata, e) -> {
            if (recordMetadata != null) {
                System.out.println("recordMetadata.topic() = " + recordMetadata.topic());
                System.out.println("recordMetadata.offset() = " + recordMetadata.offset());
                System.out.println("recordMetadata.partition() = " + recordMetadata.partition());
            }else {
                LOGGER.info("metadata is null!");
            }
        }));

        producer.close();
    }
}

④ 自定義分割槽生產者

0)需求:將所有資料儲存到topic的第0號分割槽上

1)定義一個類實現Partitione