
Kafka Documentation (2) -- 0.8.2 -- Getting Started

Source:

Notes:

The following technical terms are left untranslated throughout:

kafka

topic

partition

consumer

producer

server

client

high-level

1. Getting Started

1.1 Introduction
Kafka provides a distributed, partitioned, replicated commit log service, and at the same time works as a messaging system with a rather unique design.

What exactly does that mean?

First, let's review some basic messaging terminology:

- Kafka maintains feeds of messages in categories called topics.

- Producers publish messages to Kafka topics.

- Consumers subscribe to topics and process the published feed of messages.

- Kafka runs as a cluster made up of one or more servers, each of which is called a broker.

So, at a high level, producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers.

Communication between the clients and the servers is done with a simple, high-performance, language-agnostic TCP protocol. An official Java client is provided, and clients in many other languages are available as well.
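
To make the client side concrete, here is a minimal, hedged sketch of publishing a single message with the Java producer client. The broker address localhost:9092 and the topic name test are assumptions borrowed from the quick start below, and error handling is omitted.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker(s) used to bootstrap cluster metadata (assumes the quick-start broker).
        props.put("bootstrap.servers", "localhost:9092");
        // Serializers for record keys and values.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // send() is asynchronous; the record is batched and written to the "test" topic.
        producer.send(new ProducerRecord<>("test", "hello kafka"));
        // close() flushes any buffered records before shutting down.
        producer.close();
    }
}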



Topics and Logs


Let's first dive into the high-level abstraction Kafka provides: the topic.
A topic can be thought of as a category of message streams: messages are grouped by topic, and producers publish each kind of message to the corresponding topic. For each topic, the Kafka cluster maintains a partitioned log.

Within each partition the messages form an ordered, immutable sequence: once written, a message cannot be changed, and new messages can only be appended at the end of the partition. Each message in a partition is assigned a sequential id number called the offset, which identifies its position (write order) within the partition.

The Kafka cluster retains all published messages, whether or not they have been consumed, for a configurable period of time. For example, if the log retention is set to two days, then for two days after a message is written it is available for consumption, after which it is discarded to free up space. Kafka can therefore store large amounts of data efficiently and durably.

In fact, the only metadata retained per consumer is a single value, the "offset", which records the consumer's current position in the log. The offset is maintained by the consumer, not by the Kafka cluster: normally it advances linearly as the consumer reads messages, which makes it look as if the offset merely follows the consumer along, but in reality the position is entirely under the consumer's control, and the consumer can consume messages from any position it likes. For example, a consumer can reset the offset to an earlier value and re-consume data it has already read.

Together these features mean that Kafka consumers are very cheap: a consumer can come and go, and repeatedly re-read data, without much impact on the cluster or on other consumers. For example, you can use the command line tools that ship with Kafka to "tail" the latest messages of any topic without affecting what any other consumer is doing.
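
As a rough sketch of what "the offset is controlled by the consumer" looks like in code, the Java consumer below assigns itself partition 0 of the quick-start topic test (an assumption for illustration), rewinds to offset 0, and re-reads whatever is there:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RewindingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed quick-start broker
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("test", 0);
        // Manually assign the partition (no group coordination) ...
        consumer.assign(Collections.singletonList(partition));
        // ... and move our own position back to the beginning to re-consume old messages.
        consumer.seek(partition, 0L);

        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
        }
        consumer.close();
    }
}

Because the position lives in the consumer, running this repeatedly re-reads the same data without disturbing the cluster or any other consumer.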

Splitting the log into partitions serves several purposes. First, multiple partitions let a topic hold far more data than fits on a single server: each individual partition must fit on (and is stored on) a single server, so the data of one partition never spans two servers, but a topic may have many partitions, so in principle you can grow the number of partitions by adding servers. Second, partitions are the unit of parallelism, and the gain is substantial: different consumers can consume different partitions in parallel, and because the offset is controlled by the consumer, several independent consumers can even read the same partition.

Distribution

The partitions of the log are distributed over the servers in the Kafka cluster, with each server handling data and requests for a share of the partitions. Each partition can be replicated, with a configurable number of replicas, for fault tolerance.

Each partition has one server that acts as the "leader" and zero or more servers that act as "followers". The leader handles all read and write requests for the partition, while the followers simply replicate the leader. If the leader fails, one of the followers automatically becomes the new leader. Each server holds many partitions and acts as the leader for some of them and as a follower for the rest, so load is well balanced across the cluster.

Producers

Producers publish data to the topics of their choice. The producer is responsible for choosing which partition within the topic each message is assigned to. This can be done in a round-robin fashion simply to balance load, or according to some partition function (for example, based on a key in the message). More on the use of partitioning in a moment.
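
As a small sketch of key-based partitioning with the Java producer, the topic page-views and key user-42 below are hypothetical names used only for illustration; the point is that the default partitioner hashes the record key, so records sharing a key always land in the same partition and stay in relative order:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed quick-start broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Both records carry the key "user-42", so the default (hash-based)
            // partitioner routes them to the same partition of "page-views".
            producer.send(new ProducerRecord<>("page-views", "user-42", "viewed /index.html"));
            producer.send(new ProducerRecord<>("page-views", "user-42", "viewed /pricing.html"));
        }
    }
}

Records sent without a key are instead spread across the topic's partitions to balance load.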

Consumers

Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server, and each message goes to exactly one of them. In publish-subscribe, every message is broadcast to all consumers. Kafka offers a single consumer abstraction that generalizes both of these models: the consumer group.

[Figures omitted: the queuing model, the publish-subscribe model, and the Kafka consumer group model.]

Consumers label themselves with a consumer group name, and each message published to a topic is delivered to every subscribing consumer group, but to only one consumer instance within each group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then this works just like a traditional queue, balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and every message is broadcast to all consumers.

More commonly, however, topics have a small number of consumer groups, one per "logical subscriber". Each group is composed of many consumer instances for scalability and fault tolerance. The only difference from classic publish-subscribe is that the subscriber is a cluster of consumers rather than a single process.
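
A minimal sketch of how a consumer joins a group with the Java client: every instance started with the same group.id (the name page-view-processors below is hypothetical) shares the topic's partitions, while instances started under different group IDs each receive the full stream.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed quick-start broker
        props.put("group.id", "page-view-processors");      // hypothetical consumer group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe() enlists this instance in the group; the partitions of "test"
        // are balanced across all live members that use the same group.id.
        consumer.subscribe(Collections.singletonList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}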

Kafka also has stronger ordering guarantees than a traditional messaging system.

A traditional queue stores messages in order on the server, and if multiple consumers consume from the queue, the server hands out messages in the order they are stored. However, even though the server hands out messages in order, they are delivered to the consumers asynchronously, so they may not arrive or be processed in that order. For example, if the server stores M1, M2, M3, M4 and the consumers are C1, C2, C3, C4, the deliveries might be M1 to C2, M2 to C4, M3 to C1, and M4 to C3, and the messages might end up being processed in the order M4, M2, M1, M3, which no longer matches the storage order. In other words, ordering is lost in the presence of parallel consumption. Messaging systems often work around this with the notion of an "exclusive consumer" that allows only one process to consume from a queue, but that of course means there is no parallelism in processing.

Kafka does better here. By having a notion of parallelism within a topic, the partition, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions of a topic to the consumers in a consumer group, so that each partition is consumed by exactly one consumer in the group. That consumer is then the only reader of its partition and consumes the data in the order it is stored. Since there are many partitions, load is still balanced over many consumer instances. Note, however, that there cannot be more consumer instances in a consumer group than there are partitions.

Kafka only guarantees a total order over messages within a partition, not between different partitions of the same topic. Per-partition ordering, combined with the ability to partition data by key, is sufficient for most applications. However, if you require a total order over all messages, that can be achieved with a topic that has only one partition, which in turn means only one consumer process per consumer group.

Guarantees

At a high level, Kafka gives the following guarantees:

- Messages sent by a producer to a particular topic partition will be appended in the order they are sent. For example, if M1 and M2 are sent by the same producer and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log (the sketch below shows one way to observe this).

- A consumer instance sees messages in the order they are stored in the log.

- For a topic created with --replication-factor N (that is, with N replica servers), Kafka will tolerate up to N-1 server failures without losing any messages committed to the log.
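
To illustrate the first guarantee, here is a hedged sketch that sends two messages synchronously from the same producer to the single-partition quick-start topic test and prints the offsets reported back; M1, sent first, should report the lower offset. The setup mirrors the producer sketches above and is not meant as a production pattern (blocking on every send defeats batching).

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class OrderingCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed quick-start broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Blocking on get() ensures M1 is fully acknowledged before M2 is sent.
            RecordMetadata m1 = producer.send(new ProducerRecord<>("test", "M1")).get();
            RecordMetadata m2 = producer.send(new ProducerRecord<>("test", "M2")).get();
            // For the same producer and partition, offsets reflect the send order.
            System.out.printf("M1 offset=%d, M2 offset=%d%n", m1.offset(), m2.offset());
        }
    }
}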

More details on these guarantees are given in the design section of the documentation.

1.2 Use Cases

Here is a description of a few popular use cases for Apache Kafka. For an overview of these areas in action, see this blog post.

Messaging

Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons: to decouple data production from data consumption, to buffer unprocessed messages, and so on. Compared to most messaging systems, Kafka has better throughput, built-in partitioning, replication, and fault tolerance, which makes it a good solution for large-scale message processing applications.

In our experience, messaging uses are often comparatively low throughput, but may require low end-to-end latency and strong durability guarantees, both of which Kafka can provide.

In this domain, Kafka is comparable to traditional messaging systems such as ActiveMQ or RabbitMQ.

Website Activity Tracking

The original use case for Kafka was to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users take) is published to central topics, with one topic per activity type. These feeds can then be consumed for a range of use cases: real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.

Activity tracking is often very high volume, since a large number of page views generates a correspondingly large number of messages.

Metrics

Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.

Log Aggregation

Many people use Kafka as a log aggregation solution. Log aggregation typically collects log files off servers and puts them in a central place (a file server, or perhaps HDFS) for processing. Kafka abstracts away the details of files and gives a cleaner abstraction of log or event data as a stream of messages. This allows for lower-latency processing and easier support for multiple data sources and distributed data consumption. Compared to log-centric collection systems such as Scribe or Flume, Kafka offers the same functionality, stronger durability guarantees thanks to replication, and lower end-to-end latency.

Stream Processing

Many users of Kafka process data in pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed into new topics for further consumption. For example, a pipeline for recommending news articles might first crawl article content from RSS feeds and publish it to a topic named "articles"; a later stage might normalize or deduplicate this content and publish the cleansed result to a new topic; a final stage might attempt to recommend this content to users. Such pipelines create graphs of real-time data flows between the individual topics. Starting in 0.10.0.0, Apache Kafka ships with a lightweight but powerful stream processing library called Kafka Streams that can perform this kind of multi-stage processing. Apart from Kafka Streams, alternative open source stream processing tools include Apache Storm and Apache Samza.

Event Sourcing

Event sourcing is a style of application design where state changes are logged as a time-ordered sequence of records. Kafka's support for very large stored log data makes it an excellent backend for an application built in this style.

Commit Log

Kafka can serve as an external commit log for a distributed system. The log helps replicate data between nodes and acts as a re-syncing mechanism for failed nodes to restore the data they should hold. The log compaction feature in Kafka helps support this usage. In this usage Kafka is similar to the Apache BookKeeper project.

1.3 Quick Start

This tutorial assumes you are just getting started and do not yet have an existing Kafka or ZooKeeper installation.

Step 1: Download the code

Download the 0.10.0.0 release and un-tar it:

 > tar -xzf kafka_2.11-0.10.0.0.tgz
 > cd kafka_2.11-0.10.0.0    

Step 2: Start the server

Kafka uses ZooKeeper, so you first need to start a ZooKeeper server. You can use the convenience script packaged with Kafka to start it easily.

 > bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Now start the Kafka server:

> bin/kafka-server-start.sh config/server.properties
 [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
 [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step 3: Create a topic

Let's create a topic named "test" with a single partition and only one replica:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

We can now see that topic if we run the list-topics command:

> bin/kafka-topics.sh --list --zookeeper localhost:2181

   test

Alternatively, instead of creating topics manually, you can configure your brokers to auto-create topics when a message is published to a topic that does not exist yet.

Step 4: Send some messages

Kafka comes with a command line client that takes input from a file or from standard input and sends it out as messages to the Kafka cluster. By default, each line is sent as a separate message.

Run the producer and then type a few messages into the console to send to the server.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Step 5: Start a consumer

Kafka also has a command line consumer that dumps messages to standard output.

 > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

If you run each of the above commands in a different terminal, you can type messages into the producer terminal and see them appear in the consumer terminal.

All of the command line tools have additional options; running a command with no arguments will display usage information documenting them in more detail.

Step 6: Setting up a multi-broker cluster

So far we have been running against a single broker, but that's not very interesting. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get a feel for it, let's expand our cluster to three nodes (don't worry about hardware, all three still run on the same machine).

First, we make a config file for each of the brokers by copying config/server.properties and editing it as needed:

> cp config/server.properties config/server-1.properties 
> cp config/server.properties config/server-2.properties

Now edit these new files and set the following properties:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1


config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

The broker.id property is the unique, permanent name of each node in the cluster. We only have to override the port and the log directory because we are running all the brokers on the same machine, and we want to keep them from all trying to register on the same port or overwrite each other's data.

We already started ZooKeeper and our single node in the previous steps, so we just need to start the two new nodes:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
 ...

Now create a new topic with a replication factor of three:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Okay, now we have a cluster, but how do we know which broker is doing what? To see that, run the "describe topics" command:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

Here is an explanation of the output. The first line gives a summary of all the partitions; each additional line gives information about one partition. Since this topic has only one partition, there is only one such line.

"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected share of the partitions.

"replicas" is the list of nodes that replicate the log for this partition, regardless of whether they are the leader or even whether they are currently alive.

"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught up to the leader.

Note that in this example, node 1 is the leader for the only partition of the topic.

We can run the same command on the original topic we created to see where it lives:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

No surprise there: the original topic has no replicas and lives on server 0, the only server in our cluster when we created it.

Let's publish a few messages to our new topic:

> bin/kafka-console-producer.sh  --broker-list  localhost:9092   --topic  my-replicated-topic 

...
my test message 1
my test message 2
 ^C

Now let's consume these messages:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Now let's test Kafka's fault tolerance. Broker 1 was acting as the leader, so let's kill it:

> ps | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564

Leadership has switched to one of the followers, and node 1 is no longer in the in-sync replica set:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0

But the messages are still available for consumption, even though the leader that originally took the writes is down:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Step 7: Use Kafka Connect to import/export data

Writing data from the console and reading it back out on the console is a convenient way to start, but you will probably want to import data from other sources into Kafka or export data from Kafka to other systems. For many systems, instead of writing custom integration code, you can use Kafka Connect to import or export the data. Kafka Connect is a tool included with Kafka that imports and exports data to and from Kafka. It is extensible: it runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we will see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file. First, create some seed data to test with:

> echo -e "foo\nbar" > test.txt
Next, we start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is the configuration for the Kafka Connect process itself, containing common settings such as the Kafka brokers to connect to and the serialization format for data. Each of the remaining configuration files specifies a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
These sample configuration files, included with Kafka, use the default local cluster configuration and create two connectors: the first is a source connector that reads lines from an input file and produces each one to a Kafka topic, and the second is a sink connector that reads messages from a Kafka topic and writes each one as a line in an output file. During startup you will see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should begin reading lines from test.txt and producing them to the topic connect-test, and the sink connector should begin reading messages from the topic connect-test and writing them to the file test.sink.txt. We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:

> cat test.sink.txt
foo
bar

Note that the data is stored in the Kafka topic connect-test, so we can also run the console consumer to see the data in the topic (or use custom consumer code to process it):

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
The connectors continue to process data, so we can add lines to test.txt and watch them move through the pipeline:
> echo "Another line" >> test.txt
You should see the line appear in the console consumer output and in test.sink.txt.

Step 8: Use Kafka Streams to process data

Kafka Streams is a client library of Kafka for real-time stream processing and analysis of data stored in Kafka brokers. This example demonstrates how to run a streaming application coded with this library. Here is the gist of the WordCountDemo example code (converted to use Java 8 lambda expressions for easier reading):

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts")

The example implements the WordCount algorithm, which computes word frequencies from input text. However, unlike the WordCount examples you may have seen before, which operate on bounded data, this WordCount demo behaves slightly differently because it is designed to operate on an infinite, unbounded stream of data. Similar to the bounded variant, it continually updates its word counts, but since it must assume potentially unbounded input, it periodically outputs its current results while continuing to process more data, because it cannot know when the stream ends.

We will now prepare input data for a Kafka topic, which will then be processed by the Kafka Streams application:

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

Next, we send this input data to the topic named streams-file-input, which we can do using the console producer (in practice, stream data would more likely flow continuously into Kafka while the application is up and running):

> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input
> cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input

Now we can run the WordCount demo application to process the input data:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

There will be no STDOUT output; the results are continually written back to another Kafka topic named streams-wordcount-output. The demo runs for a few seconds and then, unlike a typical stream processing application, terminates automatically.

We can now inspect the output of the WordCount demo application:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
            --topic streams-wordcount-output \
            --from-beginning \
            --formatter kafka.tools.DefaultMessageFormatter \
            --property print.key=true \
            --property print.value=true \
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

The following output is printed to the console:

all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2
join    1
kafka   3
summit  1

The first column is the Kafka message key and the second column is the message value, both in java.lang.String format. Note that the output is actually a continuous stream of updates, where each record (each line of the output above) is the updated count of a single word (the record key), such as "kafka". For multiple records with the same key, each later record is an update of the previous one.

You can now write more input messages to the streams-file-input topic and observe additional messages being added to the streams-wordcount-output topic, reflecting the updated word counts.

You can stop the console consumer with Ctrl+C.

1.4 Ecosystem

Beyond the main distribution itself, a large number of tools integrate with Kafka.

The ecosystem page lists many of these, including stream processing systems, Hadoop integration, monitoring, and deployment tools.

1.5 Upgrading From Previous Versions

0.10.0.0 has potential breaking changes (please review before upgrading) and possible performance impact following the upgrade. By following the recommended rolling upgrade plan below, you guarantee no downtime and no performance impact during and following the upgrade. 
Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.

Notes to clients with version 0.9.0.0: Due to a bug introduced in 0.9.0.0, clients that depend on ZooKeeper (old Scala high-level Consumer and MirrorMaker if used with the old consumer) will not work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 before brokers are upgraded to 0.10.0.x. This step is not necessary for 0.8.X or 0.9.0.1 clients.

For a rolling upgrade:

  1. Update server.properties file on all brokers and add the following properties:
  2. Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
  3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.0.0. NOTE: You shouldn't touch log.message.format.version yet - this parameter should only change once all consumers have been upgraded to 0.10.0.0
  4. Restart the brokers one by one for the new protocol version to take effect.
  5. Once all consumers have been upgraded to 0.10.0, change log.message.format.version to 0.10.0 on each broker and restart them one by one.

Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.

The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages. The on disk message format can be configured through log.message.format.version in the server.properties file. The default on-disk message format is 0.10.0. If a consumer client is on a version before 0.10.0.0, it only understands message formats before 0.10.0. In this case, the broker is able to convert messages from the 0.10.0 format to an earlier format before sending the response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case. Reports from the Kafka community on the performance impact have shown CPU utilization going from 20% before to 100% after an upgrade, which forced an immediate upgrade of all clients to bring performance back to normal. To avoid such message conversion before consumers are upgraded to 0.10.0.0, one can set log.message.format.version to 0.8.2 or 0.9.0 when upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the data to the old consumers. Once consumers are upgraded, one can change the message format to 0.10.0 on the broker and enjoy the new message format that includes new timestamp and improved compression. The conversion is supported to ensure compatibility and can be useful to support a few apps that have not updated to newer clients yet, but is impractical to support all consumer traffic on even an overprovisioned cluster. Therefore it is critical to avoid the message conversion as much as possible when brokers have been upgraded but the majority of clients have not.

For clients that are upgraded to 0.10.0.0, there is no performance impact.

Note: By setting the message format version, one certifies that all existing messages are on or below that message format version. Otherwise consumers before 0.10.0.0 might break. In particular, after the message format is set to 0.10.0, one should not change it back to an earlier format as it may break consumers on versions before 0.10.0.0.

Note: Due to the additional timestamp introduced in each message, producers sending small messages may see a message throughput degradation because of the increased overhead. Likewise, replication now transmits an additional 8 bytes per message. If you're running close to the network capacity of your cluster, it's possible that you'll overwhelm the network cards and see failures and performance issues due to the overload.

Note: If you have enabled compression on producers, you may notice reduced producer throughput and/or lower compression rate on the broker in some cases. When receiving compressed messages, 0.10.0 brokers avoid recompressing the messages, which in general reduces the latency and improves the throughput. In certain cases, however, this may reduce the batching size on the producer, which could lead to worse throughput. If this happens, users can tune linger.ms and batch.size of the producer for better throughput. In addition, the producer buffer used for compressing messages with snappy is smaller than the one used by the broker, which may have a negative impact on the compression ratio for the messages on disk. We intend to make this configurable in a future Kafka release.
  • Starting from Kafka 0.10.0.0, the message format version in Kafka is represented as the Kafka version. For example, message format 0.9.0 refers to the highest message version supported by Kafka 0.9.0.
  • Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages.
  • ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0
  • FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0
  • MessageFormatter interface was changed from def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) to def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
  • MessageReader interface was changed from def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] to def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
  • MessageFormatter's package was changed from kafka.tools to kafka.common
  • MessageReader's package was changed from kafka.tools to kafka.common
  • MirrorMakerMessageHandler no longer exposes the handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]) method as it was never called.
  • The 0.7 KafkaMigrationTool is no longer packaged with Kafka. If you need to migrate from 0.7 to 0.10.0, please migrate to 0.8 first and then follow the documented upgrade process to upgrade from 0.8 to 0.10.0.
  • The new consumer has standardized its APIs to accept java.util.Collection as the sequence type for method parameters. Existing code may have to be updated to work with the 0.10.0 client library.
  • LZ4-compressed message handling was changed to use an interoperable framing specification (LZ4f v1.5.1). To maintain compatibility with old clients, this change only applies to Message format 0.10.0 and later. Clients that Produce/Fetch LZ4-compressed messages using v0/v1 (Message format 0.9.0) should continue to use the 0.9.0 framing implementation. Clients that use Produce/Fetch protocols v2 or later should use interoperable LZ4f framing. A list of interoperable LZ4 libraries is available at http://www.lz4.org/
  • Starting from Kafka 0.10.0.0, a new client library named Kafka Streams is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read this section.
  • The default value of the configuration parameter receive.buffer.bytes is now 64K for the new consumer.
  • The new consumer now exposes the configuration parameter exclude.internal.topics to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.
  • The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible.
  • The new consumer API has been marked stable.
0.9.0.0 has potential breaking changes (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker downstream clusters should be upgraded first as well.

For a rolling upgrade:

  1. Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X
  2. Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
  3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0.
  4. Restart the brokers one by one for the new protocol version to take effect

Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.

  • Java 1.6 is no longer supported.
  • Scala 2.9 is no longer supported.
  • Broker IDs above 1000 are now reserved by default to automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker configuration property accordingly.
  • Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync.
  • Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync.
  • Compacted topics no longer accept messages without key and an exception is thrown by the producer if this is attempted. In 0.8.x, a message without key would cause the log compaction thread to subsequently complain and quit (and stop compacting all compacted topics).
  • MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration.
  • Tools packaged under org.apache.kafka.clients.tools.* have been moved to org.apache.kafka.tools.*. All included scripts will still function as usual, only custom code directly importing these classes will be affected.
  • The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh.
  • The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with non-zero exit code on failure.
  • The kafka-topics.sh script (kafka.admin.TopicCommand) will now print a warning when topic names risk metric collisions due to the use of a '.' or '_' in the topic name, and error in the case of an actual collision.
  • The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the new producer instead of the old producer by default, and users have to specify 'old-producer' to use the old producer.
  • By default all command line tools will print all logging messages to stderr instead of stdout.
  • The new broker id generation feature can be disabled by setting broker.id.generation.enable to false.
  • Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics.
  • The default value of the configuration parameter fetch.min.bytes for the new consumer is now 1.
Deprecations in 0.9.0.0
  • Altering topic configuration from the kafka-topics.sh script (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality.
  • The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality.
  • The kafka.tools.ProducerPerformance class has been deprecated. Going forward, please use org.apache.kafka.tools.ProducerPerformance for this functionality (kafka-producer-perf-test.sh will also be changed to use the new class).
  • The producer config block.on.buffer.full has been deprecated and will be removed in future release. Currently its default value has been changed to false. The KafkaProducer will no longer throw BufferExhaustedException but instead will use max.block.ms value to block, after which it will throw a TimeoutException. If block.on.buffer.full property is set to true explicitly, it will set the max.block.ms to Long.MAX_VALUE and metadata.fetch.timeout.ms will not be honoured
0.8.2 is fully compatible with 0.8.1. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it. 0.8.1 is fully compatible with 0.8. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it. Release 0.7 is incompatible with newer releases. Major changes were made to the API, ZooKeeper data structures, and protocol, and configuration in order to add replication (Which was missing in 0.7). The upgrade from 0.7 to later versions requires a special tool for migration. This migration can be done without downtime.