1. 程式人生 > >RocketMQ系列(一)基本概念

RocketMQ系列(一)基本概念

RocketMQ是阿里出品的一款開源的訊息中介軟體,讓其聲名大噪的就是它的事務訊息的功能。在企業中,訊息中介軟體選擇使用RocketMQ的還是挺多的,這一系列的文章都是針對RocketMQ的,咱們先從RocketMQ的一些基本概念和環境的搭建開始聊起。 RocketMQ由4部分組成,分別是:名稱服務(Name Server)、訊息佇列(Brokers)、生產者(producer)和消費者(consumer)。這4部分都可以進行水平擴充套件,從而避免單點故障,如下圖, ![](https://img2020.cnblogs.com/blog/1191201/202006/1191201-20200603173058174-1551688390.png) 這是RocketMQ官網上的一張圖,非常清晰的列出了4個部分,並且都是叢集模式。下面我們就分別說一說這4部分。 ## 名稱服務(NameServer) Name Server扮演的角色是一個註冊中心,和Zookeeper的作用差不多。它的主要功能有兩個,如下: * broker的管理:broker叢集將自己的資訊註冊到NameServer,NameServer提供心跳機制檢測每一個broker是否正常。 * 路由管理:每一個NameServer都有整個broker叢集和佇列的資訊,以便客戶端(生產者和消費者)查詢。 NameServer協調著分散式系統中的每一個元件,並且管理著每一個Topic的路由資訊。 ## Broker Broker主要是儲存訊息,並且提供Topic的機制。它提供推和拉兩種模式,還有一些容災的措施,比如可以配置訊息副本。下面我們看一看Brokcer的主從機制。 Broker的角色分為“非同步主”、“同步主”和“從”三個角色。如果你不能容忍訊息的丟失,你可以配置一個“同步主”和“從”兩個Broker,如果你覺得訊息丟失也無所謂,只要佇列可用就ok的話,你可以配置“非同步主”和“從”兩個broker。如果你只是想簡單的搭建,只配置一個“非同步主”,不配置“從”也是可以的。 上面提到的是broker之間的備份,broker裡的資訊也是可以儲存到磁碟的,儲存到磁碟的方式也有兩種,推薦的方式是非同步儲存磁碟,同步儲存磁碟是非常損耗效能的。 ## 生產者 生產者支援叢集部署,它們向broker叢集傳送訊息,而且支援多種負載均衡的方式。 當生產者向broker傳送訊息時,會得到傳送結果,傳送結果中有一個傳送狀態。假設我們的配置中,訊息的配置`isWaitStoreMsgOK = true`,這個配置預設也是`true`,如果你配置為`false`,在傳送訊息的過程中,只要不發生異常,傳送結果都是`SEND_OK`。當`isWaitStoreMsgOK = true`,傳送結果有以下幾種, * `FLUSH_DISK_TIMEOUT`:儲存磁碟超時,當儲存磁碟的方式設定為SYNC_FLUSH(同步),並且在syncFlushTimeout配置的時間內(預設5s),沒有完成儲存磁碟的動作,將會得到這個狀態。 * `FLUSH_SLAVE_TIMEOUT`:同步“從”超時,當broker的角色設定為“同步主”時,但是在設定的同步時間內,預設為5s,沒有完成主從之間的同步,就會得到這個狀態。 * `SLAVE_NOT_AVAILABLE`:“從”不可用,當我們設定“同步主”,但是沒有配置“從”broker時,會返回這個狀態。 * `SEND_OK`:訊息傳送成功。 再來看看訊息重複與訊息丟失,當你發現你的訊息丟失時,通常有兩個選擇,一個是丟就丟吧,這樣訊息就真的丟了;另一個選擇是訊息重新發送,這樣有可能引起訊息重複。通常情況下,還是推薦重新發送的,我們在消費訊息的時候要去除掉重複的訊息。 傳送message的大小一般不超過512k,預設的傳送訊息的方式是同步的,傳送方法會一直阻塞,直到等到返回的響應。如果你比較在意效能,也可以用`send(msg, callback)`非同步的方式傳送訊息。 ## 消費者 多個消費者可以組成**消費者組(consumer group)**,不同的**消費者組**可以訂閱相同的Topic,也可以獨立的消費Topic,每一個消費者組都有自己的消費偏移量。 訊息的消費方式一般有兩種,順序消費和併發消費。 * 順序消費:消費者將鎖住訊息佇列,確保訊息按照順序一個一個的被消費掉,順序消費會引起一部分效能損失。在消費訊息的時候,如果出現異常,不建議直接丟擲,而是應該返回`SUSPEND_CURRENT_QUEUE_A_MOMENT `這個狀態,它將告訴消費者過一段時間後,會重新消費這個訊息。 * 併發消費:消費者將併發的消費訊息,這種方式的效能非常好,也是推薦的消費方式。在消費的過程中,如果出現異常,不建議直接丟擲,而是返回`RECONSUME_LATER `狀態,它告訴消費者現在不能正確的消費它,過一段時間後,會再次消費它。 在消費者內部,是使用`ThreadPoolExecutor`作為執行緒池的,我們可以通過`setConsumeThreadMin `和`setConsumeThreadMax `設定最小消費執行緒和最大消費執行緒。 當一個新的消費者組建立以後,它要決定是否消費之前的歷史訊息,`CONSUME_FROM_LAST_OFFSET`將忽略歷史訊息,消費新的訊息。`CONSUME_FROM_FIRST_OFFSET`將消費佇列中的每一個訊息,之前的歷史訊息也會再消費一遍。`CONSUME_FROM_TIMESTAMP`可以指定消費訊息的時間,指定時間以後的訊息會被消費。 如果你的應用不能容忍重複消費,那麼在消費訊息的過程中,要做好訊息的校驗。 好了,今天就到這裡吧,下一篇我們將介紹RocketMQ的環境