1. 程式人生 > >三、RabbitMQ如何實現AMQ協議(讀書筆記)

三、RabbitMQ如何實現AMQ協議(讀書筆記)

開發十年,就只剩下這套架構體系了! >>>   

AMQP作為一種RPC傳輸機制

RabbitMQ作為一種AMPQ代理伺服器,提供了一套嚴格的通訊方式,核心部分的通訊幾乎都使用了RPC(遠端過程呼叫)模式。

啟動會話

AMQP協議定義,當客戶端要與RabbitMQ互動時,首先要向RabbitMQ傳送一個協議頭(protocol header):

要完全連線到RabbitMQ,會經過由三個同步RPC請求所組成請求序列,這三個RPC請求分別是啟動、調整和開啟連線。對於應用RabbitMQ而言(不準備開發RabbitMQ Client),啟動會話這個階段並不是非常重要,稍作理解就好。

設定通道

AMQP協議定義,通道要使用AMQP連線作為互相傳輸資訊的渠道,而且要將傳輸過程與其他正在進行中的會話隔離開。一個AMQP連線可以有多個通道,允許客戶端和RabbitMQ之間進行多次會話,技術上稱之為多路複用(multiplexing)。

不要使用過多的通道! 在傳輸過程中,通道只是分配給客戶端和RabbitMQ之間所傳遞訊息的一個整數值。但在客戶端和RabbitMQ中,會為每個通道設定記憶體結構和物件,連線中的通道越多,用於管理該連線的訊息流所需的記憶體也就越多。

AMQP RPC幀

AMQP使用類和方法在客戶端和RabbitMQ之間建立公共語言,這些類和方法被稱為AMQP命令(AMQP Commands)。AMQP中的類定義了一個功能範圍,每個類都包含執行不同任務的方法。

例如:Connection.Start命令,由兩部分組成:AMQP類(Class)和方法(Method)。

當使用命令與RabbitMQ進行互動時,執行這些命令,和所需要的所有引數,都被封裝在一種資料結構中,並且對這個資料結構進行編碼,以便傳輸。

幀由五個部分組成:

  1. 幀型別;
  2. 通道編號;
  3. 幀大小(以位元組為單位);
  4. 有效載荷;
  5. 結束位元組標記(ASCII值206);

幀型別

幀型別分為五種:

  1. 協議頭幀:用於連結到RabbitMQ,僅在連結時使用一次;
  2. 方法幀:用於攜帶傳送給RabbitMQ或從RabbitMQ接收到的RPC請求和響應;
  3. 內容頭幀:包含一條訊息的大小和屬性;
  4. 訊息體幀:包含訊息的內容;
  5. 心跳幀:客戶端與RabbitMQ之間進行傳遞,作為一種校驗機制,確保連線的兩端都可用;

可以在rabbitmq.config配置檔案中設定心跳間隔,設定為0則代表關閉心跳檢測機制(如果你的程式在一定程度上阻塞了通訊,使得心跳檢測難以正常運作)。

幀的編組

如果要向RabbitMQ傳送訊息,則會對方法幀、內容頭幀和訊息幀進行編組。

傳送的第一個幀是方法幀,它攜帶了命令和執行它所需的引數(如交換器和路由鍵)。

在方法幀之後的是內容頭幀,它包含了訊息屬性以及訊息體大小,AMQP的幀大小是有上限的,預設為131KB,如果訊息體超過了這個上限,訊息內容將被拆分成多個訊息體幀。

接著是訊息幀,它攜帶了具體的訊息內容。

為了更高效的處理這些幀,方法幀和內容頭幀會被打包成二進位制資料,訊息幀則麼有進行任何打包或編碼。

雖然幀預設大小是131KB,但客戶端在連結過程中可以設定更大或更小的幀,其最大可達一個32位值。

方法幀結構

方法幀攜帶構建RPC請求所需的類、方法以及相關引數。

這些屬性告知RabbitMQ如何路由訊息。Mandatory標識則告知RabbitMQ訊息必須投遞成功,否則釋出訊息的過程就應該是失敗的。

通常,使用Basic.Publish RPC請求傳送訊息是一個單向會話。但是,如果你在釋出訊息時使用了mandatory標誌,則應用程式應該監聽從RabbitMQ傳送回來的Basic.Return命令。如果RabbitMQ不能滿足mandatory標誌設定的要求,它將在同一個通道上傳送一個Basic.Return命令到客戶端。

內容頭幀結構

內容頭幀除了告知RabbitMQ該訊息的大小之外,還包含訊息的各種屬性。這些屬性儲存在Basic.Properties對映表中,可包含描述訊息內容的資料,也可完全是空白的。大多數客戶端會預先填充一小部分欄位,比如內容型別和投遞模式。

屬性是編寫訊息的強大工具,他們可以用來在釋出者和消費則之間就訊息的內容建立契約,從而允許對訊息進行大量的定製操作。

訊息體幀結構

訊息題幀包含了實際訊息資料的結構,可以是圖片的二進位制資料、序列化後的JSON、XML或字串等等。

使用協議

在將訊息釋出到佇列前,有幾個與配置相關的步驟,至少需要設定交換器和佇列,然後將他們繫結到一起,下面會從協議級別來看看進行這麼步驟所發生的內部過程。

宣告交換器

協議中,建立交換器會使用Exchange.Declare命令,該命令提供了定義交換器名稱和型別的引數,以及用於訊息處理的其他元資料。

一旦命令被髮出,RabbitMQ在建立了交換器之後將傳送一個Exchange.DeclareOk的方法幀作為相應。如果處於某些原因建立失敗了,則RabbitMQ將使用Channel.Close命令關閉傳送Channel.Declare命令的通道。該響應包含一個數字編碼和文字值,用於說明Exchange.Declare失敗並關閉通道的原因。

宣告佇列

協議中,建立佇列會使用Queue.Declare命令,和建立交換器的過程是一樣的。

在宣告佇列時,多次傳送同一個Queue.Declare命令並不會有任何副作用。

當嘗試宣告一個與現有佇列同名的新佇列時,如果新佇列的屬性與現有佇列不一樣,那麼RabbitMQ將關閉發出RPC請求的通道。 要正確的處理錯誤,一般是由客戶端的實現來監聽來自RabbitMQ的Channel.Close命令,以便能做出正確響應。 某些客戶端實現可以讓你返回異常,讓你的應用程式去捕獲並處理。還有的客戶端提供回撥風格,通過註冊一個回撥方法來處理。

繫結佇列到交換器

協議中,繫結交換器與佇列的命令是Queue.Bind,每次只能指定一個佇列。這一步和建立交換器或建立佇列是一樣的。

釋出訊息

協議中,當釋出訊息到RabbitMQ時,多個幀封裝了傳送到伺服器的訊息資料。

在實際的訊息內容到達RabbitMQ之前,客戶端應用程式傳送一個Basic.Publish方法幀,一個內容頭幀和至少一個訊息體幀。

當RabbitMQ接收到一個訊息的所有幀並確定下一步操作之前,它將檢查方法幀以獲取它所需要的資訊。Basic.Publish方法幀攜帶訊息的交換器名稱和路由鍵。在評估這些資料時,RabbitMQ會嘗試將Basic.Publish幀中的交換器名稱與配置交換器的資料庫進行匹配。

預設情況下,如果使用RabbitMQ配置中不存在交換器進行訊息釋出,RabbitMQ將自動丟棄該訊息。想要確保訊息成功投遞,需要在釋出時將mandatory標識設定為true,或者使用投遞確認機制。

當RabbitMQ發現某一個交換器與Basic.Properties方法幀中的交換器名稱相匹配時,它將判斷該交換器中的繫結資訊,並通過路由鍵尋找匹配的佇列。當訊息與任一繫結的佇列符合匹配標準時,RabbitMQ伺服器將以FIFO的順序將訊息放入佇列中。

放入佇列資料結構中的並不是實際訊息,而是訊息的引用。當RabbitMQ準備投遞訊息時,它將使用這個引用來編組訊息並通過網路進行傳送。這為釋出到多個佇列的訊息提供了實質性的優化。

一旦不再需要這個訊息,實際訊息資料將會從RabbitMQ的記憶體中移除。

預設情況下,只要沒有消費者正在監聽佇列,訊息就會被儲存在佇列中。RabbitMQ可以將這些訊息儲存到記憶體或磁碟,具體取決於Basic.Properties中指定的delivery-mode屬性。

消費訊息

要消費RabbitMQ佇列中的訊息,消費者應用程式發出Basic.Consume命令來訂閱RabbitMQ中的佇列。伺服器將返回Basic.ConsumerOk來響應。然後開始向消費者投遞訊息。

如果要讓消費者停止接收訊息,則可以傳送Basic.Cancel命令。這個命令是非同步發出的,而RabbitMQ可能仍然在傳送訊息,所以消費者在接收到一個Bsaic.CancelOk響應之前,仍然可以接收到來自RabbitMQ的訊息。

消費訊息時,有幾個設定可以讓RabbitMQ知道你要如何接收它們。其中一個設定是Basic.Consume命令中的no_ack引數。當設定為ture時,RabbitMQ將連續傳送訊息直到Basic.Cancel或者連線斷開。如果no_ack標記為false,則消費者必須通過傳送Basic.Ack命令來確認收到的每條訊息。

當傳送Basic.Ack時,消費者必須在Basic.Deliver方法幀中傳遞一個名為delivery tag(投遞標籤)的引數。RabbitMQ使用投遞標籤和通道作為唯一識別符號來實現訊息確認