1. 程式人生 > >RabbitMQ消息隊列(一): Detailed Introduction 詳細介紹

RabbitMQ消息隊列(一): Detailed Introduction 詳細介紹

image 用戶 sco ice eject 企業 取數 pro ket

原文地址:https://blog.csdn.net/anzhsoft/article/details/19563091

1 歷史

RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現。AMQP 的出現其實也是應了廣大人民群眾的需求,雖然在同步消息通訊的世界裏有很多公開標準(如 COBAR的 IIOP ,或者是 SOAP 等),但是在異步消息處理中卻不是這樣,只有大企業有一些商業實現(如微軟的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等聯合制定了 AMQP 的公開標準。

RabbitMQ是由RabbitMQ Technologies Ltd開發並且提供商業支持的。該公司在2010年4月被SpringSource(VMWare的一個部門)收購。在2013年5月被並入Pivotal。其實VMWare,Pivotal和EMC本質上是一家的。不同的是VMWare是獨立上市子公司,而Pivotal是整合了EMC的某些資源,現在並沒有上市。

RabbitMQ的官網是http://www.rabbitmq.com

2 應用場景

言歸正傳。RabbitMQ,或者說AMQP解決了什麽問題,或者說它的應用場景是什麽?

對於一個大型的軟件系統來說,它會有很多的組件或者說模塊或者說子系統或者(subsystem or Component or submodule)。那麽這些模塊的如何通信?這和傳統的IPC有很大的區別。傳統的IPC很多都是在單一系統上的,模塊耦合性很大,不適合擴展(Scalability);如果使用socket那麽不同的模塊的確可以部署到不同的機器上,但是還是有很多問題需要解決。比如:

1)信息的發送者和接收者如何維持這個連接,如果一方的連接中斷,這期間的數據如何方式丟失?

2)如何降低發送者和接收者的耦合度?

3)如何讓Priority高的接收者先接到數據?

4)如何做到load balance?有效均衡接收者的負載?

5)如何有效的將數據發送到相關的接收者?也就是說將接收者subscribe 不同的數據,如何做有效的filter。

6)如何做到可擴展,甚至將這個通信模塊發到cluster上?

7)如何保證接收者接收到了完整,正確的數據?

AMDQ協議解決了以上的問題,而RabbitMQ實現了AMQP

3 系統架構

成為系統架構可能不太合適,可能叫應用場景的系統架構更合適。

技術分享圖片

RabbitMQ Server: 也叫broker server,它不是運送食物的卡車,而是一種傳輸服務。原話是RabbitMQ isn’t a food truck, it’s a delivery service. 他的角色就是維護一條從Producer到Consumer的路線,保證數據能夠按照指定的方式進行傳輸。但是這個保證也不是100%的保證,但是對於普通的應用來說這已經足夠了。當然對於商業系統來說,可以再做一層數據一致性的guard(保護),就可以徹底保證系統的一致性了。

Client A & B: 也叫Producer,數據的發送方。createmessages and publish (send) them to a broker server (RabbitMQ).一個Message有兩個部分:payload(有效載荷)和label(標簽)。payload顧名思義就是傳輸的數據。label是exchange的名字或者說是一個tag,它描述了payload,而且RabbitMQ也是通過這個label來決定把這個Message發給哪個Consumer。AMQP僅僅描述了label,而RabbitMQ決定了如何使用這個label的規則。

 Client 1,2,3:也叫Consumer,數據的接收方。Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。把queue比作是一個有名字的郵箱。當有Message到達某個郵箱後,RabbitMQ把它發送給它的某個訂閱者即Consumer。當然可能會把同一個Message發送給很多的Consumer。在這個Message中,只有payload,label已經被刪掉了。對於Consumer來說,它是不知道誰發送的這個信息的。就是協議本身不支持。但是當然了如果Producer發送的payload包含了Producer的信息就另當別論了。

對於一個數據從Producer到Consumer的正確傳遞,還有三個概念需要明確:exchanges, queues and bindings。

Exchanges are where producers publish their messages.

  exchanges是生產者發布消息的地方

Queuesare where the messages end up and are received by consumers

  queues是消息最終被消費者接收的地方

Bindings are how the messages get routed from the exchange to particular queues.

  bindings是消息如果從exchange路由到特定queues的方式

  Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以後我們可以看到,程序的起始處就是建立這個TCP連接。

Channels: 虛擬連接。它建立在上述的TCP連接中。數據流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。

那麽,為什麽使用Channel,而不是直接使用TCP連接?

對於OS來說,建立和關閉TCP連接是有代價的,頻繁的建立關閉TCP連接對於系統的性能有很大的影響,而且TCP的連接數也有限制,這也限制了系統處理高並發的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對於Producer或者Consumer來說,可以並發的使用多個Channel進行Publish或者Receive。有實驗表明,1s的數據可以Publish10K的數據包。當然對於不同的硬件環境,不同的數據包大小這個數據肯定不一樣,但是我只想說明,對於普通的Consumer或者Producer來說,這已經足夠了。如果不夠用,你考慮的應該是如何細化split你的設計。

4. 進一步的細節闡明

4.1 使用ack確認Message的正確傳遞

1) 默認情況下,如果Message 已經被某個Consumer正確的接收到了,那麽該Message就會被從queue中移除。當然也可以讓同一個Message發送到很多的Consumer。

2)如果一個queue沒被任何的Consumer Subscribe(訂閱),那麽,如果這個queue有數據到達,那麽這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被立即發送到這個Consumer,這個數據被Consumer正確收到時,這個數據就被從queue中刪除

那麽什麽是正確收到呢?通過ack。每個Message都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數據沒有被ack,那麽:

RabbitMQ Server會把這個信息發送到下一個Consumer。

如果這個app有bug,忘記了ack,那麽RabbitMQ Server不會再發送數據給它,因為Server認為這個Consumer處理能力有限。

而且ack的機制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數據後發送ack,甚至在額外的延時後發送ack,將有效的balance Consumer的load。

當然對於實際的例子,比如我們可能會對某些數據進行merge,比如merge 4s內的數據,然後sleep 4s後再獲取數據。特別是在監聽系統的state,我們不希望所有的state實時的傳遞上去,而是希望有一定的延時。這樣可以減少某些IO,而且終端用戶也不會感覺到。

4.2 Reject a message

有兩種方式,第一種的Reject可以讓RabbitMQ Server將該Message 發送到下一個Consumer。第二種是從queue中立即刪除該Message。

4.3 Creating a queue

Consumer和Procuder都可以通過 queue.declare 創建queue。對於某個Channel來說,Consumer不能declare一個queue,卻訂閱其他的queue。當然也可以創建私有的queue。這樣只有app本身才可以使用這個queue。queue也可以自動刪除,被標為auto-delete的queue在最後一個Consumer unsubscribe後就會被自動刪除。那麽如果是創建一個已經存在的queue呢?那麽不會有任何的影響。需要註意的是沒有任何的影響,也就是說第二次創建如果參數和第一次不一樣,那麽該操作雖然成功,但是queue的屬性並不會被修改。

那麽誰應該負責創建這個queue呢?是Consumer,還是Producer?

如果queue不存在,當然Consumer不會得到任何的Message。但是如果queue不存在,那麽Producer Publish的Message會被丟棄。所以,還是為了數據不丟失,Consumer和Producer都try to create the queue!反正不管怎麽樣,這個接口都不會出問題。

queue對load balance的處理是完美的。對於多個Consumer來說,RabbitMQ 使用循環的方式(round-robin)的方式均衡的發送給不同的Consumer。

4.4 Exchanges

從架構圖可以看出,Procuder Publish的Message進入了Exchange。接著通過“routing keys”, RabbitMQ會找到應該把這個Message放到哪個queue裏。queue也是通過這個routing keys來做的綁定。

有三種類型的Exchanges:direct, fanout,topic。 每個實現了不同的路由算法(routing algorithm)。

· Direct exchange: 如果 routing key 匹配, 那麽Message就會被傳遞到相應的queue中。其實在queue創建時,它會自動的以queue的名字作為routing key來綁定那個exchange。

· Fanout exchange: 會向響應的queue廣播。

· Topic exchange: 對key進行模式匹配,比如ab*可以傳遞到所有ab*的queue。

4.5 Virtual hosts

每個virtual host本質上都是一個RabbitMQ Server,擁有它自己的queue,exchagne,和bings rule等等。這保證了你可以在多個不同的application中使用RabbitMQ。

接下來我會使用Python來說明RabbitMQ的使用方法。

RabbitMQ消息隊列(一): Detailed Introduction 詳細介紹