1. 程式人生 > >Kafka入門,簡介,使用場景(轉載)

Kafka入門,簡介,使用場景(轉載)

一、入門

    1、簡介

    Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規範的實現。kafka對訊息儲存時根據Topic進行歸類,傳送訊息者成為Producer,訊息接受者成為Consumer,此外kafka叢集有多個kafka例項組成,每個例項(server)成為broker。無論是kafka叢集,還是producer和consumer都依賴於zookeeper來保證系統可用性叢集儲存一些meta資訊。

2、Topics/logs

    一個Topic可以認為是一類訊息,每個topic將被分成多個partition(區),每個partition在儲存層面是append log檔案。任何釋出到此partition的訊息都會被直接追加到log檔案的尾部,每條訊息在檔案中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條訊息。它唯一的標記一條訊息。kafka並沒有提供其他額外的索引機制來儲存offset,因為在kafka中幾乎不允許對訊息進行“隨機讀寫”。

kafka和JMS(Java Message Service)實現(activeMQ)不同的是:即使訊息被消費,訊息仍然不會被立即刪除.日誌檔案將會根據broker中的配置要求,保留一定的時間之後刪除;比如log檔案保留2天,那麼兩天後,檔案會被清除,無論其中的訊息是否被消費.kafka通過這種簡單的手段,來釋放磁碟空間,以及減少訊息消費之後對檔案內容改動的磁碟IO開支.

    對於consumer而言,它需要儲存消費訊息的offset,對於offset的儲存和使用,有consumer來控制;當consumer正常消費訊息時,offset將會"線性"的向前驅動,即訊息將依次順序被消費.事實上consumer可以使用任意順序消費訊息,它只需要將offset重置為任意值..(offset將會儲存在zookeeper中,參見下文)

    kafka叢集幾乎不需要維護任何consumer和producer狀態資訊,這些資訊有zookeeper儲存;因此producer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對叢集造成額外的影響.

    partitions的設計目的有多個.最根本原因是kafka基於檔案儲存.通過分割槽,可以將日誌內容分散到多個server上,來避免檔案尺寸達到單機磁碟的上限,每個partiton都會被當前server(kafka例項)儲存;可以將一個topic切分多任意多個partitions,來訊息儲存/消費的效率.此外越多的partitions意味著可以容納更多的consumer,有效提升併發消費的能力.(具體原理參見下文).

3、Distribution

    一個Topic的多個partitions,被分佈在kafka叢集中的多個server上;每個server(kafka例項)負責partitions中訊息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition將會被備份到多臺機器上,以提高可用性.

    基於replicated方案,那麼就意味著需要對多個備份進行排程;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼將會有其他follower來接管(成為新的leader);follower只是單調的和leader跟進,同步訊息即可..由此可見作為leader的server承載了全部的請求壓力,因此從叢集的整體考慮,有多少個partitions就意味著有多少個"leader",kafka會將"leader"均衡的分散在每個例項上,來確保整體的效能穩定.

    Producers

    Producer將訊息釋出到指定的Topic中,同時Producer也能決定將此訊息歸屬於哪個partition;比如基於"round-robin"方式或者通過其他的一些演算法等.

    Consumers

    本質上kafka只支援Topic.每個consumer屬於一個consumer group;反過來說,每個group中可以有多個consumer.傳送到Topic的訊息,只會被訂閱此Topic的每個group中的一個consumer消費.

    如果所有的consumer都具有相同的group,這種情況和queue模式很像;訊息將會在consumers之間負載均衡.

    如果所有的consumer都具有不同的group,那這就是"釋出-訂閱";訊息將會廣播給所有的消費者.

    在kafka中,一個partition中的訊息只會被group中的一個consumer消費;每個group中consumer訊息消費互相獨立;我們可以認為一個group是一個"訂閱"者,一個Topic中的每個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個consumer可以消費多個partitions中的訊息.kafka只能保證一個partition中的訊息被某個consumer消費時,訊息是順序的.事實上,從Topic角度來說,訊息仍不是有序的.

    kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,否則將意味著某些consumer將無法得到訊息.

    Guarantees

    1) 傳送到partitions中的訊息將會按照它接收的順序追加到日誌中

    2) 對於消費者而言,它們消費訊息的順序和日誌中訊息順序一致.

    3) 如果Topic的"replicationfactor"為N,那麼允許N-1個kafka例項失效.

二、使用場景

    1、Messaging   

    對於一些常規的訊息系統,kafka是個不錯的選擇;partitons/replication和容錯,可以使kafka具有良好的擴充套件性和效能優勢.不過到目前為止,我們應該很清楚認識到,kafka並沒有提供JMS中的"事務性""訊息傳輸擔保(訊息確認機制)""訊息分組"等企業級特性;kafka只能使用作為"常規"的訊息系統,在一定程度上,尚未確保訊息的傳送與接收絕對可靠(比如,訊息重發,訊息傳送丟失等)

    2、Websit activity tracking

    kafka可以作為"網站活性跟蹤"的最佳工具;可以將網頁/使用者操作等資訊傳送到kafka中.並實時監控,或者離線統計分析等

    3、Log Aggregation

    kafka的特性決定它非常適合作為"日誌收集中心";application可以將操作日誌"批量""非同步"的傳送到kafka叢集中,而不是儲存在本地或者DB中;kafka可以批量提交訊息/壓縮訊息等,這對producer端而言,幾乎感覺不到效能的開支.此時consumer端可以使hadoop等其他系統化的儲存和分析系統.

三、設計原理

    kafka的設計初衷是希望作為一個統一的資訊收集平臺,能夠實時的收集反饋資訊,並需要能夠支撐較大的資料量,且具備良好的容錯能力.

1、永續性

    kafka使用檔案儲存訊息,這就直接決定kafka在效能上嚴重依賴檔案系統的本身特性.且無論任何OS下,對檔案系統本身的優化幾乎沒有可能.檔案快取/直接記憶體對映等是常用的手段.因為kafka是對日誌檔案進行append操作,因此磁碟檢索的開支是較小的;同時為了減少磁碟寫入的次數,broker會將訊息暫時buffer起來,當訊息的個數(或尺寸)達到一定閥值時,再flush到磁碟,這樣減少了磁碟IO呼叫的次數.


2、效能

    需要考慮的影響效能點很多,除磁碟IO之外,我們還需要考慮網路IO,這直接關係到kafka的吞吐量問題.kafka並沒有提供太多高超的技巧;對於producer端,可以將訊息buffer起來,當訊息的條數達到一定閥值時,批量傳送給broker;對於consumer端也是一樣,批量fetch多條訊息.不過訊息量的大小可以通過配置檔案來指定.對於kafka broker端,似乎有個sendfile系統呼叫可以潛在的提升網路IO的效能:將檔案的資料對映到系統記憶體中,socket直接讀取相應的記憶體區域即可,而無需程序再次copy和交換. 其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,因此啟用訊息壓縮機制是一個良好的策略;壓縮需要消耗少量的CPU資源,不過對於kafka而言,網路IO更應該需要考慮.可以將任何在網路上傳輸的訊息都經過壓縮.kafka支援gzip/snappy等多種壓縮方式.

    3、生產者

    負載均衡: producer將會和Topic下所有partition leader保持socket連線;訊息由producer直接通過socket傳送到broker,中間不會經過任何"路由層".事實上,訊息被路由到哪個partition上,有producer客戶端決定.比如可以採用"random""key-hash""輪詢"等,如果一個topic中有多個partitions,那麼在producer端實現"訊息均衡分發"是必要的.

    其中partition leader的位置(host:port)註冊在zookeeper中,producer作為zookeeper client,已經註冊了watch用來監聽partition leader的變更事件.

    非同步傳送:將多條訊息暫且在客戶端buffer起來,並將他們批量的傳送到broker,小資料IO太多,會拖慢整體的網路延遲,批量延遲傳送事實上提升了網路效率。不過這也有一定的隱患,比如說當producer失效時,那些尚未傳送的訊息將會丟失。

    4、消費者

    consumer端向broker傳送"fetch"請求,並告知其獲取訊息的offset;此後consumer將會獲得一定條數的訊息;consumer端也可以重置offset來重新消費訊息.

    在JMS實現中,Topic模型基於push方式,即broker將訊息推送給consumer端.不過在kafka中,採用了pull方式,即consumer在和broker建立連線之後,主動去pull(或者說fetch)訊息;這中模式有些優點,首先consumer端可以根據自己的消費能力適時的去fetch訊息並處理,且可以控制訊息消費的進度(offset);此外,消費者可以良好的控制訊息消費的數量,batch fetch.

    其他JMS實現,訊息消費的位置是有prodiver保留,以便避免重複傳送訊息或者將沒有消費成功的訊息重發等,同時還要控制訊息的狀態.這就要求JMS broker需要太多額外的工作.在kafka中,partition中的訊息只有一個consumer在消費,且不存在訊息狀態的控制,也沒有複雜的訊息確認機制,可見kafka broker端是相當輕量級的.當訊息被consumer接收之後,consumer可以在本地儲存最後訊息的offset,並間歇性的向zookeeper註冊offset.由此可見,consumer客戶端也很輕量級.

5、訊息傳送機制

    對於JMS實現,訊息傳輸擔保非常直接:有且只有一次(exactly once).在kafka中稍有不同:

    1) at most once: 最多一次,這個和JMS中"非持久化"訊息類似.傳送一次,無論成敗,將不會重發.

    2) at least once: 訊息至少傳送一次,如果訊息未能接受成功,可能會重發,直到接收成功.

    3) exactly once: 訊息只會傳送一次.

    at most once: 消費者fetch訊息,然後儲存offset,然後處理訊息;當client儲存offset之後,但是在訊息處理過程中出現了異常,導致部分訊息未能繼續處理.那麼此後"未處理"的訊息將不能被fetch到,這就是"at most once".

    at least once: 消費者fetch訊息,然後處理訊息,然後儲存offset.如果訊息處理成功之後,但是在儲存offset階段zookeeper異常導致儲存操作未能執行成功,這就導致接下來再次fetch時可能獲得上次已經處理過的訊息,這就是"at least once",原因offset沒有及時的提交給zookeeper,zookeeper恢復正常還是之前offset狀態.

    exactly once: kafka中並沒有嚴格的去實現(基於2階段提交,事務),我們認為這種策略在kafka中是沒有必要的.

    通常情況下"at-least-once"是我們搜選.(相比at most once而言,重複接收資料總比丟失資料要好).

    6、複製備份

    kafka將每個partition資料複製到多個server上,任何一個partition有一個leader和多個follower(可以沒有);備份的個數可以通過broker配置檔案來設定.leader處理所有的read-write請求,follower需要和leader保持同步.Follower和consumer一樣,消費訊息並儲存在本地日誌中;leader負責跟蹤所有的follower狀態,如果follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當所有的follower都將一條訊息儲存成功,此訊息才被認為是"committed",那麼此時consumer才能消費它.即使只有一個replicas例項存活,仍然可以保證訊息的正常傳送和接收,只要zookeeper叢集存活即可.(不同於其他分散式儲存,比如hbase需要"多數派"存活才行)

    當leader失效時,需在followers中選取出新的leader,可能此時follower落後於leader,因此需要選擇一個"up-to-date"的follower.選擇follower時需要兼顧一個問題,就是新leaderserver上所已經承載的partition leader的個數,如果一個server上有過多的partition leader,意味著此server將承受著更多的IO壓力.在選舉新leader,需要考慮到"負載均衡".

    7.日誌

    如果一個topic的名稱為"my_topic",它有2個partitions,那麼日誌將會儲存在my_topic_0和my_topic_1兩個目錄中;日誌檔案中儲存了一序列"log entries"(日誌條目),每個log entry格式為"4個位元組的數字N表示訊息的長度" + "N個位元組的訊息內容";每個日誌都有一個offset來唯一的標記一條訊息,offset的值為8個位元組的數字,表示此訊息在此partition中所處的起始位置..每個partition在物理儲存層面,有多個log file組成(稱為segment).segmentfile的命名為"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始訊息的offset.

    其中每個partiton中所持有的segments列表資訊會儲存在zookeeper中.

  當segment檔案尺寸達到一定閥值時(可以通過配置檔案設定,預設1G),將會建立一個新的檔案;當buffer中訊息的條數達到閥值時將會觸發日誌資訊flush到日誌檔案中,同時如果"距離最近一次flush的時間差"達到閥值時,也會觸發flush到日誌檔案.如果broker失效,極有可能會丟失那些尚未flush到檔案的訊息.因為server意外實現,仍然會導致log檔案格式的破壞(檔案尾部),那麼就要求當server啟東是需要檢測最後一個segment的檔案結構是否合法並進行必要的修復.

    獲取訊息時,需要指定offset和最大chunk尺寸,offset用來表示訊息的起始位置,chunk size用來表示最大獲取訊息的總長度(間接的表示訊息的條數).根據offset,可以找到此訊息所在segment檔案,然後根據segment的最小offset取差值,得到它在file中的相對位置,直接讀取輸出即可.

    日誌檔案的刪除策略非常簡單:啟動一個後臺執行緒定期掃描log file列表,把儲存時間超過閥值的檔案直接刪除(根據檔案的建立時間).為了避免刪除檔案時仍然有read操作(consumer消費),採取copy-on-write方式.

    8、分配

    kafka使用zookeeper來儲存一些meta資訊,並使用了zookeeper watch機制來發現meta資訊的變更並作出相應的動作(比如consumer失效,觸發負載均衡等)

    1) Broker node registry: 當一個kafkabroker啟動後,首先會向zookeeper註冊自己的節點資訊(臨時znode),同時當broker和zookeeper斷開連線時,此znode也會被刪除.

    格式: /broker/ids/[0...N]   -->host:port;其中[0..N]表示broker id,每個broker的配置檔案中都需要指定一個數字型別的id(全域性不可重複),znode的值為此broker的host:port資訊.

    2) Broker Topic Registry: 當一個broker啟動時,會向zookeeper註冊自己持有的topic和partitions資訊,仍然是一個臨時znode.

    格式: /broker/topics/[topic]/[0...N]  其中[0..N]表示partition索引號.

    3) Consumer and Consumer group: 每個consumer客戶端被建立時,會向zookeeper註冊自己的資訊;此作用主要是為了"負載均衡".

    一個group中的多個consumer可以交錯的消費一個topic的所有partitions;簡而言之,保證此topic的所有partitions都能被此group所消費,且消費時為了效能考慮,讓partition相對均衡的分散到每個consumer上.

    4) Consumer id Registry: 每個consumer都有一個唯一的ID(host:uuid,可以通過配置檔案指定,也可以由系統生成),此id用來標記消費者資訊.

    格式:/consumers/[group_id]/ids/[consumer_id]

    仍然是一個臨時的znode,此節點的值為{"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions列表.

    5) Consumer offset Tracking: 用來跟蹤每個consumer目前所消費的partition中最大的offset.

    格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value

    此znode為持久節點,可以看出offset跟group_id有關,以表明當group中一個消費者失效,其他consumer可以繼續消費.

    6) Partition Owner registry: 用來標記partition被哪個consumer消費.臨時znode

    格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id當consumer啟動時,所觸發的操作:

    A) 首先進行"Consumer id Registry";

    B) 然後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其他consumer的"leave"和"join";只要此znode path下節點列表變更,都會觸發此group下consumer的負載均衡.(比如一個consumer失效,那麼其他consumer接管partitions).

    C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活情況;如果broker列表變更,將會觸發所有的groups下的consumer重新balance.

    1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連線併發送訊息.

    2) Broker端使用zookeeper用來註冊broker資訊,已經監測partitionleader存活性.

    3) Consumer端使用zookeeper用來註冊consumer資訊,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連線,並獲取訊息.

四、主要配置

    1、Broker配置

    2.Consumer主要配置

3.Producer主要配置

以上是關於kafka一些基礎說明,在其中我們知道如果要kafka正常執行,必須配置zookeeper,否則無論是kafka叢集還是客戶端的生存者和消費者都無法正常的工作的,以下是對zookeeper進行一些簡單的介紹:

五、zookeeper叢集

    zookeeper是一個為分散式應用提供一致性服務的軟體,它是開源的Hadoop專案的一個子專案,並根據google發表的一篇論文來實現的。zookeeper為分散式系統提供了高笑且易於使用的協同服務,它可以為分散式應用提供相當多的服務,諸如統一命名服務,配置管理,狀態同步和組服務等。zookeeper介面簡單,我們不必過多地糾結在分散式系統程式設計難於處理的同步和一致性問題上,你可以使用zookeeper提供的現成(off-the-shelf)服務來實現來實現分散式系統額配置管理,組管理,Leader選舉等功能。

    zookeeper叢集的安裝,準備三臺伺服器server1:192.168.0.1,server2:192.168.0.2,

    server3:192.168.0.3.

    1)下載zookeeper

    到http://zookeeper.apache.org/releases.html去下載最新版本Zookeeper-3.4.5的安裝包zookeeper-3.4.5.tar.gz.將檔案儲存server1的~目錄下

    2)安裝zookeeper

    先在伺服器server分別執行a-c步驟

    a)解壓  

    tar -zxvf zookeeper-3.4.5.tar.gz

    解壓完成後在目錄~下會發現多出一個目錄zookeeper-3.4.5,重新命令為zookeeper

    b)配置

    將conf/zoo_sample.cfg拷貝一份命名為zoo.cfg,也放在conf目錄下。然後按照如下值修改其中的配置:

    # The number of milliseconds of each tick

    tickTime=2000

    # The number of ticks that the initial

    # synchronization phase can take

    initLimit=10

    # The number of ticks that can pass between

    # sending a request and getting an acknowledgement

    syncLimit=5

    # the directory where the snapshot is stored.

    # do not use /tmp for storage, /tmp here is just

    # example sakes.

    dataDir=/home/wwb/zookeeper /data

    dataLogDir=/home/wwb/zookeeper/logs

    # the port at which the clients will connect

    clientPort=2181

    #

    # Be sure to read the maintenance section of the

    # administrator guide before turning on autopurge.

    #

    # The number of snapshots to retain in dataDir

    #autopurge.snapRetainCount=3

    # Purge task interval in hours

    # Set to "0" to disable auto purge feature

    #autopurge.purgeInterval=1

    server.1=192.168.0.1:3888:4888

    server.2=192.168.0.2:3888:4888

    server.3=192.168.0.3:3888:4888

    tickTime:這個時間是作為 Zookeeper 伺服器之間或客戶端與伺服器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。

    dataDir:顧名思義就是 Zookeeper 儲存資料的目錄,預設情況下,Zookeeper 將寫資料的日誌檔案也儲存在這個目錄裡。

    clientPort:這個埠就是客戶端連線 Zookeeper 伺服器的埠,Zookeeper 會監聽這個埠,接受客戶端的訪問請求。

    initLimit:這個配置項是用來配置 Zookeeper 接受客戶端(這裡所說的客戶端不是使用者連線 Zookeeper 伺服器的客戶端,而是 Zookeeper 伺服器叢集中連線到 Leader 的 Follower 伺服器)初始化連線時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度後 Zookeeper 伺服器還沒有收到客戶端的返回資訊,那麼表明這個客戶端連線失敗。總的時間長度就是 5*2000=10 秒

    syncLimit:這個配置項標識 Leader 與Follower 之間傳送訊息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是2*2000=4 秒

    server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號伺服器;B 是這個伺服器的 ip 地址;C 表示的是這個伺服器與叢集中的 Leader 伺服器交換資訊的埠;D 表示的是萬一叢集中的 Leader 伺服器掛了,需要一個埠來重新進行選舉,選出一個新的 Leader,而這個埠就是用來執行選舉時伺服器相互通訊的埠。如果是偽叢集的配置方式,由於 B 都是一樣,所以不同的 Zookeeper 例項通訊埠號不能一樣,所以要給它們分配不同的埠號

注意:dataDir,dataLogDir中的wwb是當前登入使用者名稱,data,logs目錄開始是不存在,需要使用mkdir命令建立相應的目錄。並且在該目錄下建立檔案myid,serve1,server2,server3該檔案內容分別為1,2,3。

針對伺服器server2,server3可以將server1複製到相應的目錄,不過需要注意dataDir,dataLogDir目錄,並且檔案myid內容分別為2,3

    3)依次啟動server1,server2,server3的zookeeper.

    /home/wwb/zookeeper/bin/zkServer.sh start,出現類似以下內容

    JMX enabled by default

    Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg

    Starting zookeeper ... STARTED

   4) 測試zookeeper是否正常工作,在server1上執行以下命令

    /home/wwb/zookeeper/bin/zkCli.sh -server192.168.0.2:2181,出現類似以下內容

    JLine support is enabled

    2013-11-27 19:59:40,560 - INFO      [main-SendThread(localhost.localdomain:2181):[email protected]]- Session   establishmentcomplete on server localhost.localdomain/127.0.0.1:2181, sessionid =    0x1429cdb49220000, negotiatedtimeout = 30000

    WATCHER::

    WatchedEvent state:SyncConnected type:None path:null

    [zk: 127.0.0.1:2181(CONNECTED) 0] [[email protected]]#  

    即代表叢集構建成功了,如果出現錯誤那應該是第三部時沒有啟動好叢集,

執行,先利用

    ps aux | grep zookeeper檢視是否有相應的程序的,沒有話,說明叢集啟動出現問題,可以在每個伺服器上使用

    ./home/wwb/zookeeper/bin/zkServer.sh stop。再依次使用./home/wwb/zookeeper/binzkServer.sh start,這時在執行4一般是沒有問題,如果還是有問題,那麼先stop再到bin的上級目錄執行./bin/zkServer.shstart試試。

注意:zookeeper叢集時,zookeeper要求半數以上的機器可用,zookeeper才能提供服務。

六、kafka叢集

(利用上面server1,server2,server3,下面以server1為例項)

    1)下載kafka0.8(http://kafka.apache.org/downloads.html),儲存到伺服器/home/wwb目錄下kafka-0.8.0-beta1-src.tgz(kafka_2.8.0-0.8.0-beta1.tgz)

    2)解壓 tar -zxvf kafka-0.8.0-beta1-src.tgz,產生資料夾kafka-0.8.0-beta1-src更改為kafka01   

3)配置

    修改kafka01/config/server.properties,其中broker.id,log.dirs,zookeeper.connect必須根據實際情況進行修改,其他項根據需要自行斟酌。大致如下:

     broker.id=1  

     port=9091  

     num.network.threads=2  

     num.io.threads=2  

     socket.send.buffer.bytes=1048576  

    socket.receive.buffer.bytes=1048576  

     socket.request.max.bytes=104857600  

    log.dir=./logs  

    num.partitions=2  

    log.flush.interval.messages=10000  

    log.flush.interval.ms=1000  

    log.retention.hours=168  

    #log.retention.bytes=1073741824  

    log.segment.bytes=536870912  

    num.replica.fetchers=2  

    log.cleanup.interval.mins=10  

    zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183  

    zookeeper.connection.timeout.ms=1000000  

    kafka.metrics.polling.interval.secs=5  

    kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter  

    kafka.csv.metrics.dir=/tmp/kafka_metrics  

    kafka.csv.metrics.reporter.enabled=false

4)初始化因為kafka用scala語言編寫,因此執行kafka需要首先準備scala相關環境。

    > cd kafka01  

    > ./sbt update  

    > ./sbt package  

    > ./sbt assembly-package-dependency

在第二個命令時可能需要一定時間,由於要下載更新一些依賴包。所以請大家 耐心點。

5) 啟動kafka01

    >JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &  

a)kafka02操作步驟與kafka01雷同,不同的地方如下

    修改kafka02/config/server.properties

    broker.id=2

    port=9092

    ##其他配置和kafka-0保持一致

    啟動kafka02

    JMX_PORT=9998 bin/kafka-server-start.shconfig/server.properties &  

b)kafka03操作步驟與kafka01雷同,不同的地方如下

    修改kafka03/config/server.properties

    broker.id=3

    port=9093

    ##其他配置和kafka-0保持一致

    啟動kafka02

    JMX_PORT=9999 bin/kafka-server-start.shconfig/server.properties &

6)建立Topic(包含一個分割槽,三個副本)

    >bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic

7)檢視topic情況

    >bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181

    topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0

8)建立傳送者

   >bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic

    my test message1

    my test message2

    ^C

9)建立消費者

    >bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic

    ...

    my test message1

    my test message2

^C

10)殺掉server1上的broker

  >pkill -9 -f config/server.properties

11)檢視topic

  >bin/kafka-list-top.sh --zookeeper192.168.0.1:2181

  topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0

發現topic還正常的存在

11)建立消費者,看是否能查詢到訊息

    >bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic

    ...

    my test message 1

    my test message 2

    ^C

說明一切都是正常的。

OK,以上就是對Kafka個人的理解,不對之處請大家及時指出。

補充說明:

1、public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap),其中該方法的引數Map的key為topic名稱,value為topic對應的分割槽數,譬如說如果在kafka中不存在相應的topic時,則會建立一個topic,分割槽數為value,如果存在的話,該處的value則不起什麼作用

2、關於生產者向指定的分割槽傳送資料,通過設定partitioner.class的屬性來指定向那個分割槽傳送資料,如果自己指定必須編寫相應的程式,預設是kafka.producer.DefaultPartitioner,分割槽程式是基於雜湊的鍵。

3、在多個消費者讀取同一個topic的資料,為了保證每個消費者讀取資料的唯一性,必須將這些消費者group_id定義為同一個值,這樣就構建了一個類似佇列的資料結構,如果定義不同,則類似一種廣播結構的。

4、在consumerapi中,引數設計到數字部分,類似Map<String,Integer>,

numStream,指的都是在topic不存在的時,會建立一個topic,並且分割槽個數為Integer,numStream,注意如果數字大於broker的配置中num.partitions屬性,會以num.partitions為依據建立分割槽個數的。

5、producerapi,呼叫send時,如果不存在topic,也會建立topic,在該方法中沒有提供分割槽個數的引數,在這裡分割槽個數是由服務端broker的配置中num.partitions屬性決定的

相關推薦

Kafka入門簡介使用場景轉載

一、入門     1、簡介     Kafka is a distributed,partitioned,replicated commit logservice。它提供了類似於JMS的特性,但是在設計實現上完全不同,此外它並不是JMS規範的實現。kafka對訊息儲存

看到一個牛人的群聊天記錄超贊!轉載

top 不出 說明 話題 悲劇 最好的 至少 防止 自己 下面是聊天記錄原文(http://bbs.csdn.net/topics/390114474來自csdn-bbs): “ 2012/2/2 15:13:23 有一個原則 2012/2/2 15:13:

ZYNQ之ubootkernel裝置樹檔案系統生成轉載

 轉載自:https://www.cnblogs.com/huakaimanlin/p/9055800.html   Vivado:2016.4   Linux:Ubuntu16.4   ZYNQ:xc7z020      下載檔名稱   2016.4-zed-release.tar.x

視覺工程師必須知道的工業相機50問絕對乾貨!轉載

轉自: http://blog.sina.com.cn/s/blog_14ed0c4400102x1bc.html 1:工業相機的丟幀的問題是由什麼原因引起的? 經常會有一些機器視覺工程師認為USB介面的工業相機會造成丟幀現象。一般而言,工業相機丟幀與工業相機所採用的傳輸介面是沒有關係的,無論是US

MVCMVP 和 MVVM轉載

1 什麼是MVC MVC的目的是為了把資料(Model)和檢視(View)分離開來,然後用控制器(Controller)作膠水來粘合M和V之間的關係。 這樣做的目的是為了實現注意點分離這樣一個更高層次的設計理念,也就是讓專業的物件做專業的事情,View就只負責檢視相關的東西,Mode

JS獲取瀏覽器視窗大小 獲取螢幕瀏覽器網頁高度寬度轉載

網頁可見區域寬:document.body.clientWidth 網頁可見區域高:document.body.clientHeight 網頁可見區域寬:document.body.offsetWidth (包括邊線的寬) 網頁可見區域高:document

20不努力30做助理轉載

  大二的表弟給我打電話,說大學生活很無聊,日子不知道該怎麼打發。他是不想泡妞的,因為“時間還沒到”,他內心深處隱隱認為應該做點什麼,方不荒廢青春,卻不知該做些什麼好。我知道這孩子一向乖,也一向有毅力,不然以湖北的高考難度,也不能進入現在的全國重點。我問了他的學習和生活狀況,他說家裡給的生活費足夠

資料庫水平切分的實現原理解析---分庫分表主從叢集負載均衡器 轉載...

第1章  引言 隨著網際網路應用的廣泛普及,海量資料的儲存和訪問成為了系統設計的瓶頸問題。對於一個大型的網際網路應用,每天幾十億的PV無疑對資料庫造成了相當高的負載。對於系統的穩定性和擴充套件性造成了極大的問題。通過資料切分來提高網站效能,橫向擴充套件資料層已經成為架構研發人員首選的方式。水平切分資料庫,可

淺析軟體成本估算之NESMA方法的3種應用場景轉載

NESMA為荷蘭軟體度量協會的簡稱(Netherland Software Measurement Association),NESMA功能點方法是五種ISO國際功能點標準之一,不但易學易用、快速、經濟,而且容易開發和建立使用者自己特有的估算模型。 在五種國際標準中,只有NESMA方法定

藍芽核心技術概述:藍芽使用場景轉載

轉載:xubin341719 網址:http://blog.csdn.net/xubin341719/article/details/38228705 藍芽應用的過程中,不同的場合、功能及相關協議。這篇將做詳細的介紹說明。 1、ADVANCED AUDIO DISTRIBUTION PROFILE高階音訊分

kafka入門簡介、使用場景、設計原理、主要配置及集群搭建

request 上傳 結構 數據 send gist segments ring 希望 問題導讀: 1.zookeeper在kafka的作用是什麽? 2.kafka中幾乎不允許對消息進行“隨機讀寫”的原因是什麽? 3.kafka集群consumer和producer狀態信息

kafka入門簡介、使用場景、設計原理、主要配置及叢集搭建

問題導讀: 1.zookeeper在kafka的作用是什麼? 2.kafka中幾乎不允許對訊息進行“隨機讀寫”的原因是什麼? 3.kafka叢集consumer和producer狀態資訊是如何儲存的? 4.partitions設計的目的的根本原因是什麼? 一、入門     1、簡介  

ETL-Kettle學習筆記入門簡介簡單操作

KETTLE Kettle:簡介 ETL:簡介 ETL(Extract-Transform-Load的縮寫,即資料抽取、轉換、裝載的過程),對於企業或行業應用來說,我們經常會遇到各種資料的處理,轉換,遷移,所以瞭解並掌握一種etl工具的使用,必不可少的,Kettle就是強大的ETL工具。 Kettle:概念

Asp.net MVC使用FormsAuthenticationMVC和WEB API可以共享身份認證 轉載

mlp ges web api nbsp 快速 charset 生成頁面 核心 lds 在實際的項目應用中,很多時候都需要保證數據的安全和可靠,如何來保證數據的安全呢?做法有很多,最常見的就是進行身份驗證。驗證通過,根據驗證過的身份給與對應訪問權限。同在Web Api中如何

畢業後短時間內月薪翻倍的人都經歷了什麽?轉載

... 讓我 經驗 不能 企業 成就感 薪水高 單位 and 畢業季,現在應屆生們坐在一起討(tu)論(cao)的,都是自己的新單位,和職場上那些新鮮又不知所措的第一次。 我們不畫成就感/自我實現的大餅,對於大多數的俗人來說,工作就是為了賺錢,尤其是對於剛畢業的學生,必然是

【SpringMVC架構】SpringMVC入門實例解析工作原理

rip 業務邏輯層 popu 輸入 implement override article hide -i 上篇博文,我們簡單的介紹了什麽是SpringMVC。這篇博文。我們搭建一個簡單SpringMVC的環境,使用非註解形式實現一個HelloWorld實

當一個程序員寫不出代碼了該怎麽辦?轉載

保持 解決 為我 水平 身體 min height 開源 軟件工程 翻譯作者:碼農網-小峰 轉載地址:http://www.codeceo.com/article/what-to-do-programming-sucks.html 原文標題:What Do You Do W

【開源分享:入門到精通ASP.NET MVC+EF6+Bootstrap】從這裏開始一起搭框架1開篇介紹

strong src 擁有 ckeditor 開發 技術分享 mdi 控制 https 框架簡介 這幾年一直在做ASP.NET開發,幾年前做項目都是老老實實一行行的寫代碼,後來發現那些高手基本都會有自己積累起來的代碼庫,現在稱之為開發框架,基礎代碼不用再去堆,

馬雲寫給兒子的一封信感動無數人!轉載

年輕 武器 tex 馬雲 長大 .com 愛的 除了 事業 轉載:http://www.ebrun.com/20160523/176991.shtml 我兒:寫這個備忘錄給你,基於三個原則:   一、人生福禍無常,誰也不知可以活多久,有些事情還是早一點說好。

精心收集的 48 個 JavaScript 代碼片段僅需 30 秒就可理解!轉載

mat fine 添加 pre case nag map tolower != Anagrams of string(帶有重復項) 使用遞歸。對於給定字符串中的每個字母,為字母創建字謎。使用map()將字母與每部分字謎組合,然後使用reduce()將所有字謎組合到一個數組中