1. 程式人生 > >Windows下當地RabbitMQ服務的安裝

Windows下當地RabbitMQ服務的安裝

frame download mlu all 以及 銷毀 number 接受 sign

Windows下本地RabbitMQ服務的安裝

本文參考:劉若澤相關技術文檔

當然這些內容頁可以通過RabbitMQ官方網站獲得。

RabbitMQ配置說明手冊

一、RaibbitMQ服務器配置

1. 準備工作。如果之前安裝過RabbitMQ軟件,若想重新安裝,必須先把之前的RabbitMQ相關軟件卸載。

2. 安裝ERLANG語言包。首先到http://www.erlang.org/download.html這個頁面下載 Erlang Windows Binary File並且運行。這個過程大約5分鐘左右。

安裝具體過程:

1.雙擊otp_win32_R16801.exe(不同版本可能命名字不一樣),選擇next

2.默認安裝在C盤,建議程序安裝在非系統盤比如D盤(如果安裝在C盤可能會出現一些權限問題),修改好安裝路徑後,選next:

3.進入安裝程序,選擇install,即可完成安裝:

3. 安裝RabbitMQ服務器軟件。到這個頁面下載:

http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.3/rabbitmq-server-3.1.3.exe。然後運行安裝。

安裝具體過程:

1. 雙擊rabbitmq-server-3.1.1.exe。選擇next:

2. 默認安裝在C盤,直接安裝即可:

3. 大約兩份鐘即可完成安裝

4. 如果想要Rabbitmq-sever能在windows下命令行下運行,還需要配置環境變量:

配置如下:先選擇高級系統設置會彈出系統設置,再在系統設置裏選擇環境變量(註意下圖中的紅圈)

找到環境變量中的path變量:

雙擊path,在其後面增加:;%RABBITMQ_SERVER%\sbin (註意前面的分號),然後確定即可

現在打開windows命令行(“cmd”),輸入rabbitmq-service如果出現如下所示提示,即表示環境變量配置成功。

5. Rabbit還自帶監控功能.
cmd進到sbin目錄,鍵入rabbitmq-plugins enable rabbitmq_management啟用監控管理,然後重啟Rabbitmq服務器。 打開網址http://localhost:55672,用戶名和密碼都是guest。

6. 現在打開瀏覽器,輸入:http://localhost:15672/ ,如果出現以下頁面,則表示服務器配置成功。

默認用戶名為guest,密碼:guest

如果沒有出現以上頁面,嘗試在windows命令行中輸入(以管理員方式運行):

rabbitmq-plugins enable rabbitmq_management

然後運行下面的命令來安裝:

rabbitmq-service stop

rabbitmq-service install

rabbitmq-service start

二、 使用VS2010編譯RibbitMQ

1. 下載 rabbitmq-c-master 源碼

2. 下載 rabbitmq-codegen 源碼

3. 將 rabbitmq-codegen 中的內容拷貝到 rabbitmq-c-master 中的 codegen 目錄下(如果沒有該目錄請自行創建).

4. 下載 cmake 並安裝。

按下一步直接安裝:

這裏可以更改路徑(註意安裝路徑不要包含中文),安裝完畢即可。這時桌面會生成一個cmake_gui的快捷方式。

5. 使用cmake編譯rabbitmq-c源碼。

打開cmake。

source處(紅圈處)填你的rabbitmq-c-master源碼路徑,比如我放在d:/rabbitmq-c-master;下面的build處(黃圈處)填你source處的下面build文件夾(如果沒有build文件夾則創建一個)

點configure,這時會詢問選擇何種編譯器,選擇VS2010。

點finish。這時cmake會開始編譯,但是一會他可能會彈出如下錯誤信息:

直接把ENABLE_SSL_SUPPORT去掉,再按configure即可。

等編譯完成,再點Generate按鈕。整個編譯過程即完成。

6. 打開VS2010。用VS2010打開rabbit-c-master/build下的rabbitmq-c.sln項目

7. 點擊生成/生成解決方案(或按F7)

8. 項目此時會生成好:

9. 將D:\rabbitmq-c-master\build\librabbitmq\Debug下(註意D:\rabbitmq-c-master\build為前面cmake編譯生成文件路徑,下同)的rabbitmq.1.dll動態連接庫拷到D:\rabbitmq-c-master\rabbitmq-c-master\build\examples\Debug文件夾下。

10. 現在所有的rabbit-c的example都可以運行了。至此,可以按照 https://github.com/alanxz/rabbitmq-c 上的說明,執行測試了。

打開“cmd”,進入到D:\rabbitmq-c-master\build\examples\Debug文件夾下:

輸入amqp_listen.exe localhost 5672 amq.direct test:

再在另一個cmd中也到達D:\rabbitmq-c-master\build\examples\Debug

輸入amqp_sendstring.exe localhost 5672 amq.direct test

“hello world”

運行之後如果第一個終端出現如下界面,則說明整個rabbitMQ配置成功。

至此,說明Raabit-C客戶端與服務端都以成功。

RabbitMQ使用說明

一、 基本概念:

(一)基本概念

RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。我曾經對這門語言挺有興趣,學過一段時間,後來沒堅持。RabbitMQ是AMQP(高級消息隊列協議)的標準實現。如果不熟悉AMQP,直接看RabbitMQ的文檔會比較困難。不過它也只有幾個關鍵概念,這裏簡單介紹。

RabbitMQ的結構圖如下:

幾個概念說明:

Broker:簡單來說就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麽規則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏可以開設多個vhost,用作不同用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接裏,可建立多個channel,每個channel代表一個會話任務。

消息隊列的使用過程大概如下:

(1)客戶端連接到消息隊列服務器,打開一個channel。
(2)客戶端聲明一個exchange,並設置相關屬性。
(3)客戶端聲明一個queue,並設置相關屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
(5)客戶端投遞消息到exchange。

exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。

exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那麽客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配後進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。

RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那麽它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

RabbitMQ-C客戶端使用說明

rabbitmq-c是一個用於C語言的,與AMQP server進行交互的client庫,AMQP協議為版本0-9-1。rabbitmq-c與server進行交互前需要首先進行login操作,在操作後,可以根據AMQP協議規範,執行一系列操作。

這裏,根據項目需求,只進行部分接口說明,文後附demo的github地址。

接口描述:

amqp_connection_state_t amqp_new_connection(void);

接口說明:聲明一個新的amqp connection

int amqp_open_socket(char const *hostname, int portnumber);

接口說明:獲取socket.

參數說明:hostname RabbitMQ server所在主機

portnumber RabbitMQ server監聽端口

void amqp_set_sockfd(amqp_connection_state_t state,int sockfd);

接口說明:將amqp connection和sockfd進行綁定

amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,int channel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method, ...);

接口說明:用於登錄RabbitMQ server,主要目的為了進行權限管理;

參數說明:state amqp connection

vhost rabbit-mq的虛機主機,是rabbit-mq進行權限管理的最小單位

channel_max 最大鏈接數,此處設成0即可

frame_max 和客戶端通信時所允許的最大的frame size.默認值為131072,增大這個值有助於提高吞吐,降低這個值有利於降低時延

heartbeat 含義未知,默認值填0

sasl_method 用於SSL鑒權,默認值參考後文demo

amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);

接口說明:用於關聯conn和channel

amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments);

接口說明:聲明declare

參數說明:state

channel

exchange

type "fanout" "direct" "topic"三選一

passive

curable

arguments

amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments);

接口說明:聲明queue

參數說明:state amqp connection

channel

queue queue name

passive

durable 隊列是否持久化

exclusive 當前連接不在時,隊列是否自動刪除

aoto_delete 沒有consumer時,隊列是否自動刪除

arguments 用於拓展參數,比如x-ha-policy用於mirrored queue

amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_tab le_t arguments);

接口說明:聲明binding

amqp_basic_qos_ok_t *amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global);

接口說明:qos是 quality of service,我們這裏使用主要用於控制預取消息數,避免消息按條數均勻分配,需要和no_ack配合使用

參數說明:state

channel

prefetch_size 以bytes為單位,0為unlimited

prefetch_count 預取的消息條數

global

amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments);

接口說明:開始一個queue consumer

參數說明:state

channel

queue

consumer_tag

no_local

no_ack 是否需要確認消息後再從隊列中刪除消息

exclusive

arguments

int amqp_basic_ack(amqp_connection_state_t state,amqp_channel_t channel,uint64_t delivery_tag,amqp_boolean_t multiple);

int amqp_basic_publish(amqp_connection_state_t state,amqp_channel_t channel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_t mandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const *properties,amqp_bytes_t body);

接口說明:發布消息

參數說明:state

channel

exchange

routing_key 當exchange為默認“”時,此處填寫queue_name,當exchange為direct,此處為binding_key

mandatory 參見參考文獻2

immediate 同上

properties 更多屬性,如何設置消息持久化,參見文後demo

body 消息體

amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,int code);

amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,int code);

int amqp_destroy_connection(amqp_connection_state_t state);

如何consume消息,參見文後demo。

demo

https://github.com/liuhaobupt/rabbitmq_work_queues_demo-with-rabbit-c-client-lib

其中 rmq_new_task.c和rmq_worker.c對應於RabbitMQ tutorial裏的work queues章節(http://www.rabbitmq.com/tutorials/tutorial-two-python.html),emit_log_direct.c和receive_logs_direct.c對應於RabbitMQ tutorial裏的routing章節(http://www.rabbitmq.com/tutorials/tutorial-four-python.html),這兩個demo覆蓋了RabbitMQ的常用應用場景。

編譯需要librabbitmq.a庫,同時需要rabbitmq-c提供的幾個頭文件(amqp.h和amqp_framing.h)以及utils.c文件,這些在github project頁面均可獲得。

參考文獻

1. rabbitmq-c主頁 http://hg.rabbitmq.com/rabbitmq-c/summary

2. http://www.rabbitmq.com/amqp-0-9-1-reference.html

五、 推薦網站

http://lostechies.com/derekgreer/tag/rabbitmq/

http://www.rabbitmq.com/getstarted.html

(中文翻譯 http://adamlu.net/dev/2011/09/rabbitmq-get-started/ )

http://alanxz.github.io/rabbitmq-c/docs/0.2/annotated.html

https://github.com/liuhaobupt/rabbitmq_work_queues_demo-with-rabbit-c-client-lib

六、 簡單代碼解釋(頭文件略去)

// 下面提供了一個簡單消費者代碼demo,包括了RabbitMQ的絕大多數流程

// 實現了從默認交換機,隊列名為“QueueName1”中消息的獲取

int main(int argc, const char **argv) {

const char *hostname = "localhost"; // 主機名

int port = 5672; // 端口

const char *exchange = ""; //交換機為默認時設為空字符串

const char *queuename = "QueueName1"; // 隊列名

amqp_socket_t *socket = NULL;

int sockfd;

int channelid = 1; // 默認打開的頻道

amqp_connection_state_t conn;

conn = amqp_new_connection(); // 新建一個連接

socket = amqp_tcp_socket_new(conn); // 新建一個套接字

die_on_error(sockfd = amqp_socket_open(socket,hostname, port), "Opening socket"); //打開套接字

die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),"Logging in"); // 登錄服務器

amqp_channel_open(conn, channelid); // 打開一個頻道

die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); // ampq_get_rpc_reply 是一個清除某些全局變量的函數

amqp_queue_declare(conn,channelid,amqp_cstring_bytes(queuename),0,1,0,0,amqp_empty_table); // 聲明一個隊列,註意對照API看參數信息

amqp_basic_qos(conn,channelid,0,1,0); //基本服務處理,可實現預先處理多少個信息

amqp_basic_consume(conn,channelid,amqp_cstring_bytes(queuename),amqp_empty_bytes,0,1,0,amqp_empty_table); // 開始一個隊列消費

die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

{

while (1) {

amqp_rpc_reply_t res;

amqp_envelope_t envelope;

amqp_maybe_release_buffers(conn); //清除緩存

res = amqp_consume_message(conn, &envelope, NULL, 0); //將獲得的消息交給envelope

if (AMQP_RESPONSE_NORMAL != res.reply_type) {

break;

}

printf("Delivery %u, exchange %.*s routingkey %.*s\n",

(unsigned) envelope.delivery_tag,

(int) envelope.exchange.len, (char *) envelope.exchange.bytes,

(int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);

printf("Content %.*s",(int)envelope.message.body.len,(char *)envelope.message.body.bytes);

amqp_destroy_envelope(&envelope);

}

}

die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); // 關閉頻道

die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); // 關閉連接

die_on_error(amqp_destroy_connection(conn), "Ending connection"); // 銷毀連接

return 0;

}

Windows下當地RabbitMQ服務的安裝