Windows下當地RabbitMQ服務的安裝
Windows下本地RabbitMQ服務的安裝
本文參考:劉若澤相關技術文檔
當然這些內容頁可以通過RabbitMQ官方網站獲得。
RabbitMQ配置說明手冊
一、RaibbitMQ服務器配置
1. 準備工作。如果之前安裝過RabbitMQ軟件,若想重新安裝,必須先把之前的RabbitMQ相關軟件卸載。
2. 安裝ERLANG語言包。首先到http://www.erlang.org/download.html這個頁面下載 Erlang Windows Binary File並且運行。這個過程大約5分鐘左右。
安裝具體過程:
1.雙擊otp_win32_R16801.exe(不同版本可能命名字不一樣),選擇next
2.默認安裝在C盤,建議程序安裝在非系統盤比如D盤(如果安裝在C盤可能會出現一些權限問題),修改好安裝路徑後,選next:
3.進入安裝程序,選擇install,即可完成安裝:
3. 安裝RabbitMQ服務器軟件。到這個頁面下載:
http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.3/rabbitmq-server-3.1.3.exe。然後運行安裝。
安裝具體過程:
1. 雙擊rabbitmq-server-3.1.1.exe。選擇next:
2. 默認安裝在C盤,直接安裝即可:
3. 大約兩份鐘即可完成安裝
4. 如果想要Rabbitmq-sever能在windows下命令行下運行,還需要配置環境變量:
配置如下:先選擇高級系統設置會彈出系統設置,再在系統設置裏選擇環境變量(註意下圖中的紅圈)
找到環境變量中的path變量:
雙擊path,在其後面增加:;%RABBITMQ_SERVER%\sbin (註意前面的分號),然後確定即可
現在打開windows命令行(“cmd”),輸入rabbitmq-service如果出現如下所示提示,即表示環境變量配置成功。
5. Rabbit還自帶監控功能.
cmd進到sbin目錄,鍵入rabbitmq-plugins enable rabbitmq_management啟用監控管理,然後重啟Rabbitmq服務器。 打開網址http://localhost:55672,用戶名和密碼都是guest。
6. 現在打開瀏覽器,輸入:http://localhost:15672/ ,如果出現以下頁面,則表示服務器配置成功。
默認用戶名為guest,密碼:guest
如果沒有出現以上頁面,嘗試在windows命令行中輸入(以管理員方式運行):
rabbitmq-plugins enable rabbitmq_management
然後運行下面的命令來安裝:
rabbitmq-service stop
rabbitmq-service install
rabbitmq-service start
二、 使用VS2010編譯RibbitMQ
1. 下載 rabbitmq-c-master 源碼
2. 下載 rabbitmq-codegen 源碼
3. 將 rabbitmq-codegen 中的內容拷貝到 rabbitmq-c-master 中的 codegen 目錄下(如果沒有該目錄請自行創建).
4. 下載 cmake 並安裝。
按下一步直接安裝:
這裏可以更改路徑(註意安裝路徑不要包含中文),安裝完畢即可。這時桌面會生成一個cmake_gui的快捷方式。
5. 使用cmake編譯rabbitmq-c源碼。
打開cmake。
source處(紅圈處)填你的rabbitmq-c-master源碼路徑,比如我放在d:/rabbitmq-c-master;下面的build處(黃圈處)填你source處的下面build文件夾(如果沒有build文件夾則創建一個)
點configure,這時會詢問選擇何種編譯器,選擇VS2010。
點finish。這時cmake會開始編譯,但是一會他可能會彈出如下錯誤信息:
直接把ENABLE_SSL_SUPPORT去掉,再按configure即可。
等編譯完成,再點Generate按鈕。整個編譯過程即完成。
6. 打開VS2010。用VS2010打開rabbit-c-master/build下的rabbitmq-c.sln項目
7. 點擊生成/生成解決方案(或按F7)
8. 項目此時會生成好:
9. 將D:\rabbitmq-c-master\build\librabbitmq\Debug下(註意D:\rabbitmq-c-master\build為前面cmake編譯生成文件路徑,下同)的rabbitmq.1.dll動態連接庫拷到D:\rabbitmq-c-master\rabbitmq-c-master\build\examples\Debug文件夾下。
10. 現在所有的rabbit-c的example都可以運行了。至此,可以按照 https://github.com/alanxz/rabbitmq-c 上的說明,執行測試了。
打開“cmd”,進入到D:\rabbitmq-c-master\build\examples\Debug文件夾下:
輸入amqp_listen.exe localhost 5672 amq.direct test:
再在另一個cmd中也到達D:\rabbitmq-c-master\build\examples\Debug
輸入amqp_sendstring.exe localhost 5672 amq.direct test
“hello world”
運行之後如果第一個終端出現如下界面,則說明整個rabbitMQ配置成功。
至此,說明Raabit-C客戶端與服務端都以成功。
RabbitMQ使用說明
一、 基本概念:
(一)基本概念
RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。我曾經對這門語言挺有興趣,學過一段時間,後來沒堅持。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。如果不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。不過它也只有幾個關鍵概念,這裏簡單介紹。
RabbitMQ的結構圖如下:
幾個概念說明:
Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麽規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接裏,可建立多個channel,每個channel代表一個會話任務。
消息隊列的使用過程大概如下:
(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那麽客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。
RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那麽它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
RabbitMQ-C客戶端使用說明 rabbitmq-c是一個用於C語言的,與AMQP server進行交互的client庫,AMQP協議為版本0-9-1。rabbitmq-c與server進行交互前需要首先進行login操作,在操作後,可以根據AMQP協議規範,執行一系列操作。 這裏,根據項目需求,只進行部分接口說明,文後附demo的github地址。 接口描述: amqp_connection_state_t amqp_new_connection(void); 接口說明:聲明一個新的amqp connection int amqp_open_socket(char const *hostname, int portnumber); 接口說明:獲取socket. 參數說明:hostname RabbitMQ server所在主機 portnumber RabbitMQ server監聽端口 void amqp_set_sockfd(amqp_connection_state_t state,int sockfd); 接口說明:將amqp connection和sockfd進行綁定 amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,int channel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method, ...); 接口說明:用於登錄RabbitMQ server,主要目的為了進行權限管理; 參數說明:state amqp connection vhost rabbit-mq的虛機主機,是rabbit-mq進行權限管理的最小單位 channel_max 最大鏈接數,此處設成0即可 frame_max 和客戶端通信時所允許的最大的frame size.默認值為131072,增大這個值有助於提高吞吐,降低這個值有利於降低時延 heartbeat 含義未知,默認值填0 sasl_method 用於SSL鑒權,默認值參考後文demo amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel); 接口說明:用於關聯conn和channel amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments); 接口說明:聲明declare 參數說明:state channel exchange type "fanout" "direct" "topic"三選一 passive curable arguments amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments); 接口說明:聲明queue 參數說明:state amqp connection channel queue queue name passive durable 隊列是否持久化 exclusive 當前連接不在時,隊列是否自動刪除 aoto_delete 沒有consumer時,隊列是否自動刪除 arguments 用於拓展參數,比如x-ha-policy用於mirrored queue amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_tab le_t arguments); 接口說明:聲明binding amqp_basic_qos_ok_t *amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global); 接口說明:qos是 quality of service,我們這裏使用主要用於控制預取消息數,避免消息按條數均勻分配,需要和no_ack配合使用 參數說明:state channel prefetch_size 以bytes為單位,0為unlimited prefetch_count 預取的消息條數 global amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments); 接口說明:開始一個queue consumer 參數說明:state channel queue consumer_tag no_local no_ack 是否需要確認消息後再從隊列中刪除消息 exclusive arguments int amqp_basic_ack(amqp_connection_state_t state,amqp_channel_t channel,uint64_t delivery_tag,amqp_boolean_t multiple); int amqp_basic_publish(amqp_connection_state_t state,amqp_channel_t channel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_t mandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const *properties,amqp_bytes_t body); 接口說明:發布消息 參數說明:state channel exchange routing_key 當exchange為默認“”時,此處填寫queue_name,當exchange為direct,此處為binding_key mandatory 參見參考文獻2 immediate 同上 properties 更多屬性,如何設置消息持久化,參見文後demo body 消息體 amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,int code); amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,int code); int amqp_destroy_connection(amqp_connection_state_t state); 如何consume消息,參見文後demo。 demo https://github.com/liuhaobupt/rabbitmq_work_queues_demo-with-rabbit-c-client-lib 其中 rmq_new_task.c和rmq_worker.c對應於RabbitMQ tutorial裏的work queues章節(http://www.rabbitmq.com/tutorials/tutorial-two-python.html),emit_log_direct.c和receive_logs_direct.c對應於RabbitMQ tutorial裏的routing章節(http://www.rabbitmq.com/tutorials/tutorial-four-python.html),這兩個demo覆蓋了RabbitMQ的常用應用場景。 編譯需要librabbitmq.a庫,同時需要rabbitmq-c提供的幾個頭文件(amqp.h和amqp_framing.h)以及utils.c文件,這些在github project頁面均可獲得。 參考文獻 1. rabbitmq-c主頁 http://hg.rabbitmq.com/rabbitmq-c/summary 2. http://www.rabbitmq.com/amqp-0-9-1-reference.html
|
五、 推薦網站 http://lostechies.com/derekgreer/tag/rabbitmq/ http://www.rabbitmq.com/getstarted.html (中文翻譯 http://adamlu.net/dev/2011/09/rabbitmq-get-started/ ) http://alanxz.github.io/rabbitmq-c/docs/0.2/annotated.html https://github.com/liuhaobupt/rabbitmq_work_queues_demo-with-rabbit-c-client-lib
六、 簡單代碼解釋(頭文件略去) // 下面提供了一個簡單消費者代碼demo,包括了RabbitMQ的絕大多數流程 // 實現了從默認交換機,隊列名為“QueueName1”中消息的獲取 int main(int argc, const char **argv) { const char *hostname = "localhost"; // 主機名 int port = 5672; // 端口 const char *exchange = ""; //交換機為默認時設為空字符串 const char *queuename = "QueueName1"; // 隊列名 amqp_socket_t *socket = NULL; int sockfd; int channelid = 1; // 默認打開的頻道 amqp_connection_state_t conn; conn = amqp_new_connection(); // 新建一個連接 socket = amqp_tcp_socket_new(conn); // 新建一個套接字 die_on_error(sockfd = amqp_socket_open(socket,hostname, port), "Opening socket"); //打開套接字
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),"Logging in"); // 登錄服務器 amqp_channel_open(conn, channelid); // 打開一個頻道 die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); // ampq_get_rpc_reply 是一個清除某些全局變量的函數
amqp_queue_declare(conn,channelid,amqp_cstring_bytes(queuename),0,1,0,0,amqp_empty_table); // 聲明一個隊列,註意對照API看參數信息
amqp_basic_qos(conn,channelid,0,1,0); //基本服務處理,可實現預先處理多少個信息 amqp_basic_consume(conn,channelid,amqp_cstring_bytes(queuename),amqp_empty_bytes,0,1,0,amqp_empty_table); // 開始一個隊列消費 die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{ while (1) { amqp_rpc_reply_t res; amqp_envelope_t envelope; amqp_maybe_release_buffers(conn); //清除緩存 res = amqp_consume_message(conn, &envelope, NULL, 0); //將獲得的消息交給envelope if (AMQP_RESPONSE_NORMAL != res.reply_type) { break; } printf("Delivery %u, exchange %.*s routingkey %.*s\n", (unsigned) envelope.delivery_tag, (int) envelope.exchange.len, (char *) envelope.exchange.bytes, (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes); printf("Content %.*s",(int)envelope.message.body.len,(char *)envelope.message.body.bytes); amqp_destroy_envelope(&envelope); } } die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); // 關閉頻道 die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); // 關閉連接 die_on_error(amqp_destroy_connection(conn), "Ending connection"); // 銷毀連接 return 0; }
|
Windows下當地RabbitMQ服務的安裝