RabbitMQ(二):理解訊息通訊RabbitMQ
一、消費者、生產者和通道
生產者(producer):生產者建立訊息,然後釋出(傳送)到代理伺服器(RabbitMQ),可以說傳送訊息的程式就是生產者。什麼是訊息?訊息包含兩部分:有效載荷和標籤。有效載荷就是傳輸的資料,可以是任何內容,包括json資料和圖片等等。而標籤(一個叫交換器名稱和可選的主題標記)描述了有效載荷,RabbitMQ用它來決定誰將獲得這個訊息。
消費者(consumer):消費者就是接收訊息並處理訊息的程式,他們連線到代理伺服器上,並訂閱到佇列上。當消費者接收訊息時,它只是得到訊息的有效載荷。整個過程很簡單:生產者建立訊息,消費者接收訊息。你的應用程式可以作為生產者也可以作為消費者,在兩者之間切換。但是訊息的傳輸必定會通過某一介質傳遞,此處的訊息就通過通道傳遞。
通道(channel):不管是釋出訊息、訂閱佇列還是接收訊息,這些動作都是通過通道完成。應用程式和Rabbit代理伺服器之間會建立一條TCP連線,TCP連線就像電纜,通道相當於電纜中的光束。在一條TCP連線上建立多少條通道是沒有限制的,所以不會對作業系統的TCP棧造成額外的負擔。
二、佇列、交換器和繫結
Rabbit的訊息路由分為三部分:交換器、佇列和繫結。生產者把訊息釋出到交換器上;訊息最終到達佇列,並被消費者接收;繫結決定了訊息如何從路由器路由到特定的佇列。
佇列(queue):佇列是一個棧先進先出,為生產者釋出的訊息提供了儲存的處所,訊息在此等待消費。本質上佇列可以儲存無限的訊息,但是需要視系統記憶體而定。
繫結(binding):佇列通過路由鍵繫結到交換器,路由鍵就是訊息通過交換器投遞到那個佇列的規則。
交換器(exchange):交換器有四種類型:direct、fanout、topic和headers。一個Exchange可以和多個Queue進行繫結,producer在傳遞訊息的時候,會傳遞一個路由鍵,Exchange會根據這個路由鍵按照特定的路由演算法,將訊息路由給指定的queue。
(1)direct:如果路由鍵匹配的話,訊息就被投遞到對應的佇列。類似於單播。
(2)fanout:將訊息廣播到繫結的佇列上,不管路由鍵是什麼,繫結的佇列都會收到訊息。
(3)topic:類似與組播,和正則表示式類似,傳送給路由鍵符合一定規則的佇列。如:路由鍵為user.stock的訊息會轉發給繫結匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的佇列(* 表是匹配一個任意片語,#表示匹配0個或多個片語)。
(4)headers:不通過路由鍵匹配而是通過訊息的header匹配,其他與direct交換器一致,但是效能上會差很多。
三、vhost
每一個RabbitMQ伺服器都能建立虛擬訊息伺服器,我們稱之為虛擬主機(vhost)。每一個vhost本質上是一個mini版的RabbitMQ伺服器,擁有自己的佇列、交換器和繫結,最重要的是擁有自己的許可權機制。邏輯上vhost之間是相互獨立的分離的,保證了安全性和可移植性。RabbitMQ包含了開箱即用的預設的vhost:"/",因此使用起來非常簡單。當在RabbitMQ叢集中建立vhost時,整個叢集上都會建立該vhost,避免了vhost的重複建立。
四、第一次嘗試訊息通訊
交換器或佇列的時候,如果交換器或佇列已經存在,則直接返回結束,不會重複建立。無法承擔訊息丟失,則生產者和消費者中都需要嘗試去建立交換器和佇列,如果可以承擔則可以由消費者來宣告佇列。我們建立生產者和消費者兩個控制檯程式分別執行以下程式碼。
生產者:
static void Main(string[] args) { FirstProducer(); } /// <summary> /// 第一個生產者 /// </summary> private static void FirstProducer() { //1.連線到伺服器 var conn_factory = new ConnectionFactory() { HostName = "localhost",UserName="guest",Password="guest",Port=5672//預設埠5672 }; using (IConnection conn = conn_factory.CreateConnection()) { //2.建立通道 using (IModel channel = conn.CreateModel()) { //3.宣告交換器 channel.ExchangeDeclare( "HelloExchange",//交換器名稱 ExchangeType.Direct,//交換器型別 true,//是否持久話 false,//是否自動刪除 null//關於交換器的詳細設定,鍵值對形式 ); //4.宣告佇列 channel.QueueDeclare( "HelloQueue",//佇列名稱 false,//是否持久化 false,//是否只對首次宣告的佇列可見 false,//是否自動刪除 null////關於佇列和佇列內訊息的詳細設定,鍵值對形式 ); //5.繫結交換器和佇列 channel.QueueBind( "HelloQueue",//佇列名 "HelloExchange", //交換器名 "hola"//路由鍵 ); //6.釋出訊息 string msg_str = "這是生產者第一次釋出的訊息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//釋出的資料型別 for(int i = 0; i < 5; i++) { channel.BasicPublish( "HelloExchange",//訊息傳送目標交換器名稱 "hola",//路由鍵 msg_pro,//訊息的釋出屬性 Encoding.UTF8.GetBytes(msg_str)//訊息 ); } } } }
消費者:
static void Main(string[] args) { FirstCousmer(); } /// <summary> /// 第一個消費者 /// </summary> private static void FirstCousmer() { //1.連結到伺服器 var conn_factory = new ConnectionFactory() { HostName = "localhost",UserName = "guest",Password = "guest",Port = 5672 }; using (var conn = conn_factory.CreateConnection()) { //2.建立通道 using(IModel channel = conn.CreateModel()) { //3.宣告交換器 channel.ExchangeDeclare( "HelloExchange",//交換器名稱 ExchangeType.Direct,//交換器型別 true,//是否持久話 false,//是否自動刪除 null//關於交換器的詳細設定,鍵值對形式 ); //4.宣告佇列 channel.QueueDeclare( "HelloQueue",//佇列名稱 false,//是否持久化 false,//是否只對首次宣告的佇列可見 false,//是否自動刪除 null////關於佇列和佇列內訊息的詳細設定,鍵值對形式 ); //5.繫結交換器和佇列 channel.QueueBind( "HelloQueue",//佇列名 "HelloExchange", //交換器名 "hola"//路由鍵 ); //6.獲取訊息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) =>//消費者訊息接收處理事件 { var body = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(body); channel.BasicAck(ea.DeliveryTag, false); //確認接收訊息,從佇列中刪除 }; //7.啟動消費者 string consumer_tag = channel.BasicConsume( "HelloQueue", //獲取的佇列名稱 false,//是否自動確認接收訊息,從佇列中刪除 consumer//消費者物件 ); channel.BasicCancel(consumer_tag);//呼叫消費 //var consumer = new QueueingBasicConsumer(channel); //channel.BasicConsume("HelloQueue", false, consumer); //while (true) //{ //var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); //var body = ea.Body; //var message = Encoding.UTF8.GetString(body); //Console.WriteLine("Received {0}", message); //channel.BasicAck(ea.DeliveryTag, false); //} } } Console.ReadLine(); }
執行消費者後可以看到,以下結果: