RabbitMQ基礎學習筆記(C#代碼示例)
一、定義:
MQ是MessageQueue,消息隊列的簡稱(是流行的開源消息隊列系統,利用erlang語言開發)。MQ是一種應用程序對應用程序的通信方法。應用程序通過讀寫入隊和出隊的消息來通信,無需專用連接來鏈接它們。
消息傳遞是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,一般應用於遠程過程調用的技術。
排隊指的是應用程序通過隊列來通信。應用隊列避免接收和發送數據的同時進行。
二、特點:
MQ是消費者-生產者模型的代表。一端往消息隊列中寫入消息,另一端可以讀取或者訂閱隊列中的消息。MQ遵循的是AMQP協議(高級消息隊列協議:使得遵從該規範的客戶端應用和消息中間件服務器的全功能互操作成為可能)的具體實現和產品。
三、應用:
在使用MQ時,我們不需要實時的返回信息。獲取信息和返回信息進行異步處理。例如:在項目中,我們需要從汽車系統中利用CAN總線實時的獲取汽車的相關信息,但是沒有必要給汽車返回信息。如,獲取汽車的輪胎氣壓,但是我們不需要給汽車一個返回的信息或結果。
C#項目要利用RabbitMQ來獲取實時數據的話,需要先安裝客戶端的庫文件:RabbitMQ.Client.dll,下載地址如下:
http://download.csdn.net/detail/qq_30507287/9599941
四、RabbitMQ的結構圖:
五、基本概念:
Broker:消息隊列服務器實體。
Exchange:消息交換機,指定消息按照什麽規則,路由到哪個隊列。可以理解成具有路由表的程序。每個消息都有一個成為路由鍵(routing key)的屬性。交換機中有一系列的綁定(binding)即路由規則。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。消息一直在裏面,直到有客戶端(消費者)連接到這個隊列並且將其取走為止。隊列是有消費者通過程序建立的。
Binding:綁定,作用:把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
Vhost(virtualhost):虛擬主機,一個broker裏可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接裏,可建立多個channel,每個channel代表一個會話任務。
註1:比較重要的四個:vhost,exchange,queue,binding。一個虛擬主機持有一組交換機、隊列和綁定。
註2:消費者程序要負責創建交換機們(不止一個)?因為每個交換機在自己獨立的進程當中執行,增加多個交換機就是增加多個進程,可以充分利用服務器上的CPU的核來提高效率。(一個8核服務上,可以用5核來創建5個交換機,剩余的3個用來處理消息。)
註3:一個綁定(binding)就是一個基於路由鍵將交換機和隊列連接起來的路由規則。
六、消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routingkey,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。
說明:exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
七、Exchange(交換機的類型)的類型:
1)Direct交換機:
(處理路由鍵)完全根據key進行投遞。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。
綁定時設置了routing key為“abc”,那麽客戶端提交的消息,只有設置了key為“abc”的才會投遞到隊列。
2)Topic交換機:
(將路由鍵和某模式進行匹配)此時隊列需要綁定到一個模式上,對key進行模式匹配後進行投遞。
“#”匹配一個或多個詞,“*”匹配正好一個詞。“abc.#”匹配“abc.def.ghi”,“abc.*”只匹配“abc.def”。
3)Fanout交換機:(不處理路由鍵)不需要key值,采用廣播模式,消息進來時,投遞到與該交換機綁定的所有隊列。
八、隊列消息的持久化:
1、為什麽會有持久化?
花費大量時間來創建隊列,交換機和綁定,如果服務器出現意外或外界故障,那麽隊列,交換機和綁定就會清空。RabbitMQ重啟之後就會清空原來的東西。因此在創建隊列和交換機時就會指定一個標誌durable來控制。當然,durable表示的含義:含有該標誌的隊列和交換機在重啟之後會重新建立,而不是,在隊列中的消息會重啟後恢復。
2、消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1;
(2)queue持久化,在聲明時指定durable => 1;
(3)消息持久化,在投遞時指定delivery_mode=> 2(1是非持久化,2表示persistent,持久化);
如果exchange和queue都是持久化的,那麽它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
說明:delivery mode(投遞模式)
3、綁定(binding)的持久化:
綁定在創建的時候無法設置durable,那麽綁定的持久化是靠隊列和交換機來實現的。如果綁定一個durable的隊列和一個durable的交換機,RabbitMQ會自動保留這個綁定。只要隊列和交換機之一不是durable,那麽依賴它們的綁定就會自動刪除。
九、.NET/C#客戶端中的RabbitMQ
(1)主要的命名空間,接口和類
核心的API接口和類都定義在命名空間為RabbitMQ.Client中。
using RabbitMQ.Client;
核心的接口和類是:
IModel: representsan AMQP 0-9-1 channel, and provides most of the operations (protocol methods).代表通道,提供協議方法。
IConnection:represents an AMQP 0-9-1 connection
ConnectionFactory:constructs IConnection instances
IBasicConsumer:represents a message consumer表示消息的消費者。
其他的接口和類:
DefaultBasicConsumer:commonly used base class for consumers
其他公共的RabbitMQ.Client命名空間還包括:
RabbitMQ.Client.Events:various events and event handlers that are part of the client library,including EventingBasicConsumer, a consumer implementation built around C#event handlers.
RabbitMQ.Client.Exceptions:exceptions visible to the user.
(2)連接代理
連接到RabbitMQ,必須要實例化一個ConnectionFactory和configure一個RabbitMQ到主機,虛擬機或認證的設備上。下面的代碼是在主機上連接RabbitMQ結點。
ConnectionFactory factory = newConnectionFactory();
factory.Uri = "amqp://user:[email protected]:port/vhost";
IConnection conn = factory.CreateConnection();
然後用iconnection接口可以用來打開一個通道:
IModel channel = conn.CreateModel();
(3)使用交換機和隊列
model.ExchangeDeclare(exchangeName,ExchangeType.Direct);
model.QueueDeclare(queueName,false,false,false,null);
model.QueueBind(queueName, exchangeName, routingKey,null);
(4)發布消息
利用交換機(exchange)發送消息,使用的是IModel.BasicPublish
byte[]messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
model.BasicPublish(exchangeName, routingKey,null,messageBodyBytes);
對於細節,可以用重載變量來指定標誌或特殊的消息屬性:
byte[]messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
model.BasicPublish(exchangeName,routingKey, props, messageBodyBytes);
發送自定義的消息:
byte[]messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain";
props.DeliveryMode = 2;
props.Headers = new Dictionary<string,object>();
props.Headers.Add("latitude", 51.5252949);
props.Headers.Add("longitude",-0.0905493);
model.BasicPublish(exchangeName, routingKey, props, messageBodyBytes);
十、(C#實例解析)生產者應用程序解析
(1)從生產者應用程序來看,建立一個使用默認設置的連接,創建連接並創建一個通道:
namespaceProducer { classProgram { staticvoidMain(string[] args) { var connectionFactory =newConnectionFactory(); IConnection connection =connectionFactory.CreateConnection(); IModel channel = connection.CreateModel(); } } }
(2)聲明一個交換機並發布消息
channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct);
第二個參數表名了參數的類型,可以選擇direct,fanout,topic,或者headers。
(3)調用方法,產生一個返回值,本例中調用的是DoSomethingInteresting(),並且返回一個字符串的值。
string value = DoSomethingInteresting();
(4)dosomethinginteresting()實施可以返回一個新的GUID字符串值
staticstringDoSomethingInteresting()
{
returnGuid.NewGuid().ToString();
}
(5)利用返回值來創建一個日誌消息:
string logMessage =string.Format("{0}: {1}", TraceEventType.Information, value);
(6)將日誌消息轉換為字節數組,並將消息發布到新的交換機上:
byte[]message =Encoding.UTF8.GetBytes(logMessage);
channel.BasicPublish("direct-exchange-example","",null,message);
(7)最後,要關閉通道和連接
channel.Close();
connection.Close();
十一、(C#實例解析)消費者應用程序解析
(1)和生成者一樣創建消費者
using RabbitMQ.Client; namespace Consumer { classProgram { staticvoidMain(string[] args) { varconnectionFactory = new ConnectionFactory(); IConnection connection = connectionFactory.CreateConnection(); IModel channel = connection.CreateModel(); channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct); } } }
(2)聲明一個隊列去綁定交換機,隊列的名字為“logs”
channel.QueueDeclare("logs",false,false,true,null);
(3)綁定“logs”隊列,利用QueueBind()
channel.QueueBind("logs","direct-exchange-example","");
(4)聲明消費者對象
var consumer =newQueueingBasicConsumer(channel)
(5)推送消息
channel.BasicConsume(“logs”, true,consumer);
(6)任何消息都將被自動檢索,並放置在內存隊列的本地內存隊列中。
var eventArgs =(BasicDeliverEventArgs)consumer.Queue.Dequeue();
將eventArgs轉換成字符串並打印輸出
var message =Encoding.UTF8.GetString(eventArgs.Body);
Console.WriteLine(message);
(7)關閉通道和連接
channel.Close();
connection.Close();
十二、(C#實例代碼)生產-消費模式的完整代碼
(1)
using System; using System.Diagnostics; using System.Text; using System.Threading; using RabbitMQ.Client; namespace Producer { classProgram { staticvoidMain(string[] args) { Thread.Sleep(1000); varconnectionFactory = new ConnectionFactory(); IConnection connection = connectionFactory.CreateConnection(); IModel channel = connection.CreateModel(); channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct); stringvalue = DoSomethingInteresting(); stringlogMessage = string.Format("{0}:{1}",TraceEventType.Information,value); byte[]message = Encoding.UTF8.GetBytes(logMessage); channel.BasicPublish("direct-exchange-example","",null,message); channel.Close(); connection.Close(); } staticstringDoSomethingInteresting() { returnGuid.NewGuid().ToString(); } } }
(2)
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace Consumer { classProgram { staticvoidMain(string[] args) { varconnectionFactory = new ConnectionFactory(); IConnection connection = connectionFactory.CreateConnection(); IModel channel = connection.CreateModel(); channel.ExchangeDeclare("direct-exchange-example",ExchangeType.Direct); channel.QueueDeclare("logs",false,false,true,null); channel.QueueBind("logs","direct-exchange-example",""); varconsumer = new QueueingBasicConsumer(channel); channel.BasicConsume("logs",true,consumer); vareventArgs = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); stringmessage = Encoding.UTF8.GetString(eventArgs.Body); Console.WriteLine(message); channel.Close(); connection.Close(); Console.ReadLine(); } } }
轉載:
http://blog.csdn.net/qq_30507287/article/details/52176603
RabbitMQ基礎學習筆記(C#代碼示例)