kafaka/rocketmq
kafaka
官方:釋出訂閱,流處理管道和儲存
https://kafka.apache.org/docu...-
元件
broker
topic(一個queue)
partition(物理分佈,一個topic包含一個或多個partition,可以分佈在不同的broker上)
producer(與broker leader直連,負載均衡指定partition,可批次發,可設定要ack的副本數)
consumer/consumer group
partition:
index全部對映到記憶體,每個partition下自增id
元資料放在zk上.分partition,每個partition副本分散在broker上,單partition+單消費才能順(rocketmq一樣)。每個partition一個索引,順序寫一個檔案。流處理+批量處理,實時上有取捨。
- 高可用和可擴充套件
1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每個partition leader建立socket連線併發送訊息.
2) Broker端使用zookeeper用來註冊broker資訊,已經監測partition leader存活性.所有的Kafka Broker節點一起去Zookeeper上註冊一個臨時節點,成功的為Broker controller,失效後zk後發現重新註冊節點,controller負責各broker內partition的選主(ISR中,記錄replica進度,隨便選)ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條訊息必須被這個集合中的每個節點讀取並追加到日誌中了,才回通知外部這個訊息已經被提交了。因此這個集合中的任何一個節點隨時都可以被選為leader.如果ISR的大小超過某個最小值,則分割槽將僅接受寫入,以防止丟失僅寫入單個副本的訊息(只關注ISR,而不是共識多個都寫入,多數(兩個故障需要5個副本,一個要三個)對於主資料的寫代價大)
3) Consumer端使用zookeeper用來註冊consumer資訊,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader建立socket連線,並獲取訊息。
broker,partition,customer組內執行緒可擴充套件。
- 消費
只保證一個partition被一個customer消費有序
producter推,customer拉(拉需要存日誌)
partition中的每個message只能被組(Consumer group )中的一個consumer(consumer 執行緒)消費,若多個同時要配多個Consumer group。
kafka中的訊息是批量(通常以訊息的條數或者chunk的尺寸為單位)傳送給consumer,當訊息被consumer接收之後,負責維護訊息的消費記錄(JMS等都是broker維護),consumer可以在本地儲存最後訊息的offset,並間歇性的向zookeeper註冊offset.也沒有ACK
訊息消費的可靠性,消費者控制,最多一次,先儲存offset再處理;至少一次,先處理再儲存offset;只一次:最少1次+消費者的輸出中額外增加已處理訊息最大編號 - 日誌壓縮
確保有每個分割槽資料日誌中每個key有最後已知值,offset不能變。對同一partition的多個檔案一起壓縮合並。
position是檔案的bytes偏移吧?壓縮過程中要重建索引和位置?
active不動(不影響寫入),對cleaner point後面的做壓縮,選擇日誌tail和header比例小的,合併壓縮每組log不超過1G,index不超過10M。
對於tail的壓縮過程:【position不變???】
每個日誌清理執行緒會使用一個名為“SkimpyOffsetMap”的物件來構建key與offset的對映關係的雜湊表。日誌清理需要遍歷兩次日誌檔案,第一次遍歷把每個key的雜湊值和最後出現的offset都儲存在SkimpyOffsetMap中,對映模型如下圖所示。第二次遍歷檢查每個訊息是否符合保留條件,如果符合就保留下來,否則就會被清理掉
rocketmq
activemq 不能分片。kafka延時(上面知道基本上partition和consumer需要配置一樣的,一個consumer group的執行緒數和partition數量一致,受partition限制,rocketmq多partition的擴充套件在於都用一個commitlog,而不是一個partition單獨一份順序log,cq只儲存位置,對commitlog中找資料。http://rocketmq.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/)
- 元件
broker :主從
nameserver:幾乎無狀態,可叢集內部署,節點對等,不同步。資料是broker同步過來的
tag
topic
queue
producer:連線ns,主brokers(心跳),無狀態
consumer/group :連線ns,主從brokers(心跳啊) -
高可用和可擴充套件
負載均衡:Broker上存Topic資訊,Topic由多個佇列組成,佇列會平均分散在多個Broker上,而Producer的傳送機制保證訊息儘量平均分佈到所有佇列中,最終效果就是所有訊息都平均落在每個Broker上。
主從:機器級別,不依賴zk,元資料:在 Broker 啟動的時候,其會將自己在本地儲存的配置檔案 (預設位於$HOME/store/config/topics.json 目錄) 中的所有話題載入到記憶體中去,然後會將這些所有的話題全部同步到所有的 Name 伺服器中。與此同時,Broker 也會啟動一個定時任務,預設每隔 30 秒來執行一次話題全同步.
訊息儲存持久化:所有broker上的所有topic都順序寫入記憶體檔案mapedfile(1G),mapedfilelist記錄每個mapedfile在磁碟的偏移量,新訊息寫入最後一個檔案。
動態伸縮能力(非順序訊息,訊息分散;有序訊息只能放在一個queue中,切不支援遷移,只保證一個queue內順序,但可以多消費執行緒保證順序):Broker的伸縮性體現在兩個維度:Topic, Broker。
1)Topic維度:假如一個Topic的訊息量特別大,但叢集水位壓力還是很低,就可以擴大該Topic的佇列數,Topic的佇列數跟傳送、消費速度成正比。
2)Broker維度:如果叢集水位很高了,需要擴容,直接加機器部署Broker就可以。Broker起來後想Namesrv註冊,Producer、Consumer通過Namesrv發現新Broker,立即跟該Broker直連,收發訊息。
Broker與Namesrv的心跳機制:
單個Broker跟所有Namesrv保持心跳請求,心跳間隔為30秒,心跳請求中包括當前Broker所有的Topic資訊。Namesrv會反查Broer的心跳資訊,如果某個Broker在2分鐘之內都沒有心跳,則認為該Broker下線,調整Topic跟Broker的對應關係。但此時Namesrv不會主動通知Producer、Consumer有Broker宕機。
-
消費
1.消費者註冊,消費者上有多有topic的broker地址和佇列,消費者負載均衡選擇;
1)廣播模式:每個costumer全量消費,消費偏移量儲存在costumer中
2)叢集模式:constumer均勻消費部分,每個訊息只有一個costumer消費,儲存在broker上
2.新訊息傳送到q:brocker上commit log和消費組資訊
每個commmit log訊息發給topic的隨機queue中(生產者的負載均衡,每個msg只發送到一個q中),每個queue有很多consumequeue,發給所有。廣播模式,cq會在所有q上,叢集模式cq會負載均衡到某個q上,訊息根據這些配置資料落到q的所有cq上。
3.消費
3.1)普通的併發消費:queue的所有cq都直接發,所有cq傳送後刪除(q以TreeMap結構儲存)。內部RocketMQ 的訊息樹是用 TreeMap 實現的,其內部基於訊息偏移量維護了訊息的有序性。每次消費請求都會從訊息數中拿取偏移量最小的幾條訊息 (預設為 1 條)給使用者,以此來達到有序消費的目的。
3.2)有序消費:在3.1的基礎上加兩個鎖,costumer client給消費的每個queue會加鎖,保證同一時刻只有一個costumer client在消費queue(否則發給一個client刪除了訊息,此訊息在另一個client和後面的client的訊息無法保證順序),預設20s加一次,queue檢測60s沒有就釋放,每次成功後才取下一條,反正只有一個客戶端消費。第二把鎖是在client中,將堆積的訊息按照順序加鎖的寫入執行緒池task佇列中。
-
高可用&高可靠
高可用:叢集部署時一般都為主備,備機實時從主機同步訊息,如果其中一個主機宕機,備機提供消費服務,但不提供寫服務。
高可靠:所有發往broker的訊息,有同步刷盤和非同步刷盤機制;同步刷盤時,訊息寫入物理檔案才會返回成功,非同步刷盤時,只有機器宕機,才會產生訊息丟失,broker掛掉可能會發生,但是機器宕機崩潰是很少發生的,除非突然斷電
其他
- bridgequeue
記憶體。redis實現。適合小型系統 - mmqd 對大型延時系統的支援,引入chronos
這裡的kafka去掉了。普通的直接用哪個rocketmq.延時訊息和事務訊息
對延時訊息,放入rocketmq一個內部的消費topic中,消費入chronos中(存RocksDB,seektimestamp, while從leveldb中取符合時間的再放入rocketmq中)
事務訊息:A執行後要傳送訊息給B,因為ddmq一旦接收是保證被消費的,所以增加發送方事務回查。
- 對比
分析:少topic時kafka效能好,rockemq需要讀mq後去讀一個大的cl。多topic是rockemq好,處理執行緒多。