1. 程式人生 > >訊息中介軟體kafka(0.9以及0.10版本)學習及實踐

訊息中介軟體kafka(0.9以及0.10版本)學習及實踐

目錄 一、介紹 二、特點 三、何時需要訊息佇列 四、元件 五、訊息傳送的流程 六、原理 七、實踐 八、版本比較 檢視文章連結 中介軟體交流群:461333361 一、介紹 kafka,A distributed publish-subscribe messaging system,訊息中介軟體利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,並基於資料通訊來進行分散式系統的整合。通過提供訊息傳遞和訊息排隊模型,它可以在分散式環境下擴充套件程序間的通訊。
kafka是LinkedIn開源出來的一款訊息中介軟體,用scala語言,在LinkedIn的業務場景中用於收集日誌資訊,典型的pub-sub中的pull模型。一般的JMS都是push模型,如老牌的RabbitMQ,ActiveMQ等。pull模型的還有RocketMQ,用java語言基於kafka原理改寫的,一種優化的做法----長輪詢pull模型。
二、特點
MQ生來就是解決生產者和消費者速度不匹配的問題而誕生的,那麼MQ系統一個最最基本的要求就是寫入速度必須要快,哪怕出隊速度慢點也無所謂,因為業務高峰期持續時間是有限的,高峰結束之後有的是時間讓消費者慢慢消化,更別說簡單粗暴多加幾臺消費者就好了。 1.同時為釋出和訂閱提供高吞吐量。據瞭解,Kafka每秒可以生產約25萬訊息(50 MB),每秒處理55萬訊息(110 MB)。 2.可進行持久化操作。將訊息持久化到磁碟,因此可用於批量消費,例如ETL,以及實時應用程式。通過將資料持久化到硬碟以及replication防止資料丟失。 3.分散式系統,易於向外擴充套件。所有的producer、broker和consumer都會有多個,均為分散式的。無需停機即可擴充套件機器。 4.訊息被處理的狀態是在consumer端維護,而不是由server端維護。當失敗時能自動平衡。 5.支援online和offline的場景。
三、何時需要訊息佇列
MQ訊息佇列是應運鬆偶合的概念而產生的,主要以佇列和釋出訂閱為訊息傳輸機制,以非同步的方式將訊息可靠的傳輸到消費端的一種基礎產品。 它被廣泛的應用與跨平臺、跨系統的分散式系統之間,為它們提供高效可靠的非同步傳輸機制。 可以使用mq的場景有很多,最常用的幾種,是做業務解耦/最終一致性/廣播/錯峰流控等。反之,如果需要強一致性,關注業務邏輯的處理結果,則RPC顯得更為合適。
四、元件
broker:釋出和訂閱的中間者,堆積訊息。 zookeeper:管理kafka叢集,監控每個broker的狀態,使得釋出和訂閱都與有效的broker進行。 producer:訊息的釋出者。 Producer將訊息釋出到指定的Topic中,同時Producer也能決定將此訊息傳送到哪個partition。
consumer:非同步消費者,通過訂閱感興趣的topic,從broker拉取訊息進行消費。在kafka中,一個partition中的訊息只會被group中的一個consumer消費(同一時刻);每個group中consumer訊息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以同時消費多個partitions中的訊息.kafka只能保證一個partition中的訊息被某個consumer消費時是順序的.事實上,從Topic角度來說,當有多個partitions時,訊息仍不是全域性有序的. Consumer group: 每個consumer客戶端被建立時,會向zookeeper註冊自己的資訊;此作用主要是為了"負載均衡".一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了效能考慮,讓partition相對均衡的分散到每個consumer上. topic:將訊息劃分成話題,作為某一型別的佇列。 Partition:
parition是物理上的概念,每個topic包含一個或多個partition,建立topic時可指定parition數量。每個partition對應於一個資料夾,該資料夾下儲存該partition的資料和索引檔案。topic進行劃分為多個分割槽,分割槽可以自己寫函式指定分配訊息到某一分割槽,預設的如hash,round-robin方式,Kafka儘量的使所有分割槽均勻的分佈到叢集所有的節點上而不是集中在某些節點上,另外主從關係也儘量均衡這樣每個幾點都會擔任一定比例的分割槽的leader.。 每個partition是一個有序的佇列,每條訊息擁有一個offset。
五、訊息傳送的流程
1.Producer根據指定的partition方法(round-robin、hash等),將訊息釋出到指定topic的partition裡面 2.kafka叢集接收到Producer發過來的訊息後,將其持久化到硬碟,並保留訊息指定時長(可配置),而不關注訊息是否被消費。 3.Consumer從kafka叢集pull資料,並控制獲取訊息的offset 六、原理
1.持久化 kafka使用檔案儲存訊息(append only log),這就直接決定kafka在效能上嚴重依賴檔案系統的本身特性.且無論任何OS下,對檔案系統本身的優化是非常艱難的.檔案快取/直接記憶體對映等是常用的手段.因為kafka是對日誌檔案進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將訊息暫時buffer起來,當訊息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO呼叫的次數.對於kafka而言,較高效能的磁碟,將會帶來更加直接的效能提升.
2.效能 除磁碟IO之外,我們還需要考慮網路IO,這直接關係到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將訊息buffer起來,當訊息的條數達到一定閥值時,批量傳送給broker;對於consumer端也是一樣,批量fetch多條訊息.不過訊息量的大小可以通過配置檔案來指定.對於kafka broker端,似乎有個sendfile系統呼叫可以潛在的提升網路IO的效能:將檔案的資料對映到系統記憶體中,socket直接讀取相應的記憶體區域即可,而無需程序再次copy和交換(這裡涉及到"磁碟IO資料"/"核心記憶體"/"程序記憶體"/"網路緩衝區",多者之間的資料copy). 其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用訊息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮.可以將任何在網路上傳輸的訊息都經過壓縮.kafka支援gzip/snappy等多種壓縮方式.
3.負載均衡 kafka叢集中的任何一個broker,都可以向producer提供metadata資訊,這些metadata中包含"叢集中存活的servers列表"/"partitions leader列表"等資訊(請參看zookeeper中的節點資訊). 當producer獲取到metadata資訊之後, producer將會和Topic下所有partition leader保持socket連線;訊息由producer直接通過socket傳送到broker,中間不會經過任何"路由層". 非同步傳送,將多條訊息暫且在客戶端buffer起來,並將他們批量傳送到broker;小資料IO太多,會拖慢整體的網路延遲,批量延遲傳送事實上提升了網路效率;不過這也有一定的隱患,比如當producer失效時,那些尚未傳送的訊息將會丟失。
4.Topic模型 其他JMS實現,訊息消費的位置是有prodiver保留,以便避免重複傳送訊息或者將沒有消費成功的訊息重發等,同時還要控制訊息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的訊息只有一個consumer在消費,且不存在訊息狀態的控制,也沒有複雜的訊息確認機制,可見kafka broker端是相當輕量級的.當訊息被consumer接收之後,consumer可以在本地儲存最後訊息的offset,並間歇性的向zookeeper註冊offset.由此可見,consumer客戶端也很輕量級。 kafka中consumer負責維護訊息的消費記錄,而broker則不關心這些,這種設計不僅提高了consumer端的靈活性,也適度的減輕了broker端設計的複雜度;這是和眾多JMS prodiver的區別.此外,kafka中訊息ACK的設計也和JMS有很大不同,kafka中的訊息是批量(通常以訊息的條數或者chunk的尺寸為單位)傳送給consumer,當訊息消費成功後,向zookeeper提交訊息的offset,而不會向broker交付ACK.或許你已經意識到,這種"寬鬆"的設計,將會有"丟失"訊息/"訊息重發"的危險.
5.訊息傳輸一致 Kafka提供3種訊息傳輸一致性語義:最多1次,最少1次,恰好1次。 最少1次:可能會重傳資料,有可能出現數據被重複處理的情況; 最多1次:可能會出現資料丟失情況; 恰好1次:並不是指真正只傳輸1次,只不過有一個機制。確保不會出現“資料被重複處理”和“資料丟失”的情況。
at most once: 消費者fetch訊息,然後儲存offset,然後處理訊息;當client儲存offset之後,但是在訊息處理過程中consumer程序失效(crash),導致部分訊息未能繼續處理.那麼此後可能其他consumer會接管,但是因為offset已經提前儲存,那麼新的consumer將不能fetch到offset之前的訊息(儘管它們尚沒有被處理),這就是"at most once". at least once: 消費者fetch訊息,然後處理訊息,然後儲存offset.如果訊息處理成功之後,但是在儲存offset階段zookeeper異常或者consumer失效,導致儲存offset操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的訊息,這就是"at least once". "Kafka Cluster"到消費者的場景中可以採取以下方案來得到“恰好1次”的一致性語義: 最少1次+消費者的輸出中額外增加已處理訊息最大編號:由於已處理訊息最大編號的存在,不會出現重複處理訊息的情況。
6.副本 kafka中,replication策略是基於partition,而不是topic;kafka將每個partition資料複製到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置檔案來設定。leader處理所有的read-write請求,follower需要和leader保持同步.Follower就像一個"consumer",消費訊息並儲存在本地日誌中;leader負責跟蹤所有的follower狀態,如果follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條訊息儲存成功,此訊息才被認為是"committed",那麼此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具有良好的網路環境.即使只有一個replicas例項存活,仍然可以保證訊息的正常傳送和接收,只要zookeeper叢集存活即可. 選擇follower時需要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成為新的leader.
7.log 每個log entry格式為"4個位元組的數字N表示訊息的長度" + "N個位元組的訊息內容";每個日誌都有一個offset來唯一的標記一條訊息,offset的值為8個位元組的數字,表示此訊息在此partition中所處的起始位置..每個partition在物理儲存層面,有多個log file組成(稱為segment).segment file的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始訊息的offset.
獲取訊息時,需要指定offset和最大chunk尺寸,offset用來表示訊息的起始位置,chunk size用來表示最大獲取訊息的總長度(間接的表示訊息的條數).根據offset,可以找到此訊息所在segment檔案,然後根據segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出即可. 7.分散式 kafka使用zookeeper來儲存一些meta資訊,並使用了zookeeper watch機制來發現meta資訊的變更並作出相應的動作(比如consumer失效,觸發負載均衡等) Broker node registry: 當一個kafka broker啟動後,首先會向zookeeper註冊自己的節點資訊(臨時znode),同時當broker和zookeeper斷開連線時,此znode也會被刪除. Broker Topic Registry: 當一個broker啟動時,會向zookeeper註冊自己持有的topic和partitions資訊,仍然是一個臨時znode. Consumer and Consumer group: 每個consumer客戶端被建立時,會向zookeeper註冊自己的資訊;此作用主要是為了"負載均衡".一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了效能考慮,讓partition相對均衡的分散到每個consumer上. Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置檔案指定,也可以由系統生成),此id用來標記消費者資訊. Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費. Partition Owner registry: 用來標記partition正在被哪個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那麼將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"遊離"的partitions) 當consumer啟動時,所觸發的操作: A) 首先進行"Consumer id Registry"; B) 然後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那麼其他consumer接管partitions). C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.
總結: 1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連線併發送訊息. 2) Broker端使用zookeeper用來註冊broker資訊,已經監測partition leader存活性. 3) Consumer端使用zookeeper用來註冊consumer資訊,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連線,並獲取訊息。
8.Leader的選擇 Kafka的核心是日誌檔案,日誌檔案在叢集中的同步是分散式資料系統最基礎的要素。 如果leaders永遠不會down的話我們就不需要followers了!一旦leader down掉了,需要在followers中選擇一個新的leader.但是followers本身有可能延時太久或者crash,所以必須選擇高質量的follower作為leader.必須保證,一旦一個訊息被提交了,但是leader down掉了,新選出的leader必須可以提供這條訊息。大部分的分散式系統採用了多數投票法則選擇新的leader,對於多數投票法則,就是根據所有副本節點的狀況動態的選擇最適合的作為leader.Kafka並不是使用這種方法。 Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條訊息必須被這個集合中的每個節點讀取並追加到日誌中了,才回通知外部這個訊息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就可以允許在f個節點down掉的情況下不會丟失訊息並正常提供服。ISR的成員是動態的,如果一個節點被淘汰了,當它重新達到“同步中”的狀態時,他可以重新加入ISR.這種leader的選擇方式是非常快速的,適合kafka的應用場景。 一個邪惡的想法:如果所有節點都down掉了怎麼辦?Kafka對於資料不會丟失的保證,是基於至少一個節點是存活的,一旦所有節點都down了,這個就不能保證了。 實際應用中,當所有的副本都down掉時,必須及時作出反應。可以有以下兩種選擇: 1. 等待ISR中的任何一個節點恢復並擔任leader。 2. 選擇所有節點中(不只是ISR)第一個恢復的節點作為leader. 這是一個在可用性和連續性之間的權衡。如果等待ISR中的節點恢復,一旦ISR中的節點起不起來或者資料都是了,那叢集就永遠恢復不了了。如果等待ISR意外的節點恢復,這個節點的資料就會被作為線上資料,有可能和真實的資料有所出入,因為有些資料它可能還沒同步到。Kafka目前選擇了第二種策略,在未來的版本中將使這個策略的選擇可配置,可以根據場景靈活的選擇。 這種窘境不只Kafka會遇到,幾乎所有的分散式資料系統都會遇到。
9.副本管理 以上僅僅以一個topic一個分割槽為例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分割槽.Kafka儘量的使所有分割槽均勻的分佈到叢集所有的節點上而不是集中在某些節點上,另外主從關係也儘量均衡這樣每個幾點都會擔任一定比例的分割槽的leader. 優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點作為“controller”,當發現有節點down掉的時候它負責在游泳分割槽的所有節點中選擇新的leader,這使得Kafka可以批量的高效的管理所有分割槽節點的主從關係。如果controller down掉了,活著的節點中的一個會備切換為新的controller.
10.Leader與副本同步 對於某個分割槽來說,儲存正分割槽的"broker"為該分割槽的"leader",儲存備份分割槽的"broker"為該分割槽的"follower"。備份分割槽會完全複製正分割槽的訊息,包括訊息的編號等附加屬性值。為了保持正分割槽和備份分割槽的內容一致,Kafka採取的方案是在儲存備份分割槽的"broker"上開啟一個消費者程序進行消費,從而使得正分割槽的內容與備份分割槽的內容保持一致。一般情況下,一個分割槽有一個“正分割槽”和零到多個“備份分割槽”。可以配置“正分割槽+備份分割槽”的總數量,關於這個配置,不同主題可以有不同的配置值。注意,生產者,消費者只與儲存正分割槽的"leader"進行通訊。
Kafka允許topic的分割槽擁有若干副本,這個數量是可以配置的,你可以為每個topic配置副本的數量。Kafka會自動在每個副本上備份資料,所以當一個節點down掉時資料依然是可用的。 Kafka的副本功能不是必須的,你可以配置只有一個副本,這樣其實就相當於只有一份資料。 建立副本的單位是topic的分割槽,每個分割槽都有一個leader和零或多個followers.所有的讀寫操作都由leader處理,一般分割槽的數量都比broker的數量多的多,各分割槽的leader均勻的分佈在brokers中。所有的followers都複製leader的日誌,日誌中的訊息和順序都和leader中的一致。followers向普通的consumer那樣從leader那裡拉取訊息並儲存在自己的日誌檔案中。 許多分散式的訊息系統自動的處理失敗的請求,它們對一個節點是否著(alive)”有著清晰的定義。Kafka判斷一個節點是否活著有兩個條件: 1. 節點必須可以維護和ZooKeeper的連線,Zookeeper通過心跳機制檢查每個節點的連線。 2. 如果節點是個follower,他必須能及時的同步leader的寫操作,延時不能太久。 符合以上條件的節點準確的說應該是“同步中的(in sync)”,而不是模糊的說是“活著的”或是“失敗的”。Leader會追蹤所有“同步中”的節點,一旦一個down掉了,或是卡住了,或是延時太久,leader就會把它移除。至於延時多久算是“太久”,是由引數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由引數replica.lag.time.max.ms決定的。 只有當訊息被所有的副本加入到日誌中時,才算是“committed”,只有committed的訊息才會傳送給consumer,這樣就不用擔心一旦leader down掉了訊息會丟失。Producer也可以選擇是否等待訊息被提交的通知,這個是由引數acks決定的。 Kafka保證只要有一個“同步中”的節點,“committed”的訊息就不會丟失。
七、實踐 搭建環境如下圖,kafka叢集5臺broker,使用zookeeper管理叢集,註冊producer和consumer。
zookeeper配置: # the directory where the snapshot is stored. dataDir=/home/fund/zk/data dataLogDir=/home/fund/zk/log # the port at which the clients will connect clientPort=2181 # disable the per-ip limit on the number of connections since this is a non-production config maxClientCnxns=0 server.1=172.16.30.*:2888:3888 server.2=172.16.30.*:2888:3888 server.3=172.16.30.*:2888:3888 server.4=172.16.30.*:2888:3888 server.5=172.16.30.*:2888:3888 initLimit=5 syncLimit=2
broker配置: ############################# Server Basics #############################
broker.id=213 delete.topic.enable=true ############################# Socket Server Settings ############################# listeners=PLAINTEXT://:9092
advertised.host.name=172.16.30.*
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400 #min.insync.replicas=3 socket.request.max.bytes=104857600 offsets.topic.replication.factor=3 auto.leader.rebalance.enable=true auto.create.topics.enable=true offsets.commit.required.acks=-1 unclean.leader.election=false
############################# Log Basics #############################
log.dirs=/tmp/kafka-logs-1
num.partitions=10
num.recovery.threads.per.data.dir=3
############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=172.16.30.*:2181,172.16.30.*:2181,172.16.30.*:2181,172.16.30.*:2181,172.16.30.*:2181 host.name=172.16.30.* zookeeper.connection.timeout.ms=6000
producer配置: acks=1 retries=3 batch.size=16384 #Normally this occurs only under load when records arrive faster than they can be sent out. linger.ms=1 timeout.ms=30000 buffer.memory=33554432 retry.backoff.ms=100 connections.max.idle.ms=540000 bootstrap.servers=172.16.30.*:9092 partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner value.serializer=com.common.kafka.message.KafkaMessageSerializer key.serializer=org.apache.kafka.common.serialization.LongSerializer
consumer配置: bootstrap.servers=172.16.30.*:9092 enable.auto.commit=flase session.timeout.ms=30000 fetch.min.bytes=1 exclude.internal.topics=true auto.offset.reset=latest key.deserializer=org.apache.kafka.common.serialization.LongDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer group.id=test_50 heartbeat.interval.ms=3000 connections.max.idle.ms=540000 max.poll.records=2147483647 request.timeout.ms=40000 reconnect.backoff.ms=50 retry.backoff.ms=100
測試用例: kafka tests
八、版本比較 (一)、0.8.x vs 0.9 1.安全 Kafka提供了三個安全特性。 一是提供Kerberos 和 TLS 身份認證。 二是提供了類似Unix-like許可權系統,控制哪些使用者可以訪問資料。 三是提供資料傳輸加密。 當然只有新的producer,consumer API和0.9的consumer實現才能使用這些安全特性。老的API還是沒有這些安全方面的控制。 這些安全特性實現了向下相容的方式,不啟動安全特性的使用者不必擔心效能的降低。 這只是第一版的安全特性,更多的安全控制會在將來的版本中提供。 2.Kafka Connect Kafka Connect,它可以和外部系統、資料集建立一個數據流的連線,實現資料的輸入、輸出。
3.新的Consumer Kafka 0.8.2, Producer被重新設計, Kafka 0.9則重新設計了Consumer介面。它不再區分high-level consumer API和low-level consumer API,而是提供了一個統一的consumer API。 1).Kafka可以自行維護Offset、消費者的Position。也可以開發者自己來維護Offset,實現相關的業務需求。消費時,可以只消費指定的Partitions 2).可以使用外部儲存記錄Offset,如資料庫之類的。 3).自行控制Consumer消費訊息的位置。 4).可以使用多執行緒進行消費。
4.為使用者定義配額 一個大的Kafka叢集可能有多個使用者。0.9以前,consumer 如果處理的訊息非常快,可能會壟斷整個broker的網路資源,producer也是如此。現在Kafka 0.9提供了基於client的使用者配額控制。對於Producer可以控制每個client的每秒寫的位元組數,對於Consumer控制每個 client的每秒讀的位元組。
(二)、0.9 vs 0.10 1.Kafka Streams Kafka Streams包含了一整套描述常見流操作的高階語言API(比如 joining, filtering以及aggregating records),這使得開發者可以快速開發強大的流處理應用程式。Kafka Streams提供了狀態和無狀態的處理能力,並且可以部署在很多系統之上。
2.Connectors連線狀態/控制的REST API 在Kafka 0.10.0.0中,Kafka Connect得到了持續提升。 在此之前,使用者需要監控日誌以便看到各個connectors以及他們task的狀態,現在Kafka已經支援了獲取的狀態API這樣使得監控變得更簡單。 同時也添加了控制相關的API,這使得使用者可以在進行維護的時候停止一個connector;或者手動地重啟那些失敗的task。這些能夠直觀的在使用者介面展示和管理connector目前可以在控制中心(Control Center)看到。
3.SASL改進 新的安全特性,包括通過SASL支援Kerberos。Apache Kafka 0.10.0.0現在支援更多的SASL特性,包括外部授權伺服器,在一臺伺服器上支援多種型別的SASL認證以及其他的改進。
4.Rack Awareness 現在Kafka已經內建了機架感知以便隔離副本,這使得Kafka保證副本可以跨越到多個機架或者是可用區域,顯著提高了Kafka的彈性和可用性。這個功能是由Netflix提供的。
5.Kafka Consumer Max Records 在Kafka 0.9.0.0,開發者們在新consumer上使用poll()函式的時候是幾乎無法控制返回訊息的條數。不過值得高興的是,此版本的Kafka引入了max.poll.records引數,允許開發者控制返回訊息的條數。
6.協議版本改進 Kafka brokers現在支援返回所有支援的協議版本的請求API,這個特點的好處就是以後將允許一個客戶端支援多個broker版本。

參考: http://kafka.apache.org/
http://blog.jobbole.com/75328/
http://www.infoq.com/cn/articles/kafka-analysis-part-1
http://blog.csdn.net/yangchao228/article/details/40583765